You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/30 15:21:25 UTC

[GitHub] [arrow] nirandaperera commented on a change in pull request #10802: ARROW-1568: [C++] Implement Drop Null Kernel for Arrays

nirandaperera commented on a change in pull request #10802:
URL: https://github.com/apache/arrow/pull/10802#discussion_r680002827



##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,144 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+// Returns the positions of all non-null slots of `column`, in ascending order.
+// Int64 indices are used so positions beyond INT32_MAX are not truncated; the
+// result is only consumed as `Take` indices, which accept any integer type.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<Array>& column, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(column->length() - column->null_count()));
+  for (int64_t i = 0; i < column->length(); ++i) {
+    if (column->IsValid(i)) {
+      builder.UnsafeAppend(i);
+    }
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+// Returns the positions of all non-null slots of `chunks`, in ascending order.
+// Positions are relative to the start of the whole chunked array, not of each
+// chunk. Int64 indices avoid truncating the running offset past INT32_MAX.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<ChunkedArray>& chunks, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(chunks->length() - chunks->null_count()));
+  int64_t chunk_offset = 0;
+  for (const auto& chunk : chunks->chunks()) {
+    for (int64_t i = 0; i < chunk->length(); ++i) {
+      if (chunk->IsValid(i)) {
+        builder.UnsafeAppend(chunk_offset + i);
+      }
+    }
+    chunk_offset += chunk->length();
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,

Review comment:
       I'm a little confused about the semantics of `DropNull` for record batches. What would happen in the following case?
   
   | A | B    | C    |
   |---|------|------|
   | 1 | 4    | null |
   | 2 | null | 6    |
   | 3 | 5    | null |
   

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,144 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<Array>& column, MemoryPool* memory_pool) {

Review comment:
       I feel like it would be nice to have the following signature
   ```c++
   Status GetNotNullIndices( const std::shared_ptr<Array>& column, MemoryPool* memory_pool, std::shared_ptr<arrow::Array>* out_array);
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,144 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+// Returns the positions of all non-null slots of `column`, in ascending order.
+// Int64 indices are used so positions beyond INT32_MAX are not truncated; the
+// result is only consumed as `Take` indices, which accept any integer type.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<Array>& column, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(column->length() - column->null_count()));
+  for (int64_t i = 0; i < column->length(); ++i) {
+    if (column->IsValid(i)) {
+      builder.UnsafeAppend(i);
+    }
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+// Returns the positions of all non-null slots of `chunks`, in ascending order.
+// Positions are relative to the start of the whole chunked array, not of each
+// chunk. Int64 indices avoid truncating the running offset past INT32_MAX.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<ChunkedArray>& chunks, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(chunks->length() - chunks->null_count()));
+  int64_t chunk_offset = 0;
+  for (const auto& chunk : chunks->chunks()) {
+    for (int64_t i = 0; i < chunk->length(); ++i) {
+      if (chunk->IsValid(i)) {
+        builder.UnsafeAppend(chunk_offset + i);
+      }
+    }
+    chunk_offset += chunk->length();
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,

Review comment:
       I have a feeling that we need to combine all the validity buffers (a row is kept only if it is valid in every column, i.e. a bitwise AND of the bitmaps) and then call `Take` based on that?

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,144 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+// Returns the positions of all non-null slots of `column`, in ascending order.
+// Int64 indices are used so positions beyond INT32_MAX are not truncated; the
+// result is only consumed as `Take` indices, which accept any integer type.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<Array>& column, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(column->length() - column->null_count()));
+  for (int64_t i = 0; i < column->length(); ++i) {
+    if (column->IsValid(i)) {
+      builder.UnsafeAppend(i);
+    }
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+// Returns the positions of all non-null slots of `chunks`, in ascending order.
+// Positions are relative to the start of the whole chunked array, not of each
+// chunk. Int64 indices avoid truncating the running offset past INT32_MAX.
+Result<std::shared_ptr<arrow::Array>> GetNotNullIndices(
+    const std::shared_ptr<ChunkedArray>& chunks, MemoryPool* memory_pool) {
+  arrow::NumericBuilder<arrow::Int64Type> builder(memory_pool);
+  // UnsafeAppend below relies on this capacity, so a failed Reserve must abort.
+  RETURN_NOT_OK(builder.Reserve(chunks->length() - chunks->null_count()));
+  int64_t chunk_offset = 0;
+  for (const auto& chunk : chunks->chunks()) {
+    for (int64_t i = 0; i < chunk->length(); ++i) {
+      if (chunk->IsValid(i)) {
+        builder.UnsafeAppend(chunk_offset + i);
+      }
+    }
+    chunk_offset += chunk->length();
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return indices;
+}
+
+// Returns a copy of `batch` without every row that is null in at least one
+// column. All columns are gathered with the same row indices, so the output
+// columns keep equal lengths. Filtering each column by its own nulls instead
+// would yield ragged columns and an invalid RecordBatch.
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,
+                                                         ExecContext* ctx) {
+  const int64_t num_rows = batch.num_rows();
+
+  // Flag rows containing a null in any column and count them once each.
+  std::vector<uint8_t> row_has_null(static_cast<size_t>(num_rows), 0);
+  int64_t num_null_rows = 0;
+  for (int col = 0; col < batch.num_columns(); ++col) {
+    const std::shared_ptr<Array> column = batch.column(col);
+    if (column->null_count() == 0) {
+      continue;  // nothing to flag in a fully valid column
+    }
+    for (int64_t row = 0; row < num_rows; ++row) {
+      if (!row_has_null[row] && !column->IsValid(row)) {
+        row_has_null[row] = 1;
+        ++num_null_rows;
+      }
+    }
+  }
+
+  // Build the (in-bounds, ascending) indices of the rows to keep.
+  arrow::NumericBuilder<arrow::Int64Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(builder.Reserve(num_rows - num_null_rows));
+  for (int64_t row = 0; row < num_rows; ++row) {
+    if (!row_has_null[row]) {
+      builder.UnsafeAppend(row);
+    }
+  }
+  std::shared_ptr<Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+
+  std::vector<std::shared_ptr<Array>> columns(batch.num_columns());
+  for (int i = 0; i < batch.num_columns(); ++i) {
+    // Indices were generated in [0, num_rows), so bounds checking is redundant.
+    ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices),
+                                          TakeOptions::NoBoundsCheck(), ctx));
+    columns[i] = out.make_array();
+  }
+  return RecordBatch::Make(batch.schema(), indices->length(), std::move(columns));
+}
+
+// Returns a copy of `table` without every row that is null in at least one
+// column. An empty table is returned as-is (with zero rows).
+Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) {
+  const int64_t num_rows = table.num_rows();
+  if (num_rows == 0) {
+    return Table::Make(table.schema(), table.columns(), 0);
+  }
+
+  // Flag rows containing a null in any column. Each column's chunks are walked
+  // independently with a running row offset, so no rechunking is needed, and a
+  // flat mask avoids both the O(n log n) cost and the int32 key truncation of a
+  // std::set. This also works for a table with zero columns.
+  std::vector<uint8_t> row_has_null(static_cast<size_t>(num_rows), 0);
+  int64_t num_null_rows = 0;
+  for (int col = 0; col < table.num_columns(); ++col) {
+    // Hold the column so the chunk vector we iterate stays alive.
+    const std::shared_ptr<ChunkedArray> column = table.column(col);
+    int64_t offset = 0;
+    for (const auto& chunk : column->chunks()) {
+      const int64_t chunk_length = chunk->length();
+      if (chunk->null_count() > 0) {
+        for (int64_t i = 0; i < chunk_length; ++i) {
+          if (!chunk->IsValid(i) && !row_has_null[offset + i]) {
+            row_has_null[offset + i] = 1;
+            ++num_null_rows;
+          }
+        }
+      }
+      offset += chunk_length;
+    }
+  }
+
+  // Gather the indices of the rows to keep; Int64 avoids truncating positions.
+  arrow::NumericBuilder<arrow::Int64Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(builder.Reserve(num_rows - num_null_rows));
+  for (int64_t row = 0; row < num_rows; ++row) {
+    if (!row_has_null[row]) {
+      builder.UnsafeAppend(row);
+    }
+  }
+  std::shared_ptr<arrow::Array> indices;
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeTA(table, *indices, TakeOptions::Defaults(), ctx);
+}
+
+// FunctionDoc for DropNull: a one-line summary, a longer description, and the
+// name of its single argument.
+const FunctionDoc dropnull_doc{
+    "DropNull kernel",
+    "The output is populated with values from the input without the null values",
+    {"input"}};
+
+class DropNullMetaFunction : public MetaFunction {

Review comment:
       There's an alternative to using `Take` (at least for arrays): we can promote the array's validity buffer to a BooleanArray and call `Filter`. That way, IMO, we don't have to create the array of valid indices at all.

##########
File path: cpp/src/arrow/compute/kernels/vector_selection_test.cc
##########
@@ -1718,5 +1718,240 @@ TEST(TestTake, RandomFixedSizeBinary) {
   TakeRandomTest<FixedSizeBinaryType>::Test(fixed_size_binary(16));
 }
 
+// ----------------------------------------------------------------------
+// DropNull tests
+
+// Runs DropNull on `values`, validates the output and checks it equals `expected`.
+void AssertDropNullArrays(const std::shared_ptr<Array>& values,
+                          const std::shared_ptr<Array>& expected) {
+  std::shared_ptr<Array> dropped;
+  ASSERT_OK_AND_ASSIGN(dropped, DropNull(*values));
+  ValidateOutput(dropped);
+  AssertArraysEqual(*expected, *dropped, /*verbose=*/true);
+}
+
+// Parses `values` as a JSON array of `type`, applies DropNull and stores the
+// output in `*out`; returns the error status if DropNull fails.
+Status DropNullJSON(const std::shared_ptr<DataType>& type, const std::string& values,
+                    std::shared_ptr<Array>* out) {
+  const std::shared_ptr<Array> input = ArrayFromJSON(type, values);
+  return DropNull(*input).Value(out);
+}
+
+// Parses `values` and `expected` as JSON arrays of `type`, runs DropNull on the
+// former, validates the output and compares it against the latter.
+void CheckDropNull(const std::shared_ptr<DataType>& type, const std::string& values,
+                   const std::string& expected) {
+  std::shared_ptr<Array> result;
+  ASSERT_OK(DropNullJSON(type, values, &result));
+  ValidateOutput(result);
+  const auto expected_array = ArrayFromJSON(type, expected);
+  AssertArraysEqual(*expected_array, *result, /*verbose=*/true);
+}
+
+struct TestDropNullKernel : public ::testing::Test {
+  // Checks that DropNull gives the same result when the input's validity bitmap
+  // is removed and its null count is marked unknown. `values` must have no nulls.
+  void TestNoValidityBitmapButUnknownNullCount(const std::shared_ptr<Array>& values) {
+    ASSERT_EQ(values->null_count(), 0);
+    // ASSERT_OK_AND_ASSIGN reports a failure instead of aborting the test binary
+    // the way dereferencing an errored Result would.
+    ASSERT_OK_AND_ASSIGN(auto expected_datum, DropNull(values));
+    auto expected = expected_datum.make_array();
+
+    // Edit the ArrayData copy *before* wrapping it: the Array wrapper caches the
+    // validity-bitmap pointer at construction, so later edits would be missed.
+    auto data = values->data()->Copy();
+    data->buffers[0].reset();
+    data->null_count = kUnknownNullCount;
+    auto new_values = MakeArray(data);
+
+    ASSERT_OK_AND_ASSIGN(auto result_datum, DropNull(new_values));
+    auto result = result_datum.make_array();
+    ValidateOutput(result);
+    AssertArraysEqual(*expected, *result);
+  }
+
+  void TestNoValidityBitmapButUnknownNullCount(const std::shared_ptr<DataType>& type,
+                                               const std::string& values) {
+    TestNoValidityBitmapButUnknownNullCount(ArrayFromJSON(type, values));
+  }
+};
+
+TEST_F(TestDropNullKernel, DropNull) {
+  // Every element of a null-typed array is null, so the result is always empty.
+  for (const char* all_nulls : {"[null, null, null]", "[null]"}) {
+    CheckDropNull(null(), all_nulls, "[]");
+  }
+}
+
+TEST_F(TestDropNullKernel, DropNullBoolean) {
+  const auto type = boolean();
+  // Without nulls the input comes back unchanged.
+  CheckDropNull(type, "[true, false, true]", "[true, false, true]");
+  // A null slot is removed and the remaining values keep their order.
+  CheckDropNull(type, "[null, false, true]", "[false, true]");
+
+  TestNoValidityBitmapButUnknownNullCount(type, "[true, false, true]");
+}
+
+// Typed fixture parameterized on `ArrowType`; adds nothing to TestDropNullKernel.
+template <typename ArrowType>
+struct TestDropNullKernelTyped : TestDropNullKernel {};
+
+template <typename ArrowType>
+class TestDropNullKernelWithNumeric : public TestDropNullKernelTyped<ArrowType> {
+ protected:
+  void AssertDropNull(const std::string& values, const std::string& expected) {
+    CheckDropNull(type_singleton(), values, expected);
+  }
+
+  std::shared_ptr<DataType> type_singleton() {

Review comment:
       nit: you can keep this as a class member




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org