You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/19 01:12:12 UTC

arrow git commit: ARROW-809: [C++] Do not write excess bytes in IPC writer after slicing arrays

Repository: arrow
Updated Branches:
  refs/heads/master 4baaa88c3 -> a94c03a02


ARROW-809: [C++] Do not write excess bytes in IPC writer after slicing arrays

cc @itaiin

Author: Wes McKinney <we...@twosigma.com>

Closes #555 from wesm/ARROW-809 and squashes the following commits:

318c748 [Wes McKinney] Fix compiler warning
7fd6410 [Wes McKinney] Add sparse union test
290a300 [Wes McKinney] clang-format
1d14aa8 [Wes McKinney] Buffer truncation for unions
51f450f [Wes McKinney] Fix struct
7da5cac [Wes McKinney] Add List test and fix implementation. Fix list comparison bug for sliced arrays
9da3936 [Wes McKinney] Refactor to construct explicit non-nullable arrays
33eaa53 [Wes McKinney] Sliced array buffer truncation for fixed size types, string/binary


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a94c03a0
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a94c03a0
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a94c03a0

Branch: refs/heads/master
Commit: a94c03a02f1da8fa61ac86ba2d6c5e91d29c5767
Parents: 4baaa88
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Apr 18 21:12:06 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Apr 18 21:12:06 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                       |   5 +-
 cpp/src/arrow/compare.cc                 |  10 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc |  69 +++++++++++
 cpp/src/arrow/ipc/test-common.h          |  50 +++++---
 cpp/src/arrow/ipc/writer.cc              | 158 +++++++++++++++++---------
 cpp/src/arrow/pretty_print.cc            |   6 +-
 cpp/src/arrow/python/util/datetime.h     |   2 +-
 cpp/src/arrow/table.cc                   |   4 +-
 cpp/src/arrow/table.h                    |   4 +-
 9 files changed, 216 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 5d8a0f6..f702da1 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -839,8 +839,9 @@ if (${CLANG_FORMAT_FOUND})
     `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h |
     sed -e '/_generated/g' |
     sed -e '/windows_compatibility.h/g' |
-    sed -e '/config.h/g' |
-    sed -e '/platform.h/g'`)
+    sed -e '/config.h/g' |   # python/config.h
+    sed -e '/platform.h/g'`  # python/platform.h
+    )
 
   # runs clang format and exits with a non-zero exit code if any files need to be reformatted
   add_custom_target(check-format ${BUILD_SUPPORT_DIR}/run-clang-format.sh ${CMAKE_CURRENT_SOURCE_DIR} ${CLANG_FORMAT_BIN} 0

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index e02f3f0..ccb299e 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -460,14 +460,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
       return Status::OK();
     }
 
-    if (left.offset() == 0 && right.offset() == 0) {
-      result_ = left.values()->Equals(right.values());
-    } else {
-      // One of the arrays is sliced
-      result_ = left.values()->RangeEquals(left.value_offset(0),
-          left.value_offset(left.length()), right.value_offset(0), right.values());
-    }
-
+    result_ = left.values()->RangeEquals(left.value_offset(0),
+        left.value_offset(left.length()), right.value_offset(0), right.values());
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index cfba0d0..b39136e 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -270,6 +270,75 @@ TEST_P(TestIpcRoundTrip, ZeroLengthArrays) {
   CheckRoundtrip(bin_array2, 1 << 20);
 }
 
+TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
+  auto CheckArray = [this](const std::shared_ptr<Array>& array) {
+    auto f0 = field("f0", array->type());
+    auto schema = std::shared_ptr<Schema>(new Schema({f0}));
+    RecordBatch batch(schema, array->length(), {array});
+    auto sliced_batch = batch.Slice(0, 5);
+
+    int64_t full_size;
+    int64_t sliced_size;
+
+    ASSERT_OK(GetRecordBatchSize(batch, &full_size));
+    ASSERT_OK(GetRecordBatchSize(*sliced_batch, &sliced_size));
+    ASSERT_TRUE(sliced_size < full_size) << sliced_size << " " << full_size;
+
+    // make sure we can write and read it
+    this->CheckRoundtrip(*sliced_batch, 1 << 20);
+  };
+
+  std::shared_ptr<Array> a0, a1;
+  auto pool = default_memory_pool();
+
+  // Integer
+  ASSERT_OK(MakeRandomInt32Array(500, false, pool, &a0));
+  CheckArray(a0);
+
+  // String / Binary
+  {
+    auto s = MakeRandomBinaryArray<StringBuilder, char>(500, false, pool, &a0);
+    ASSERT_TRUE(s.ok());
+  }
+  CheckArray(a0);
+
+  // Boolean
+  ASSERT_OK(MakeRandomBooleanArray(10000, false, &a0));
+  CheckArray(a0);
+
+  // List
+  ASSERT_OK(MakeRandomInt32Array(500, false, pool, &a0));
+  ASSERT_OK(MakeRandomListArray(a0, 200, false, pool, &a1));
+  CheckArray(a1);
+
+  // Struct
+  auto struct_type = struct_({field("f0", a0->type())});
+  std::vector<std::shared_ptr<Array>> struct_children = {a0};
+  a1 = std::make_shared<StructArray>(struct_type, a0->length(), struct_children);
+  CheckArray(a1);
+
+  // Sparse Union
+  auto union_type = union_({field("f0", a0->type())}, {0});
+  std::vector<int32_t> type_ids(a0->length());
+  std::shared_ptr<Buffer> ids_buffer;
+  ASSERT_OK(test::CopyBufferFromVector(type_ids, &ids_buffer));
+  a1 = std::make_shared<UnionArray>(
+      union_type, a0->length(), struct_children, ids_buffer);
+  CheckArray(a1);
+
+  // Dense union
+  auto dense_union_type = union_({field("f0", a0->type())}, {0}, UnionMode::DENSE);
+  std::vector<int32_t> type_offsets;
+  for (int32_t i = 0; i < a0->length(); ++i) {
+    type_offsets.push_back(i);
+  }
+  std::shared_ptr<Buffer> offsets_buffer;
+  ASSERT_OK(test::CopyBufferFromVector(type_offsets, &offsets_buffer));
+  a1 = std::make_shared<UnionArray>(
+      dense_union_type, a0->length(), struct_children, ids_buffer, offsets_buffer);
+  CheckArray(a1);
+}
+
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
   ipc::MockOutputStream mock;
   int32_t mock_metadata_length = -1;

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index a17b609..c8ca21c 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -138,31 +138,41 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
 
 typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out);
 
-Status MakeBooleanBatch(std::shared_ptr<RecordBatch>* out) {
-  const int length = 1000;
+Status MakeRandomBooleanArray(
+    const int length, bool include_nulls, std::shared_ptr<Array>* out) {
+  std::vector<uint8_t> values(length);
+  test::random_null_bytes(length, 0.5, values.data());
+  auto data = test::bytes_to_null_buffer(values);
 
+  if (include_nulls) {
+    std::vector<uint8_t> valid_bytes(length);
+    auto null_bitmap = test::bytes_to_null_buffer(valid_bytes);
+    test::random_null_bytes(length, 0.1, valid_bytes.data());
+    *out = std::make_shared<BooleanArray>(length, data, null_bitmap, -1);
+  } else {
+    *out = std::make_shared<BooleanArray>(length, data, nullptr, 0);
+  }
+  return Status::OK();
+}
+
+Status MakeBooleanBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = field("f0", boolean());
   auto f1 = field("f1", boolean());
   std::shared_ptr<Schema> schema(new Schema({f0, f1}));
 
-  std::vector<uint8_t> values(length);
-  std::vector<uint8_t> valid_bytes(length);
-  test::random_null_bytes(length, 0.5, values.data());
-  test::random_null_bytes(length, 0.1, valid_bytes.data());
-
-  auto data = test::bytes_to_null_buffer(values);
-  auto null_bitmap = test::bytes_to_null_buffer(valid_bytes);
-
-  auto a0 = std::make_shared<BooleanArray>(length, data, null_bitmap, -1);
-  auto a1 = std::make_shared<BooleanArray>(length, data, nullptr, 0);
+  std::shared_ptr<Array> a0, a1;
+  RETURN_NOT_OK(MakeRandomBooleanArray(length, true, &a0));
+  RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &a1));
   out->reset(new RecordBatch(schema, length, {a0, a1}));
   return Status::OK();
 }
 
-Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
-  const int length = 10;
+Status MakeBooleanBatch(std::shared_ptr<RecordBatch>* out) {
+  return MakeBooleanBatchSized(1000, out);
+}
 
+Status MakeIntBatchSized(int length, std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = field("f0", int32());
   auto f1 = field("f1", int32());
@@ -177,16 +187,20 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
+Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  return MakeIntBatchSized(10, out);
+}
+
 template <class Builder, class RawType>
 Status MakeRandomBinaryArray(
-    int64_t length, MemoryPool* pool, std::shared_ptr<Array>* out) {
+    int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) {
   const std::vector<std::string> values = {
       "", "", "abc", "123", "efg", "456!@#!@#", "12312"};
   Builder builder(pool);
   const size_t values_len = values.size();
   for (int64_t i = 0; i < length; ++i) {
     int64_t values_index = i % values_len;
-    if (values_index == 0) {
+    if (include_nulls && values_index == 0) {
       RETURN_NOT_OK(builder.AppendNull());
     } else {
       const std::string& value = values[values_index];
@@ -210,12 +224,12 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
 
   // Quirk with RETURN_NOT_OK macro and templated functions
   {
-    auto s = MakeRandomBinaryArray<StringBuilder, char>(length, pool, &a0);
+    auto s = MakeRandomBinaryArray<StringBuilder, char>(length, true, pool, &a0);
     RETURN_NOT_OK(s);
   }
 
   {
-    auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, pool, &a1);
+    auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, true, pool, &a1);
     RETURN_NOT_OK(s);
   }
   out->reset(new RecordBatch(schema, length, {a0, a1}));

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 8ba00a6..61caf64 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -45,6 +45,49 @@ namespace ipc {
 // ----------------------------------------------------------------------
 // Record batch write path
 
+static inline Status GetTruncatedBitmap(int64_t offset, int64_t length,
+    const std::shared_ptr<Buffer> input, MemoryPool* pool,
+    std::shared_ptr<Buffer>* buffer) {
+  if (!input) {
+    *buffer = input;
+    return Status::OK();
+  }
+  int64_t min_length = PaddedLength(BitUtil::BytesForBits(length));
+  if (offset != 0 || min_length < input->size()) {
+    // With a sliced array / non-zero offset, we must copy the bitmap
+    RETURN_NOT_OK(CopyBitmap(pool, input->data(), offset, length, buffer));
+  } else {
+    *buffer = input;
+  }
+  return Status::OK();
+}
+
+template <typename T>
+inline Status GetTruncatedBuffer(int64_t offset, int64_t length,
+    const std::shared_ptr<Buffer> input, MemoryPool* pool,
+    std::shared_ptr<Buffer>* buffer) {
+  if (!input) {
+    *buffer = input;
+    return Status::OK();
+  }
+  int32_t byte_width = static_cast<int32_t>(sizeof(T));
+  int64_t padded_length = PaddedLength(length * byte_width);
+  if (offset != 0 || padded_length < input->size()) {
+    *buffer =
+        SliceBuffer(input, offset * byte_width, std::min(padded_length, input->size()));
+  } else {
+    *buffer = input;
+  }
+  return Status::OK();
+}
+
+static inline bool NeedTruncate(
+    int64_t offset, const Buffer* buffer, int64_t min_length) {
+  // buffer can be NULL
+  if (buffer == nullptr) { return false; }
+  return offset != 0 || min_length < buffer->size();
+}
+
 class RecordBatchWriter : public ArrayVisitor {
  public:
   RecordBatchWriter(MemoryPool* pool, int64_t buffer_start_offset,
@@ -71,14 +114,9 @@ class RecordBatchWriter : public ArrayVisitor {
     field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
 
     if (arr.null_count() > 0) {
-      std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
-
-      if (arr.offset() != 0) {
-        // With a sliced array / non-zero offset, we must copy the bitmap
-        RETURN_NOT_OK(
-            CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap));
-      }
-
+      std::shared_ptr<Buffer> bitmap;
+      RETURN_NOT_OK(GetTruncatedBitmap(
+          arr.offset(), arr.length(), arr.null_bitmap(), pool_, &bitmap));
       buffers_.push_back(bitmap);
     } else {
       // Push a dummy zero-length buffer, not to be copied
@@ -195,21 +233,23 @@ class RecordBatchWriter : public ArrayVisitor {
  protected:
   template <typename ArrayType>
   Status VisitFixedWidth(const ArrayType& array) {
-    std::shared_ptr<Buffer> data_buffer = array.data();
+    std::shared_ptr<Buffer> data = array.data();
 
-    if (array.offset() != 0) {
+    const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
+    const int64_t type_width = fw_type.bit_width() / 8;
+    int64_t min_length = PaddedLength(array.length() * type_width);
+
+    if (NeedTruncate(array.offset(), data.get(), min_length)) {
       // Non-zero offset, slice the buffer
-      const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
-      const int type_width = fw_type.bit_width() / 8;
       const int64_t byte_offset = array.offset() * type_width;
 
       // Send padding if it's available
       const int64_t buffer_length =
           std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
-              data_buffer->size() - byte_offset);
-      data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
+              data->size() - byte_offset);
+      data = SliceBuffer(data, byte_offset, buffer_length);
     }
-    buffers_.push_back(data_buffer);
+    buffers_.push_back(data);
     return Status::OK();
   }
 
@@ -249,9 +289,16 @@ class RecordBatchWriter : public ArrayVisitor {
     RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
     auto data = array.data();
 
-    if (array.offset() != 0) {
+    int64_t total_data_bytes = 0;
+    if (value_offsets) {
+      total_data_bytes = array.value_offset(array.length()) - array.value_offset(0);
+    }
+    if (NeedTruncate(array.offset(), data.get(), total_data_bytes)) {
       // Slice the data buffer to include only the range we need now
-      data = SliceBuffer(data, array.value_offset(0), array.value_offset(array.length()));
+      const int64_t start_offset = array.value_offset(0);
+      const int64_t slice_length =
+          std::min(PaddedLength(total_data_bytes), data->size() - start_offset);
+      data = SliceBuffer(data, start_offset, slice_length);
     }
 
     buffers_.push_back(value_offsets);
@@ -259,24 +306,11 @@ class RecordBatchWriter : public ArrayVisitor {
     return Status::OK();
   }
 
-  Status Visit(const FixedSizeBinaryArray& array) override {
-    auto data = array.data();
-    int32_t width = array.byte_width();
-
-    if (data && array.offset() != 0) {
-      data = SliceBuffer(data, array.offset() * width, width * array.length());
-    }
-    buffers_.push_back(data);
-    return Status::OK();
-  }
-
   Status Visit(const BooleanArray& array) override {
-    std::shared_ptr<Buffer> bits = array.data();
-    if (array.offset() != 0) {
-      RETURN_NOT_OK(
-          CopyBitmap(pool_, bits->data(), array.offset(), array.length(), &bits));
-    }
-    buffers_.push_back(bits);
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(
+        GetTruncatedBitmap(array.offset(), array.length(), array.data(), pool_, &data));
+    buffers_.push_back(data);
     return Status::OK();
   }
 
@@ -299,6 +333,7 @@ class RecordBatchWriter : public ArrayVisitor {
   VISIT_FIXED_WIDTH(TimestampArray);
   VISIT_FIXED_WIDTH(Time32Array);
   VISIT_FIXED_WIDTH(Time64Array);
+  VISIT_FIXED_WIDTH(FixedSizeBinaryArray);
 
 #undef VISIT_FIXED_WIDTH
 
@@ -314,11 +349,16 @@ class RecordBatchWriter : public ArrayVisitor {
     --max_recursion_depth_;
     std::shared_ptr<Array> values = array.values();
 
-    if (array.offset() != 0) {
-      // For non-zero offset, we slice the values array accordingly
-      const int32_t offset = array.value_offset(0);
-      const int32_t length = array.value_offset(array.length()) - offset;
-      values = values->Slice(offset, length);
+    int32_t values_offset = 0;
+    int32_t values_length = 0;
+    if (value_offsets) {
+      values_offset = array.value_offset(0);
+      values_length = array.value_offset(array.length()) - values_offset;
+    }
+
+    if (array.offset() != 0 || values_length < values->length()) {
+      // Must also slice the values
+      values = values->Slice(values_offset, values_length);
     }
     RETURN_NOT_OK(VisitArray(*values));
     ++max_recursion_depth_;
@@ -328,7 +368,7 @@ class RecordBatchWriter : public ArrayVisitor {
   Status Visit(const StructArray& array) override {
     --max_recursion_depth_;
     for (std::shared_ptr<Array> field : array.fields()) {
-      if (array.offset() != 0) {
+      if (array.offset() != 0 || array.length() < field->length()) {
         // If offset is non-zero, slice the child array
         field = field->Slice(array.offset(), array.length());
       }
@@ -339,18 +379,21 @@ class RecordBatchWriter : public ArrayVisitor {
   }
 
   Status Visit(const UnionArray& array) override {
-    auto type_ids = array.type_ids();
-    if (array.offset() != 0) {
-      type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
-          array.length() * sizeof(UnionArray::type_id_t));
-    }
+    const int64_t offset = array.offset();
+    const int64_t length = array.length();
 
+    std::shared_ptr<Buffer> type_ids;
+    RETURN_NOT_OK(GetTruncatedBuffer<UnionArray::type_id_t>(
+        offset, length, array.type_ids(), pool_, &type_ids));
     buffers_.push_back(type_ids);
 
     --max_recursion_depth_;
     if (array.mode() == UnionMode::DENSE) {
       const auto& type = static_cast<const UnionType&>(*array.type());
-      auto value_offsets = array.value_offsets();
+
+      std::shared_ptr<Buffer> value_offsets;
+      RETURN_NOT_OK(GetTruncatedBuffer<int32_t>(
+          offset, length, array.value_offsets(), pool_, &value_offsets));
 
       // The Union type codes are not necessary 0-indexed
       uint8_t max_code = 0;
@@ -363,7 +406,7 @@ class RecordBatchWriter : public ArrayVisitor {
       std::vector<int32_t> child_offsets(max_code + 1);
       std::vector<int32_t> child_lengths(max_code + 1, 0);
 
-      if (array.offset() != 0) {
+      if (offset != 0) {
         // This is an unpleasant case. Because the offsets are different for
         // each child array, when we have a sliced array, we need to "rebase"
         // the value_offsets for each array
@@ -373,12 +416,12 @@ class RecordBatchWriter : public ArrayVisitor {
 
         // Allocate the shifted offsets
         std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
-        RETURN_NOT_OK(AllocateBuffer(
-            pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
+        RETURN_NOT_OK(
+            AllocateBuffer(pool_, length * sizeof(int32_t), &shifted_offsets_buffer));
         int32_t* shifted_offsets =
             reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
 
-        for (int64_t i = 0; i < array.length(); ++i) {
+        for (int64_t i = 0; i < length; ++i) {
           const uint8_t code = type_ids[i];
           int32_t shift = child_offsets[code];
           if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
@@ -395,18 +438,23 @@ class RecordBatchWriter : public ArrayVisitor {
       // Visit children and slice accordingly
       for (int i = 0; i < type.num_children(); ++i) {
         std::shared_ptr<Array> child = array.child(i);
-        if (array.offset() != 0) {
-          const uint8_t code = type.type_codes()[i];
-          child = child->Slice(child_offsets[code], child_lengths[code]);
+
+        // TODO: ARROW-809, for sliced unions, tricky to know how much to
+        // truncate the children. For now, we are truncating the children to be
+        // no longer than the parent union
+        const uint8_t code = type.type_codes()[i];
+        const int64_t child_length = child_lengths[code];
+        if (offset != 0 || length < child_length) {
+          child = child->Slice(child_offsets[code], std::min(length, child_length));
         }
         RETURN_NOT_OK(VisitArray(*child));
       }
     } else {
       for (std::shared_ptr<Array> child : array.children()) {
         // Sparse union, slicing is simpler
-        if (array.offset() != 0) {
+        if (offset != 0 || length < child->length()) {
           // If offset is non-zero, slice the child array
-          child = child->Slice(array.offset(), array.length());
+          child = child->Slice(offset, length);
         }
         RETURN_NOT_OK(VisitArray(*child));
       }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 0f46f03..1f4bfa9 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -162,10 +162,8 @@ class ArrayPrinter {
 
     Newline();
     Write("-- values: ");
-    auto values = array.values();
-    if (array.offset() != 0) {
-      values = values->Slice(array.value_offset(0), array.value_offset(array.length()));
-    }
+    auto values =
+        array.values()->Slice(array.value_offset(0), array.value_offset(array.length()));
     RETURN_NOT_OK(PrettyPrint(*values, indent_ + 2, sink_));
 
     return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/python/util/datetime.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/util/datetime.h b/cpp/src/arrow/python/util/datetime.h
index bd80d9f..ad0ee0f 100644
--- a/cpp/src/arrow/python/util/datetime.h
+++ b/cpp/src/arrow/python/util/datetime.h
@@ -32,8 +32,8 @@ static inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) {
   struct tm epoch = {0};
   epoch.tm_year = 70;
   epoch.tm_mday = 1;
-  // Milliseconds since the epoch
 #ifdef _MSC_VER
+  // Milliseconds since the epoch
   const int64_t current_timestamp = static_cast<int64_t>(_mkgmtime64(&date));
   const int64_t epoch_timestamp = static_cast<int64_t>(_mkgmtime64(&epoch));
   return (current_timestamp - epoch_timestamp) * 1000LL;

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index eabd98b..db17da7 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -180,11 +180,11 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
   return true;
 }
 
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) {
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
   return Slice(offset, this->num_rows() - offset);
 }
 
-std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) {
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const {
   std::vector<std::shared_ptr<Array>> arrays;
   arrays.reserve(num_columns());
   for (const auto& field : columns_) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94c03a0/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index cfd1f36..efc2077 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -137,8 +137,8 @@ class ARROW_EXPORT RecordBatch {
   int64_t num_rows() const { return num_rows_; }
 
   /// Slice each of the arrays in the record batch and construct a new RecordBatch object
-  std::shared_ptr<RecordBatch> Slice(int64_t offset);
-  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length);
+  std::shared_ptr<RecordBatch> Slice(int64_t offset) const;
+  std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const;
 
   /// Returns error status is there is something wrong with the record batch
   /// contents, like a schema/array mismatch or inconsistent lengths