You are viewing a plain-text version of this content; the link to the canonical version was lost in this copy.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/03/20 19:55:52 UTC
[arrow] branch master updated: ARROW-549: [C++] Add arrow::Concatenate function to combine multiple arrays into a single Array
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 43f2a31 ARROW-549: [C++] Add arrow::Concatenate function to combine multiple arrays into a single Array
43f2a31 is described below
commit 43f2a31d3dd31cb2d5d6f0be72dba13a7a4e1e1f
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Mar 20 20:55:45 2019 +0100
ARROW-549: [C++] Add arrow::Concatenate function to combine multiple arrays into a single Array
Concatenate arrays into a single array
Author: Benjamin Kietzman <be...@gmail.com>
Closes #3746 from bkietz/ARROW-549-concatenate-arrays and squashes the following commits:
c1600bc2e <Benjamin Kietzman> move concatenate test to separate file
ffa58ec4a <Benjamin Kietzman> Refactor with Antoine's recommendations
aeb2626a1 <Benjamin Kietzman> move BufferVector alias to buffer.h and use for Concatenate
beb3ad29d <Benjamin Kietzman> use ArrayVector, default_random_engine
7fcd9c75c <Benjamin Kietzman> fix implicit conversion warning
25a040e6a <Benjamin Kietzman> refactor bitmap concatenation
c6686a4ab <Benjamin Kietzman> refactor ConcatenateImpl::Visit to be thinner with helpers
6d8b76ffc <Benjamin Kietzman> Refactor SliceData to ArrayData::Slice
d58d06384 <Benjamin Kietzman> use less_than rather than not_equal for loop conditions
aa3d3996d <Benjamin Kietzman> get out_ correctly shaped in constructor
94460a8ff <Benjamin Kietzman> remove offsets_, lengths_, range->Range
76ac1017d <Benjamin Kietzman> assert trailing bits are zeroed in bitmap buffers
d8ba14d4d <Benjamin Kietzman> add lint #includes
5803f1d45 <Benjamin Kietzman> remove ConcatenateParam, fix concat(Dictionary)
acc6c08d9 <Benjamin Kietzman> Improve Concatenate testing
711de4a4f <Benjamin Kietzman> remove unnecessary shared_ptr indirection
a9538264d <Benjamin Kietzman> bail on offset overflow
cbb89c524 <Benjamin Kietzman> return Invalid on mismatched types
0c524f30a <Benjamin Kietzman> don't concatenate null bitmaps twice
2573acaeb <Benjamin Kietzman> move Concatenate to util/concatenate
efc34f331 <Benjamin Kietzman> first pass at concatenate function
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/array.cc | 37 ++--
cpp/src/arrow/array.h | 23 +--
cpp/src/arrow/buffer.cc | 15 ++
cpp/src/arrow/buffer.h | 13 ++
cpp/src/arrow/util/CMakeLists.txt | 1 +
cpp/src/arrow/util/bit-util.cc | 8 +-
cpp/src/arrow/util/bit-util.h | 3 +-
cpp/src/arrow/util/concatenate-test.cc | 206 +++++++++++++++++++++
cpp/src/arrow/util/concatenate.cc | 321 +++++++++++++++++++++++++++++++++
cpp/src/arrow/util/concatenate.h | 39 ++++
11 files changed, 632 insertions(+), 35 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 83c2674..865c453 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -118,6 +118,7 @@ set(ARROW_SRCS
testing/util.cc
util/basic_decimal.cc
util/bit-util.cc
+ util/concatenate.cc
util/compression.cc
util/cpu-info.cc
util/decimal.cc
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index bb3b47d..5cb7bf4 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -77,6 +77,18 @@ std::shared_ptr<ArrayData> ArrayData::Make(const std::shared_ptr<DataType>& type
return std::make_shared<ArrayData>(type, length, null_count, offset);
}
+ArrayData ArrayData::Slice(int64_t off, int64_t len) const {
+ DCHECK_LE(off, length);
+ len = std::min(length - off, len);
+ off += offset;
+
+ auto copy = *this;
+ copy.length = len;
+ copy.offset = off;
+ copy.null_count = null_count != 0 ? kUnknownNullCount : 0;
+ return copy;
+}
+
int64_t ArrayData::GetNullCount() const {
if (ARROW_PREDICT_FALSE(this->null_count == kUnknownNullCount)) {
if (this->buffers[0]) {
@@ -125,21 +137,8 @@ bool Array::RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
}
-static inline std::shared_ptr<ArrayData> SliceData(const ArrayData& data, int64_t offset,
- int64_t length) {
- DCHECK_LE(offset, data.length);
- length = std::min(data.length - offset, length);
- offset += data.offset;
-
- auto new_data = data.Copy();
- new_data->length = length;
- new_data->offset = offset;
- new_data->null_count = data.null_count != 0 ? kUnknownNullCount : 0;
- return new_data;
-}
-
std::shared_ptr<Array> Array::Slice(int64_t offset, int64_t length) const {
- return MakeArray(SliceData(*data_, offset, length));
+ return MakeArray(std::make_shared<ArrayData>(data_->Slice(offset, length)));
}
std::shared_ptr<Array> Array::Slice(int64_t offset) const {
@@ -385,7 +384,8 @@ std::shared_ptr<Array> StructArray::field(int i) const {
if (!boxed_fields_[i]) {
std::shared_ptr<ArrayData> field_data;
if (data_->offset != 0 || data_->child_data[i]->length != data_->length) {
- field_data = SliceData(*data_->child_data[i].get(), data_->offset, data_->length);
+ field_data = std::make_shared<ArrayData>(
+ data_->child_data[i]->Slice(data_->offset, data_->length));
} else {
field_data = data_->child_data[i];
}
@@ -410,7 +410,7 @@ Status StructArray::Flatten(MemoryPool* pool, ArrayVector* out) const {
// Need to adjust for parent offset
if (data_->offset != 0 || data_->length != child_data->length) {
- child_data = SliceData(*child_data, data_->offset, data_->length);
+ *child_data = child_data->Slice(data_->offset, data_->length);
}
std::shared_ptr<Buffer> child_null_bitmap = child_data->buffers[0];
const int64_t child_offset = child_data->offset;
@@ -540,13 +540,13 @@ Status UnionArray::MakeSparse(const Array& type_ids,
std::shared_ptr<Array> UnionArray::child(int i) const {
if (!boxed_fields_[i]) {
- std::shared_ptr<ArrayData> child_data = data_->child_data[i];
+ std::shared_ptr<ArrayData> child_data = data_->child_data[i]->Copy();
if (mode() == UnionMode::SPARSE) {
// Sparse union: need to adjust child if union is sliced
// (for dense unions, the need to lookup through the offsets
// makes this unnecessary)
if (data_->offset != 0 || child_data->length > data_->length) {
- child_data = SliceData(*child_data.get(), data_->offset, data_->length);
+ *child_data = child_data->Slice(data_->offset, data_->length);
}
}
boxed_fields_[i] = MakeArray(child_data);
@@ -994,5 +994,4 @@ std::vector<ArrayVector> RechunkArraysConsistently(
}
} // namespace internal
-
} // namespace arrow
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index bee133c..f681542 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -40,8 +40,6 @@ namespace arrow {
class Array;
class ArrayVisitor;
-using BufferVector = std::vector<std::shared_ptr<Buffer>>;
-
// When slicing, we do not know the null count of the sliced range without
// doing some computation. To avoid doing this eagerly, we set the null count
// to -1 (any negative number will do). When Array::null_count is called the
@@ -67,7 +65,7 @@ class Status;
/// could cast from int64 to float64 like so:
///
/// Int64Array arr = GetMyData();
-/// auto new_data = arr.data()->ShallowCopy();
+/// auto new_data = arr.data()->Copy();
/// new_data->type = arrow::float64();
/// DoubleArray double_arr(new_data);
///
@@ -75,7 +73,7 @@ class Status;
/// reused. For example, if we had a group of operations all returning doubles,
/// say:
///
-/// Log(Sqrt(Expr(arr))
+/// Log(Sqrt(Expr(arr)))
///
/// Then the low-level implementations of each of these functions could have
/// the signatures
@@ -146,6 +144,7 @@ struct ARROW_EXPORT ArrayData {
buffers(std::move(other.buffers)),
child_data(std::move(other.child_data)) {}
+ // Copy constructor
ArrayData(const ArrayData& other) noexcept
: type(other.type),
length(other.length),
@@ -155,15 +154,10 @@ struct ARROW_EXPORT ArrayData {
child_data(other.child_data) {}
// Move assignment
- ArrayData& operator=(ArrayData&& other) {
- type = std::move(other.type);
- length = other.length;
- null_count = other.null_count;
- offset = other.offset;
- buffers = std::move(other.buffers);
- child_data = std::move(other.child_data);
- return *this;
- }
+ ArrayData& operator=(ArrayData&& other) = default;
+
+ // Copy assignment
+ ArrayData& operator=(const ArrayData& other) = default;
std::shared_ptr<ArrayData> Copy() const { return std::make_shared<ArrayData>(*this); }
@@ -197,6 +191,9 @@ struct ARROW_EXPORT ArrayData {
return GetMutableValues<T>(i, offset);
}
+ // Construct a zero-copy slice of the data with the indicated offset and length
+ ArrayData Slice(int64_t offset, int64_t length) const;
+
/// \brief Return null count, or compute and set it if it's not known
int64_t GetNullCount() const;
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 8f05912..9e9bd2e 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -227,4 +227,19 @@ Status AllocateEmptyBitmap(int64_t length, std::shared_ptr<Buffer>* out) {
return AllocateEmptyBitmap(default_memory_pool(), length, out);
}
+Status ConcatenateBuffers(const std::vector<std::shared_ptr<Buffer>>& buffers,
+ MemoryPool* pool, std::shared_ptr<Buffer>* out) {
+ int64_t out_length = 0;
+ for (const auto& buffer : buffers) {
+ out_length += buffer->size();
+ }
+ RETURN_NOT_OK(AllocateBuffer(pool, out_length, out));
+ auto out_data = (*out)->mutable_data();
+ for (const auto& buffer : buffers) {
+ std::memcpy(out_data, buffer->data(), buffer->size());
+ out_data += buffer->size();
+ }
+ return Status::OK();
+}
+
} // namespace arrow
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 306e677..1f1cd4b 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -198,6 +198,8 @@ class ARROW_EXPORT Buffer {
ARROW_DISALLOW_COPY_AND_ASSIGN(Buffer);
};
+using BufferVector = std::vector<std::shared_ptr<Buffer>>;
+
/// \defgroup buffer-slicing-functions Functions for slicing buffers
///
/// @{
@@ -402,6 +404,17 @@ Status AllocateEmptyBitmap(MemoryPool* pool, int64_t length,
ARROW_EXPORT
Status AllocateEmptyBitmap(int64_t length, std::shared_ptr<Buffer>* out);
+/// \brief Concatenate multiple buffers into a single buffer
+///
+/// \param[in] buffers to be concatenated
+/// \param[in] pool memory pool to allocate the new buffer from
+/// \param[out] out the concatenated buffer
+///
+/// \return Status
+ARROW_EXPORT
+Status ConcatenateBuffers(const BufferVector& buffers, MemoryPool* pool,
+ std::shared_ptr<Buffer>* out);
+
/// @}
} // namespace arrow
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index ca0b96e..8d463ca 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -29,6 +29,7 @@ arrow_install_all_headers("arrow/util")
add_arrow_test(bit-util-test)
add_arrow_test(checked-cast-test)
add_arrow_test(compression-test)
+add_arrow_test(concatenate-test)
add_arrow_test(decimal-test)
add_arrow_test(hashing-test)
add_arrow_test(int-util-test)
diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc
index 2e7bf2f..033267b 100644
--- a/cpp/src/arrow/util/bit-util.cc
+++ b/cpp/src/arrow/util/bit-util.cc
@@ -210,8 +210,12 @@ Status TransferBitmap(MemoryPool* pool, const uint8_t* data, int64_t offset,
}
void CopyBitmap(const uint8_t* data, int64_t offset, int64_t length, uint8_t* dest,
- int64_t dest_offset) {
- TransferBitmap<false, true>(data, offset, length, dest_offset, dest);
+ int64_t dest_offset, bool restore_trailing_bits) {
+ if (restore_trailing_bits) {
+ TransferBitmap<false, true>(data, offset, length, dest_offset, dest);
+ } else {
+ TransferBitmap<false, false>(data, offset, length, dest_offset, dest);
+ }
}
void InvertBitmap(const uint8_t* data, int64_t offset, int64_t length, uint8_t* dest,
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 22bf8fc..b7de112 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -708,11 +708,12 @@ Status CopyBitmap(MemoryPool* pool, const uint8_t* bitmap, int64_t offset, int64
/// \param[in] offset bit offset into the source data
/// \param[in] length number of bits to copy
/// \param[in] dest_offset bit offset into the destination
+/// \param[in] restore_trailing_bits don't clobber bits outside the destination range
/// \param[out] dest the destination buffer, must have at least space for
/// (offset + length) bits
ARROW_EXPORT
void CopyBitmap(const uint8_t* bitmap, int64_t offset, int64_t length, uint8_t* dest,
- int64_t dest_offset);
+ int64_t dest_offset, bool restore_trailing_bits = true);
/// Invert a bit range of an existing bitmap into an existing bitmap
///
diff --git a/cpp/src/arrow/util/concatenate-test.cc b/cpp/src/arrow/util/concatenate-test.cc
new file mode 100644
index 0000000..8d9e9d6
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate-test.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <limits>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/util/concatenate.h"
+
+namespace arrow {
+
+class ConcatenateTest : public ::testing::Test {
+ protected:
+ ConcatenateTest()
+ : rng_(seed_),
+ sizes_({0, 1, 2, 4, 16, 31, 1234}),
+ null_probabilities_({0.0, 0.1, 0.5, 0.9, 1.0}) {}
+
+ std::vector<int32_t> Offsets(int32_t length, int32_t slice_count) {
+ std::vector<int32_t> offsets(static_cast<std::size_t>(slice_count + 1));
+ std::default_random_engine gen(seed_);
+ std::uniform_int_distribution<int32_t> dist(0, length);
+ std::generate(offsets.begin(), offsets.end(), [&] { return dist(gen); });
+ std::sort(offsets.begin(), offsets.end());
+ return offsets;
+ }
+
+ ArrayVector Slices(const std::shared_ptr<Array>& array,
+ const std::vector<int32_t>& offsets) {
+ ArrayVector slices(offsets.size() - 1);
+ for (size_t i = 0; i != slices.size(); ++i) {
+ slices[i] = array->Slice(offsets[i], offsets[i + 1] - offsets[i]);
+ }
+ return slices;
+ }
+
+ template <typename PrimitiveType>
+ std::shared_ptr<Array> GeneratePrimitive(int64_t size, double null_probability) {
+ if (std::is_same<PrimitiveType, BooleanType>::value) {
+ return rng_.Boolean(size, 0.5, null_probability);
+ }
+ return rng_.Numeric<PrimitiveType, uint8_t>(size, 0, 127, null_probability);
+ }
+
+ void CheckTrailingBitsAreZeroed(const std::shared_ptr<Buffer>& bitmap, int64_t length) {
+ if (auto preceding_bits = BitUtil::kPrecedingBitmask[length % 8]) {
+ auto last_byte = bitmap->data()[length / 8];
+ ASSERT_EQ(static_cast<uint8_t>(last_byte & preceding_bits), last_byte)
+ << length << " " << int(preceding_bits);
+ }
+ }
+
+ template <typename ArrayFactory>
+ void Check(ArrayFactory&& factory) {
+ for (auto size : this->sizes_) {
+ auto offsets = this->Offsets(size, 3);
+ for (auto null_probability : this->null_probabilities_) {
+ std::shared_ptr<Array> array;
+ factory(size, null_probability, &array);
+ auto expected = array->Slice(offsets.front(), offsets.back() - offsets.front());
+ auto slices = this->Slices(array, offsets);
+ std::shared_ptr<Array> actual;
+ ASSERT_OK(Concatenate(slices, default_memory_pool(), &actual));
+ AssertArraysEqual(*expected, *actual);
+ if (actual->data()->buffers[0]) {
+ CheckTrailingBitsAreZeroed(actual->data()->buffers[0], actual->length());
+ }
+ if (actual->type_id() == Type::BOOL) {
+ CheckTrailingBitsAreZeroed(actual->data()->buffers[1], actual->length());
+ }
+ }
+ }
+ }
+
+ random::SeedType seed_ = 0xdeadbeef;
+ random::RandomArrayGenerator rng_;
+ std::vector<int32_t> sizes_;
+ std::vector<double> null_probabilities_;
+};
+
+template <typename PrimitiveType>
+class PrimitiveConcatenateTest : public ConcatenateTest {
+ public:
+};
+
+using PrimitiveTypes =
+ ::testing::Types<BooleanType, Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
+ UInt32Type, Int64Type, UInt64Type, FloatType, DoubleType>;
+TYPED_TEST_CASE(PrimitiveConcatenateTest, PrimitiveTypes);
+
+TYPED_TEST(PrimitiveConcatenateTest, Primitives) {
+ this->Check([this](int64_t size, double null_probability, std::shared_ptr<Array>* out) {
+ *out = this->template GeneratePrimitive<TypeParam>(size, null_probability);
+ });
+}
+
+TEST_F(ConcatenateTest, StringType) {
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto values_size = size * 4;
+ auto char_array = this->GeneratePrimitive<Int8Type>(values_size, null_probability);
+ std::shared_ptr<Buffer> offsets;
+ auto offsets_vector = this->Offsets(values_size, size);
+ // ensure the first offset is 0, which is expected for StringType
+ offsets_vector[0] = 0;
+ ASSERT_OK(CopyBufferFromVector(offsets_vector, default_memory_pool(), &offsets));
+ *out = MakeArray(ArrayData::Make(
+ utf8(), size,
+ {char_array->data()->buffers[0], offsets, char_array->data()->buffers[1]}));
+ });
+}
+
+TEST_F(ConcatenateTest, ListType) {
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto values_size = size * 4;
+ auto values = this->GeneratePrimitive<Int8Type>(values_size, null_probability);
+ auto offsets_vector = this->Offsets(values_size, size);
+ // ensure the first offset is 0, which is expected for ListType
+ offsets_vector[0] = 0;
+ std::shared_ptr<Array> offsets;
+ ArrayFromVector<Int32Type>(offsets_vector, &offsets);
+ ASSERT_OK(ListArray::FromArrays(*offsets, *values, default_memory_pool(), out));
+ });
+}
+
+TEST_F(ConcatenateTest, StructType) {
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+ auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+ auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+ *out = std::make_shared<StructArray>(
+ struct_({field("foo", int8()), field("bar", float64()), field("baz", boolean())}),
+ size, ArrayVector{foo, bar, baz});
+ });
+}
+
+TEST_F(ConcatenateTest, DictionaryType) {
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto indices = this->GeneratePrimitive<Int32Type>(size, null_probability);
+ auto type = dictionary(int32(), this->GeneratePrimitive<DoubleType>(128, 0));
+ *out = std::make_shared<DictionaryArray>(type, indices);
+ });
+}
+
+TEST_F(ConcatenateTest, DISABLED_UnionType) {
+ // sparse mode
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+ auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+ auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+ auto type_ids = rng_.Numeric<Int8Type>(size, 0, 2, null_probability);
+ ASSERT_OK(UnionArray::MakeSparse(*type_ids, {foo, bar, baz}, out));
+ });
+ // dense mode
+ Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+ auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+ auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+ auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+ auto type_ids = rng_.Numeric<Int8Type>(size, 0, 2, null_probability);
+ auto value_offsets = rng_.Numeric<Int32Type>(size, 0, size, 0);
+ ASSERT_OK(UnionArray::MakeDense(*type_ids, *value_offsets, {foo, bar, baz}, out));
+ });
+}
+
+TEST_F(ConcatenateTest, OffsetOverflow) {
+ auto fake_long = ArrayFromJSON(utf8(), "[\"\"]");
+ fake_long->data()->GetMutableValues<int32_t>(1)[1] =
+ std::numeric_limits<int32_t>::max();
+ std::shared_ptr<Array> concatenated;
+ // XX since the data fake_long claims to own isn't there, this will segfault if
+ // Concatenate doesn't detect overflow and raise an error.
+ ASSERT_RAISES(
+ Invalid, Concatenate({fake_long, fake_long}, default_memory_pool(), &concatenated));
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/util/concatenate.cc b/cpp/src/arrow/util/concatenate.cc
new file mode 100644
index 0000000..b982d27
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate.cc
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/concatenate.h"
+
+#include <algorithm>
+#include <limits>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+/// offset, length pair for representing a Range of a buffer or array
+struct Range {
+ int64_t offset, length;
+
+ Range() : offset(-1), length(0) {}
+ Range(int64_t o, int64_t l) : offset(o), length(l) {}
+};
+
+/// non-owning view into a range of bits
+struct Bitmap {
+ Bitmap() = default;
+ Bitmap(const uint8_t* d, Range r) : data(d), range(r) {}
+ explicit Bitmap(const std::shared_ptr<Buffer>& buffer, Range r)
+ : Bitmap(buffer ? buffer->data() : nullptr, r) {}
+
+ const uint8_t* data;
+ Range range;
+
+ bool AllSet() const { return data == nullptr; }
+};
+
+// Allocate a buffer and concatenate bitmaps into it.
+static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
+ std::shared_ptr<Buffer>* out) {
+ int64_t out_length = 0;
+ for (size_t i = 0; i < bitmaps.size(); ++i) {
+ out_length += bitmaps[i].range.length;
+ }
+ RETURN_NOT_OK(AllocateBitmap(pool, out_length, out));
+ uint8_t* dst = (*out)->mutable_data();
+
+ int64_t bitmap_offset = 0;
+ for (size_t i = 0; i < bitmaps.size(); ++i) {
+ auto bitmap = bitmaps[i];
+ if (bitmap.AllSet()) {
+ BitUtil::SetBitsTo(dst, bitmap_offset, bitmap.range.length, true);
+ } else {
+ internal::CopyBitmap(bitmap.data, bitmap.range.offset, bitmap.range.length, dst,
+ bitmap_offset, false);
+ }
+ bitmap_offset += bitmap.range.length;
+ }
+
+ // finally (if applicable) zero out any trailing bits
+ if (auto preceding_bits = BitUtil::kPrecedingBitmask[out_length % 8]) {
+ dst[out_length / 8] &= preceding_bits;
+ }
+ return Status::OK();
+}
+
+// Write offsets in src into dst, adjusting them such that first_offset
+// will be the first offset written.
+template <typename Offset>
+static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
+ Offset* dst, Range* values_range);
+
+// Concatenate buffers holding offsets into a single buffer of offsets,
+// also computing the ranges of values spanned by each buffer of offsets.
+template <typename Offset>
+static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
+ std::shared_ptr<Buffer>* out,
+ std::vector<Range>* values_ranges) {
+ values_ranges->resize(buffers.size());
+
+ // allocate output buffer
+ int64_t out_length = 0;
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ out_length += buffers[i]->size() / sizeof(Offset);
+ }
+ RETURN_NOT_OK(AllocateBuffer(pool, (out_length + 1) * sizeof(Offset), out));
+ auto dst = reinterpret_cast<Offset*>((*out)->mutable_data());
+
+ int64_t elements_length = 0;
+ Offset values_length = 0;
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ // the first offset from buffers[i] will be adjusted to values_length
+ // (the cumulative length of values spanned by offsets in previous buffers)
+ RETURN_NOT_OK(PutOffsets<Offset>(buffers[i], values_length, &dst[elements_length],
+ &values_ranges->at(i)));
+ elements_length += buffers[i]->size() / sizeof(Offset);
+ values_length += static_cast<Offset>(values_ranges->at(i).length);
+ }
+
+ // the final element in dst is the length of all values spanned by the offsets
+ dst[out_length] = values_length;
+ return Status::OK();
+}
+
+template <typename Offset>
+static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
+ Offset* dst, Range* values_range) {
+ // Get the range of offsets to transfer from src
+ auto src_begin = reinterpret_cast<const Offset*>(src->data());
+ auto src_end = reinterpret_cast<const Offset*>(src->data() + src->size());
+
+ // Compute the range of values which is spanned by this range of offsets
+ values_range->offset = src_begin[0];
+ values_range->length = *src_end - values_range->offset;
+ if (first_offset > std::numeric_limits<Offset>::max() - values_range->length) {
+ return Status::Invalid("offset overflow while concatenating arrays");
+ }
+
+ // Write offsets into dst, ensuring that the first offset written is
+ // first_offset
+ auto adjustment = first_offset - src_begin[0];
+ std::transform(src_begin, src_end, dst,
+ [adjustment](Offset offset) { return offset + adjustment; });
+ return Status::OK();
+}
+
+class ConcatenateImpl {
+ public:
+ ConcatenateImpl(const std::vector<ArrayData>& in, MemoryPool* pool)
+ : in_(in), pool_(pool) {
+ out_.type = in[0].type;
+ for (size_t i = 0; i < in_.size(); ++i) {
+ out_.length += in[i].length;
+ if (out_.null_count == kUnknownNullCount || in[i].null_count == kUnknownNullCount) {
+ out_.null_count = kUnknownNullCount;
+ continue;
+ }
+ out_.null_count += in[i].null_count;
+ }
+ out_.buffers.resize(in[0].buffers.size());
+ out_.child_data.resize(in[0].child_data.size());
+ for (auto& data : out_.child_data) {
+ data = std::make_shared<ArrayData>();
+ }
+ }
+
+ Status Concatenate(ArrayData* out) && {
+ if (out_.null_count != 0) {
+ RETURN_NOT_OK(ConcatenateBitmaps(Bitmaps(0), pool_, &out_.buffers[0]));
+ }
+ RETURN_NOT_OK(VisitTypeInline(*out_.type, this));
+ *out = std::move(out_);
+ return Status::OK();
+ }
+
+ Status Visit(const NullType&) { return Status::OK(); }
+
+ Status Visit(const BooleanType&) {
+ return ConcatenateBitmaps(Bitmaps(1), pool_, &out_.buffers[1]);
+ }
+
+ Status Visit(const FixedWidthType& fixed) {
+ // handles numbers, decimal128, fixed_size_binary
+ return ConcatenateBuffers(Buffers(1, fixed), pool_, &out_.buffers[1]);
+ }
+
+ Status Visit(const BinaryType&) {
+ std::vector<Range> value_ranges;
+ RETURN_NOT_OK(ConcatenateOffsets<int32_t>(Buffers(1, *offset_type), pool_,
+ &out_.buffers[1], &value_ranges));
+ return ConcatenateBuffers(Buffers(2, value_ranges), pool_, &out_.buffers[2]);
+ }
+
+ Status Visit(const ListType&) {
+ std::vector<Range> value_ranges;
+ RETURN_NOT_OK(ConcatenateOffsets<int32_t>(Buffers(1, *offset_type), pool_,
+ &out_.buffers[1], &value_ranges));
+ return ConcatenateImpl(ChildData(0, value_ranges), pool_)
+ .Concatenate(out_.child_data[0].get());
+ }
+
+ Status Visit(const StructType& s) {
+ for (int i = 0; i < s.num_children(); ++i) {
+ RETURN_NOT_OK(
+ ConcatenateImpl(ChildData(i), pool_).Concatenate(out_.child_data[i].get()));
+ }
+ return Status::OK();
+ }
+
+ Status Visit(const DictionaryType& d) {
+ auto fixed = internal::checked_cast<const FixedWidthType*>(d.index_type().get());
+ return ConcatenateBuffers(Buffers(1, *fixed), pool_, &out_.buffers[1]);
+ }
+
+ Status Visit(const UnionType& u) {
+ return Status::NotImplemented("concatenation of ", u);
+ }
+
+ Status Visit(const ExtensionType& e) {
+ // XXX can we just concatenate their storage?
+ return Status::NotImplemented("concatenation of ", e);
+ }
+
+ private:
+ // Gather the index-th buffer of each input into a vector.
+ // Bytes are sliced with that input's offset and length.
+ BufferVector Buffers(size_t index) {
+ BufferVector buffers(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ buffers[i] = SliceBuffer(in_[i].buffers[index], in_[i].offset, in_[i].length);
+ }
+ return buffers;
+ }
+
+ // Gather the index-th buffer of each input into a vector.
+ // Bytes are sliced with the explicitly passed ranges.
+ BufferVector Buffers(size_t index, const std::vector<Range>& ranges) {
+ DCHECK_EQ(in_.size(), ranges.size());
+ BufferVector buffers(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ buffers[i] = SliceBuffer(in_[i].buffers[index], ranges[i].offset, ranges[i].length);
+ }
+ return buffers;
+ }
+
+ // Gather the index-th buffer of each input into a vector.
+ // Buffers are assumed to contain elements of fixed.bit_width(),
+ // those elements are sliced with that input's offset and length.
+ BufferVector Buffers(size_t index, const FixedWidthType& fixed) {
+ DCHECK_EQ(fixed.bit_width() % 8, 0);
+ auto byte_width = fixed.bit_width() / 8;
+ BufferVector buffers(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ buffers[i] = SliceBuffer(in_[i].buffers[index], in_[i].offset * byte_width,
+ in_[i].length * byte_width);
+ }
+ return buffers;
+ }
+
+ // Gather the index-th buffer of each input as a Bitmap
+ // into a vector of Bitmaps.
+ std::vector<Bitmap> Bitmaps(size_t index) {
+ std::vector<Bitmap> bitmaps(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ Range range(in_[i].offset, in_[i].length);
+ bitmaps[i] = Bitmap(in_[i].buffers[index], range);
+ }
+ return bitmaps;
+ }
+
+ // Gather the index-th child_data of each input into a vector.
+ // Elements are sliced with that input's offset and length.
+ std::vector<ArrayData> ChildData(size_t index) {
+ std::vector<ArrayData> child_data(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ child_data[i] = in_[i].child_data[index]->Slice(in_[i].offset, in_[i].length);
+ }
+ return child_data;
+ }
+
+ // Gather the index-th child_data of each input into a vector.
+ // Elements are sliced with the explicitly passed ranges.
+ std::vector<ArrayData> ChildData(size_t index, const std::vector<Range>& ranges) {
+ DCHECK_EQ(in_.size(), ranges.size());
+ std::vector<ArrayData> child_data(in_.size());
+ for (size_t i = 0; i < in_.size(); ++i) {
+ child_data[i] = in_[i].child_data[index]->Slice(ranges[i].offset, ranges[i].length);
+ }
+ return child_data;
+ }
+
+ static const std::shared_ptr<FixedWidthType> offset_type;
+ const std::vector<ArrayData>& in_;
+ MemoryPool* pool_;
+ ArrayData out_;
+};
+
+const std::shared_ptr<FixedWidthType> ConcatenateImpl::offset_type =
+ std::static_pointer_cast<FixedWidthType>(int32());
+
+Status Concatenate(const ArrayVector& arrays, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
+ if (arrays.size() == 0) {
+ return Status::Invalid("Must pass at least one array");
+ }
+
+ // gather ArrayData of input arrays
+ std::vector<ArrayData> data(arrays.size());
+ for (size_t i = 0; i < arrays.size(); ++i) {
+ if (!arrays[i]->type()->Equals(*arrays[0]->type())) {
+ return Status::Invalid("arrays to be concatenated must be identically typed, but ",
+ *arrays[0]->type(), " and ", *arrays[i]->type(),
+ " were encountered.");
+ }
+ data[i] = ArrayData(*arrays[i]->data());
+ }
+
+ ArrayData out_data;
+ RETURN_NOT_OK(ConcatenateImpl(data, pool).Concatenate(&out_data));
+ *out = MakeArray(std::make_shared<ArrayData>(std::move(out_data)));
+ return Status::OK();
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/util/concatenate.h b/cpp/src/arrow/util/concatenate.h
new file mode 100644
index 0000000..67738d5
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+/// \brief Concatenate arrays
+///
+/// \param[in] arrays a vector of arrays to be concatenated
+/// \param[in] pool memory to store the result will be allocated from this memory pool
+/// \param[out] out the resulting concatenated array
+/// \return Status
+ARROW_EXPORT
+Status Concatenate(const ArrayVector& arrays, MemoryPool* pool,
+ std::shared_ptr<Array>* out);
+
+} // namespace arrow