You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/03 16:29:06 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644890412



##########
File path: docs/source/cpp/compute.rst
##########
@@ -638,6 +638,23 @@ String extraction
   ``(?P<letter>[ab])(?P<digit>\\d)``.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+This function does the inverse of string splitting.
+
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+| Function name   | Arity     | Input type 1         | Input type 2   | Output type       | Notes   |
++=================+===========+======================+================+===================+=========+
+| binary_join     | Binary    | List of string-like  | String-like    | String-like       | \(1)    |
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+
+* \(1) The first input must be an array, while the second can be a scalar or array.
+  Each list of values in the first input is joined using each second input
+  as separator.  If there is null element in an input list, the corresponding

Review comment:
       ```suggestion
     as separator.  If any input list is null or contains a null, the corresponding
   ```

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Nit: I'd prefer that these not be named `Append`, since every other `/Append.*/` method *does* modify the length of the array being built. Could we name it `UpdateLast(string_view suffix)`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       ```suggestion
       auto out_scalar = checked_cast<BaseBinaryScalar*>(out->scalar().get());
       return builder.Finish(&out_scalar->value);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }

Review comment:
       Might be worthwhile to extract this as a helper:
   ```suggestion
         if (ListContainsNull(list, i)) {
           continue;
         }
         const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
         total_data_length += string_offsets[j_end] - string_offsets[j_start];
         total_data_length += (j_end - j_start - 1) * separators.value_length(i);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());

Review comment:
       The output should already be a correctly typed but uninitialized scalar
   ```suggestion
         out->scalar()->is_valid = false;
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();

Review comment:
       We don't lose anything for string kernels, which always do their own allocation, but it's worth noting that `*out` is always a correctly shaped ArrayData. We could also safely write:
   ```suggestion
       RETURN_NOT_OK(builder->FinishInternal(const_cast<std::shared_ptr<ArrayData>*>(&out->array())));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Unfortunately yes since ScalarFunctions need to support all permutations of input shapes. Otherwise we would break if a projection expression like `binary_join(x, y)` had its first argument simplified to a scalar by partition information (for example).
   
   However, I think it'd be fine to defer this to a follow-up; for now, return an explicit error rather than falling through to `Status::OK()`:
   ```suggestion
         return Status::NotImplemented("'list' was scalar but 'separator' was array");
   ```
   
   Alternatively, you could manually broadcast it to an array with MakeArrayFromScalar and write the follow-up as "improve performance of binary_join when the list is a scalar".

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {

Review comment:
       Would you mind moving this above the definition of Append?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();

Review comment:
       Redundant `else` block
   ```suggestion
       DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
       if (batch[1].kind() == Datum::SCALAR) {
         return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
       }
       DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
       return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
   ```

##########
File path: cpp/src/arrow/compute/kernels/test_util.h
##########
@@ -113,6 +113,11 @@ void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,
                        std::shared_ptr<Array> expected,
                        const FunctionOptions* options = nullptr);
 
+void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,

Review comment:
       To keep the public overloads to a minimum, these should probably just take `Datum`s. Added https://issues.apache.org/jira/browse/ARROW-12953

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();
+    return Status::OK();
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join a list of strings together with a `separator` to form a single string",
+    ("Insert `separator` between `list` elements, and concatenate them.\n"
+     "Any null input and any null `list` element emits a null output.\n"),
+    {"list", "separator"});
+
+template <typename ListType>
+void AddBinaryJoinForListType(ScalarFunction* func) {
+  for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
+    auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
+    auto list_ty = std::make_shared<ListType>(ty);
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Scalar(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Array(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Scalar(list_ty), InputType::Scalar(ty)}, ty, exec));

Review comment:
       Since `BinaryJoin::Exec` already handles dispatch on argument shape, we shouldn't register multiple kernels:
   ```suggestion
       auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
       auto list_ty = std::make_shared<ListType>(ty);
       DCHECK_OK(
           func->AddKernel({InputType::Any(list_ty), InputType::Any(ty)}, ty, exec));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);

Review comment:
       Nit:
   ```suggestion
       const ListArrayType lists(left);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Why would this happen? Based on the kernel signatures, it seems `Exec` could just `DCHECK(list.value_type()->Equals(separator.type()))` or similar.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org