You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/03 16:18:55 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #10802: ARROW-1568: [C++] Implement Drop Null Kernel for Arrays

jorisvandenbossche commented on a change in pull request #10802:
URL: https://github.com/apache/arrow/pull/10802#discussion_r681908655



##########
File path: python/pyarrow/compute.py
##########
@@ -590,6 +590,38 @@ def take(data, indices, *, boundscheck=True, memory_pool=None):
     return call_function('take', [data, indices], options, memory_pool)
 
 
+def dropnull(data, *, memory_pool=None):

Review comment:
       Is this explicit function needed? (Does the auto-generated function not work, or are vector functions not auto-generated?)

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,185 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool,
+                         std::shared_ptr<arrow::BooleanArray>* out_array) {
+  auto bitmap_buffer = values.null_bitmap();
+  *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
+                                              values.offset());
+  return Status::OK();
+}
+
+Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool,
+                        std::shared_ptr<Array>* output_array) {
+  std::unique_ptr<ArrayBuilder> builder;
+  RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder));
+  RETURN_NOT_OK(builder->Resize(0));
+  ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values,
+                                             ExecContext* ctx) {
+  if (values->null_count() == 0) {
+    return values;
+  }
+  if (values->type()->Equals(arrow::null())) {
+    return std::make_shared<NullArray>(0);
+  }
+  std::shared_ptr<BooleanArray> dropnull_filter;
+  RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &dropnull_filter));
+
+  if (dropnull_filter->null_count() == dropnull_filter->length()) {
+    std::shared_ptr<Array> empty_array;
+    RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array));
+    return empty_array;
+  }
+  auto options = FilterOptions::Defaults();
+  ARROW_ASSIGN_OR_RAISE(
+      Datum result,
+      CallFunction("array_filter", {Datum(*values), Datum(*dropnull_filter)}, &options,
+                   ctx));
+  return result.make_array();
+}
+
+Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values,
+                                                           ExecContext* ctx) {
+  auto num_chunks = values.num_chunks();
+  std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+  for (int i = 0; i < num_chunks; i++) {
+    ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx));
+  }
+  return std::make_shared<ChunkedArray>(std::move(new_chunks));
+}
+
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,
+                                                         ExecContext* ctx) {
+  std::vector<std::shared_ptr<Array>> columns(batch.num_columns());
+  std::set<int32_t> notnull_indices;
+
+  for (int col_index = 0; col_index < batch.num_columns(); ++col_index) {
+    const auto& column = batch.column(col_index);
+    for (int64_t i = 0; i < column->length(); ++i) {
+      if (!column->IsValid(i)) {
+        notnull_indices.insert(static_cast<int32_t>(i));
+      }
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == batch.num_rows()) {
+    std::vector<std::shared_ptr<Array>> empty_batch(batch.num_columns());
+    for (int i = 0; i < batch.num_columns(); i++) {
+      RETURN_NOT_OK(
+          CreateEmptyArray(batch.column(i)->type(), ctx->memory_pool(), &empty_batch[i]));
+    }
+    return RecordBatch::Make(batch.schema(), 0, empty_batch);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(batch.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < batch.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeRA(batch, *indices, TakeOptions::Defaults(), ctx);
+}
+
+Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) {
+  if (table.num_rows() == 0) {
+    return Table::Make(table.schema(), table.columns(), 0);
+  }
+  const int num_columns = table.num_columns();
+  std::vector<ArrayVector> inputs(num_columns);
+
+  // Fetch table columns
+  for (int i = 0; i < num_columns; ++i) {
+    inputs[i] = table.column(i)->chunks();
+  }
+  std::set<int32_t> notnull_indices;
+  // Rechunk inputs to allow consistent iteration over their respective chunks
+  inputs = arrow::internal::RechunkArraysConsistently(inputs);
+
+  const int64_t num_chunks = static_cast<int64_t>(inputs.back().size());
+  for (int col = 0; col < num_columns; ++col) {
+    int64_t relative_index = 0;
+    for (int64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
+      const auto& column_chunk = inputs[col][chunk_index];
+      for (int64_t i = 0; i < column_chunk->length(); ++i) {
+        if (!column_chunk->IsValid(i)) {
+          notnull_indices.insert(static_cast<int32_t>(relative_index + i));
+        }
+      }
+      relative_index += column_chunk->length();
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == table.num_rows()) {
+    std::vector<std::shared_ptr<ChunkedArray>> empty_table(table.num_columns());
+    for (int i = 0; i < table.num_columns(); i++) {
+      std::shared_ptr<Array> empty_array;
+      RETURN_NOT_OK(
+          CreateEmptyArray(table.column(i)->type(), ctx->memory_pool(), &empty_array));
+      empty_table[i] = std::make_shared<ChunkedArray>(ArrayVector{empty_array});
+    }
+    return Table::Make(table.schema(), empty_table, 0);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(table.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < table.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeTA(table, *indices, TakeOptions::Defaults(), ctx);
+}
+
+const FunctionDoc dropnull_doc(
+    "DropNull kernel",
+    ("The output is populated with values from the input without the null values"),

Review comment:
       Given that this also supports RecordBatches/Tables, I think it would be good to explicitly describe the behaviour in those cases, as there are multiple options (e.g. the `how="any"` / `how="all"` choice of pandas' `dropna`).

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,185 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool,
+                         std::shared_ptr<arrow::BooleanArray>* out_array) {
+  auto bitmap_buffer = values.null_bitmap();
+  *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
+                                              values.offset());
+  return Status::OK();
+}
+
+Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool,
+                        std::shared_ptr<Array>* output_array) {
+  std::unique_ptr<ArrayBuilder> builder;
+  RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder));
+  RETURN_NOT_OK(builder->Resize(0));
+  ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values,
+                                             ExecContext* ctx) {
+  if (values->null_count() == 0) {
+    return values;
+  }
+  if (values->type()->Equals(arrow::null())) {
+    return std::make_shared<NullArray>(0);
+  }
+  std::shared_ptr<BooleanArray> dropnull_filter;
+  RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &dropnull_filter));
+
+  if (dropnull_filter->null_count() == dropnull_filter->length()) {
+    std::shared_ptr<Array> empty_array;
+    RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array));
+    return empty_array;
+  }
+  auto options = FilterOptions::Defaults();
+  ARROW_ASSIGN_OR_RAISE(
+      Datum result,
+      CallFunction("array_filter", {Datum(*values), Datum(*dropnull_filter)}, &options,
+                   ctx));
+  return result.make_array();
+}
+
+Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values,
+                                                           ExecContext* ctx) {
+  auto num_chunks = values.num_chunks();
+  std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+  for (int i = 0; i < num_chunks; i++) {
+    ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx));
+  }
+  return std::make_shared<ChunkedArray>(std::move(new_chunks));
+}
+
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,
+                                                         ExecContext* ctx) {
+  std::vector<std::shared_ptr<Array>> columns(batch.num_columns());
+  std::set<int32_t> notnull_indices;
+
+  for (int col_index = 0; col_index < batch.num_columns(); ++col_index) {
+    const auto& column = batch.column(col_index);
+    for (int64_t i = 0; i < column->length(); ++i) {
+      if (!column->IsValid(i)) {
+        notnull_indices.insert(static_cast<int32_t>(i));
+      }
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == batch.num_rows()) {
+    std::vector<std::shared_ptr<Array>> empty_batch(batch.num_columns());
+    for (int i = 0; i < batch.num_columns(); i++) {
+      RETURN_NOT_OK(
+          CreateEmptyArray(batch.column(i)->type(), ctx->memory_pool(), &empty_batch[i]));
+    }
+    return RecordBatch::Make(batch.schema(), 0, empty_batch);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(batch.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < batch.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeRA(batch, *indices, TakeOptions::Defaults(), ctx);
+}
+
+Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) {
+  if (table.num_rows() == 0) {
+    return Table::Make(table.schema(), table.columns(), 0);
+  }
+  const int num_columns = table.num_columns();
+  std::vector<ArrayVector> inputs(num_columns);
+
+  // Fetch table columns
+  for (int i = 0; i < num_columns; ++i) {
+    inputs[i] = table.column(i)->chunks();
+  }
+  std::set<int32_t> notnull_indices;
+  // Rechunk inputs to allow consistent iteration over their respective chunks
+  inputs = arrow::internal::RechunkArraysConsistently(inputs);
+
+  const int64_t num_chunks = static_cast<int64_t>(inputs.back().size());
+  for (int col = 0; col < num_columns; ++col) {
+    int64_t relative_index = 0;
+    for (int64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
+      const auto& column_chunk = inputs[col][chunk_index];
+      for (int64_t i = 0; i < column_chunk->length(); ++i) {
+        if (!column_chunk->IsValid(i)) {
+          notnull_indices.insert(static_cast<int32_t>(relative_index + i));
+        }
+      }
+      relative_index += column_chunk->length();
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == table.num_rows()) {
+    std::vector<std::shared_ptr<ChunkedArray>> empty_table(table.num_columns());
+    for (int i = 0; i < table.num_columns(); i++) {
+      std::shared_ptr<Array> empty_array;
+      RETURN_NOT_OK(
+          CreateEmptyArray(table.column(i)->type(), ctx->memory_pool(), &empty_array));
+      empty_table[i] = std::make_shared<ChunkedArray>(ArrayVector{empty_array});
+    }
+    return Table::Make(table.schema(), empty_table, 0);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(table.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < table.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeTA(table, *indices, TakeOptions::Defaults(), ctx);
+}
+
+const FunctionDoc dropnull_doc(
+    "DropNull kernel",

Review comment:
       ```suggestion
       "Drop nulls from the input",
   ```

##########
File path: cpp/src/arrow/compute/kernels/vector_selection.cc
##########
@@ -2146,6 +2147,185 @@ class TakeMetaFunction : public MetaFunction {
   }
 };
 
+// ----------------------------------------------------------------------
+// DropNull Implementation
+
+Status GetDropNullFilter(const Array& values, MemoryPool* memory_pool,
+                         std::shared_ptr<arrow::BooleanArray>* out_array) {
+  auto bitmap_buffer = values.null_bitmap();
+  *out_array = std::make_shared<BooleanArray>(values.length(), bitmap_buffer, nullptr, 0,
+                                              values.offset());
+  return Status::OK();
+}
+
+Status CreateEmptyArray(std::shared_ptr<DataType> type, MemoryPool* memory_pool,
+                        std::shared_ptr<Array>* output_array) {
+  std::unique_ptr<ArrayBuilder> builder;
+  RETURN_NOT_OK(MakeBuilder(memory_pool, type, &builder));
+  RETURN_NOT_OK(builder->Resize(0));
+  ARROW_ASSIGN_OR_RAISE(*output_array, builder->Finish());
+  return Status::OK();
+}
+
+Result<std::shared_ptr<Array>> DropNullArray(const std::shared_ptr<Array>& values,
+                                             ExecContext* ctx) {
+  if (values->null_count() == 0) {
+    return values;
+  }
+  if (values->type()->Equals(arrow::null())) {
+    return std::make_shared<NullArray>(0);
+  }
+  std::shared_ptr<BooleanArray> dropnull_filter;
+  RETURN_NOT_OK(GetDropNullFilter(*values, ctx->memory_pool(), &dropnull_filter));
+
+  if (dropnull_filter->null_count() == dropnull_filter->length()) {
+    std::shared_ptr<Array> empty_array;
+    RETURN_NOT_OK(CreateEmptyArray(values->type(), ctx->memory_pool(), &empty_array));
+    return empty_array;
+  }
+  auto options = FilterOptions::Defaults();
+  ARROW_ASSIGN_OR_RAISE(
+      Datum result,
+      CallFunction("array_filter", {Datum(*values), Datum(*dropnull_filter)}, &options,
+                   ctx));
+  return result.make_array();
+}
+
+Result<std::shared_ptr<ChunkedArray>> DropNullChunkedArray(const ChunkedArray& values,
+                                                           ExecContext* ctx) {
+  auto num_chunks = values.num_chunks();
+  std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
+  for (int i = 0; i < num_chunks; i++) {
+    ARROW_ASSIGN_OR_RAISE(new_chunks[i], DropNullArray(values.chunk(i), ctx));
+  }
+  return std::make_shared<ChunkedArray>(std::move(new_chunks));
+}
+
+Result<std::shared_ptr<RecordBatch>> DropNullRecordBatch(const RecordBatch& batch,
+                                                         ExecContext* ctx) {
+  std::vector<std::shared_ptr<Array>> columns(batch.num_columns());
+  std::set<int32_t> notnull_indices;
+
+  for (int col_index = 0; col_index < batch.num_columns(); ++col_index) {
+    const auto& column = batch.column(col_index);
+    for (int64_t i = 0; i < column->length(); ++i) {
+      if (!column->IsValid(i)) {
+        notnull_indices.insert(static_cast<int32_t>(i));
+      }
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == batch.num_rows()) {
+    std::vector<std::shared_ptr<Array>> empty_batch(batch.num_columns());
+    for (int i = 0; i < batch.num_columns(); i++) {
+      RETURN_NOT_OK(
+          CreateEmptyArray(batch.column(i)->type(), ctx->memory_pool(), &empty_batch[i]));
+    }
+    return RecordBatch::Make(batch.schema(), 0, empty_batch);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(batch.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < batch.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeRA(batch, *indices, TakeOptions::Defaults(), ctx);
+}
+
+Result<std::shared_ptr<Table>> DropNullTable(const Table& table, ExecContext* ctx) {
+  if (table.num_rows() == 0) {
+    return Table::Make(table.schema(), table.columns(), 0);
+  }
+  const int num_columns = table.num_columns();
+  std::vector<ArrayVector> inputs(num_columns);
+
+  // Fetch table columns
+  for (int i = 0; i < num_columns; ++i) {
+    inputs[i] = table.column(i)->chunks();
+  }
+  std::set<int32_t> notnull_indices;
+  // Rechunk inputs to allow consistent iteration over their respective chunks
+  inputs = arrow::internal::RechunkArraysConsistently(inputs);
+
+  const int64_t num_chunks = static_cast<int64_t>(inputs.back().size());
+  for (int col = 0; col < num_columns; ++col) {
+    int64_t relative_index = 0;
+    for (int64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
+      const auto& column_chunk = inputs[col][chunk_index];
+      for (int64_t i = 0; i < column_chunk->length(); ++i) {
+        if (!column_chunk->IsValid(i)) {
+          notnull_indices.insert(static_cast<int32_t>(relative_index + i));
+        }
+      }
+      relative_index += column_chunk->length();
+    }
+  }
+  if (static_cast<int64_t>(notnull_indices.size()) == table.num_rows()) {
+    std::vector<std::shared_ptr<ChunkedArray>> empty_table(table.num_columns());
+    for (int i = 0; i < table.num_columns(); i++) {
+      std::shared_ptr<Array> empty_array;
+      RETURN_NOT_OK(
+          CreateEmptyArray(table.column(i)->type(), ctx->memory_pool(), &empty_array));
+      empty_table[i] = std::make_shared<ChunkedArray>(ArrayVector{empty_array});
+    }
+    return Table::Make(table.schema(), empty_table, 0);
+  }
+  std::shared_ptr<arrow::Array> indices;
+  arrow::NumericBuilder<arrow::Int32Type> builder(ctx->memory_pool());
+  RETURN_NOT_OK(
+      builder.Reserve(static_cast<int64_t>(table.num_rows() - notnull_indices.size())));
+  for (int64_t row_index = 0; row_index < table.num_rows(); ++row_index) {
+    if (notnull_indices.find(static_cast<int32_t>(row_index)) == notnull_indices.end()) {
+      builder.UnsafeAppend(static_cast<int32_t>(row_index));
+    }
+  }
+  RETURN_NOT_OK(builder.Finish(&indices));
+  return TakeTA(table, *indices, TakeOptions::Defaults(), ctx);
+}
+
+const FunctionDoc dropnull_doc(
+    "DropNull kernel",
+    ("The output is populated with values from the input without the null values"),
+    {"input"});
+
+class DropNullMetaFunction : public MetaFunction {
+ public:
+  DropNullMetaFunction() : MetaFunction("dropnull", Arity::Unary(), &dropnull_doc) {}

Review comment:
       ```suggestion
     DropNullMetaFunction() : MetaFunction("drop_null", Arity::Unary(), &dropnull_doc) {}
   ```
   
   (I think we generally use underscores; e.g. we already have an `"is_null"` kernel.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org