You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/22 12:54:41 UTC

[GitHub] [arrow] maartenbreddels opened a new pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

maartenbreddels opened a new pull request #8990:
URL: https://github.com/apache/arrow/pull/8990


   @jorisvandenbossche I've implemented this kernel as a binary (arity) kernel, so the input list array *and* the separator input string array can both be an array (see python test).
   
   I did not implement the case where the input list is a scalar, and the separator an array, since I don't think that's very common.
   
   And note that the kernel is named `binary_join` because it takes string-like and binary-like inputs.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ianmcook commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
ianmcook commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809669201


   > > I'll just point out that "join" is not a python-ism. There is a string join in Java, Rust, C#, JavaScript, etc. and it is consistently called join. I think R is the only language I can find that doesn't call it "join".
   > 
   > SQL—which probably has more users than all those languages combined 😁
   
   Snark aside, that's a good point, and since users who are thinking in the SQL way will always be at least one abstraction layer removed from the C++ library, that makes me think "join" is fine 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809664145


   I'll just point out that "join" is not a python-ism.  There is a string join in Java, Rust, C#, JavaScript, etc. and it is consistently called join.  I think R is the only language I can find that doesn't call it "join".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou edited a comment on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-853781705


   Still a TODO to address (also support large_list inputs).
   
   Edit: done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644963432



##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Probably, though `UpdateLast` doesn't convey the idea of actually appending to the last value. Any other idea?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] maartenbreddels commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
maartenbreddels commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r593172558



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       Do you agree with my reasoning Joris? this current behavior is a bit more flexible, since with fill_null we can get the behavior you want.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ianmcook commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
ianmcook commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r603563736



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       Regarding the R bindings: we will need some additional things beyond the scope of this PR to implement useful translations for `paste()` and `str_c()`—most importantly, a way of combining two or more arrays row-wise into list arrays (because in most real-world usage the strings users want to combine won't be in a list array). So I think it's fine to save those considerations for later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ianmcook commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
ianmcook commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809672648


   > In SQL it's called `CONCAT`? (https://www.w3schools.com/sql/func_mysql_concat.asp, although this doesn't have the concept of a join separator)
   
   In SQL, `concat_ws` is the variant with a separator


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r646630651



##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Does `ExtendCurrent` sound ok? It's a bit terser.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-853166144


   I'm going to take this up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644890412



##########
File path: docs/source/cpp/compute.rst
##########
@@ -638,6 +638,23 @@ String extraction
   ``(?P<letter>[ab])(?P<digit>\\d)``.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+This function does the inverse of string splitting.
+
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+| Function name   | Arity     | Input type 1         | Input type 2   | Output type       | Notes   |
++=================+===========+======================+================+===================+=========+
+| binary_join     | Binary    | List of string-like  | String-like    | String-like       | \(1)    |
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+
+* \(1) The first input must be an array, while the second can be a scalar or array.
+  Each list of values in the first input is joined using each second input
+  as separator.  If there is null element in an input list, the corresponding

Review comment:
       ```suggestion
     as separator.  If any input list is null or contains a null, the corresponding
   ```

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Nit: I'd prefer if these were not named `Append` since every other `/Append.*/` method *does* modify built array length. Could we name it `UpdateLast(string_view suffix)`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       ```suggestion
       auto out_scalar = checked_cast<BaseBinaryScalar*>(out.scalar().get());
       return builder.Finish(&out_scalar->value);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }

Review comment:
       Might be worthwhile to extract this as a helper:
   ```suggestion
         if (ListContainsNull(list, i)) {
           continue;
         }
         const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
         total_data_length += string_offsets[j_end] - string_offsets[j_start];
         total_data_length += (j_end - j_start - 1) * separators.value_length(i);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());

Review comment:
       The output should already be a correctly typed but uninitialized scalar
   ```suggestion
         out->scalar()->valid = false;
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();

Review comment:
       We don't lose anything for string kernels which always do their own allocation, but it's worth noting that `*out` is always a correctly shaped ArrayData. We could also safely write
   ```suggestion
       RETURN_NOT_OK(builder->FinishInternal(const_cast<std::shared_ptr<ArrayData>*>(&out->array())));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Unfortunately yes since ScalarFunctions need to support all permutations of input shapes. Otherwise we would break if a projection expression like `binary_join(x, y)` had its first argument simplified to a scalar by partition information (for example).
   
   I think it'd be fine to defer this to a follow up, however
   ```suggestion
         return Status::NotImplemented("'list' was scalar but 'separator' was array");
   ```
   
   Alternatively you could manually broadcast it to an array with MakeArrayFromScalar and write the follow up as "improve performance of"

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {

Review comment:
       Would you mind moving this above the definition of Append?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();

Review comment:
       Redundant `else` block
   ```suggestion
       DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
       if (batch[1].kind() == Datum::SCALAR) {
         return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
       }
       DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
       return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
   ```

##########
File path: cpp/src/arrow/compute/kernels/test_util.h
##########
@@ -113,6 +113,11 @@ void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,
                        std::shared_ptr<Array> expected,
                        const FunctionOptions* options = nullptr);
 
+void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,

Review comment:
       To keep the public overloads to a minimum, these should probably just take `Datum`s. Added https://issues.apache.org/jira/browse/ARROW-12953

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();
+    return Status::OK();
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join a list of strings together with a `separator` to form a single string",
+    ("Insert `separator` between `list` elements, and concatenate them.\n"
+     "Any null input and any null `list` element emits a null output.\n"),
+    {"list", "separator"});
+
+template <typename ListType>
+void AddBinaryJoinForListType(ScalarFunction* func) {
+  for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
+    auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
+    auto list_ty = std::make_shared<ListType>(ty);
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Scalar(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Array(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Scalar(list_ty), InputType::Scalar(ty)}, ty, exec));

Review comment:
       Since BinaryJoin::Exec is handling dispatch on shape we shouldn't produce multiple kernels
   ```suggestion
       auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
       auto list_ty = std::make_shared<ListType>(ty);
       DCHECK_OK(
           func->AddKernel({InputType::Any(list_ty), InputType::Any(ty)}, ty, exec));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);

Review comment:
       Nit:
   ```suggestion
       const ListArrayType lists(left);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Why would this happen? It seems Exec could `DCHECK(list.value_type()->Equals(separator.type())` or so based on the kernel signatures




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644950543



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Hmm, well, a list scalar is already a wrapper around a list array so I should be able to add it in this PR :-)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r603289347



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {

Review comment:
       For readability, can you split this function into different functions based on the various input kinds, e.g. `ExecArrayArray`, `ExecArrayScalar`, `ExecScalarArray`, `ExecScalarScalar`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            const auto separator = separator_array.GetView(i);
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      } else if (batch[1].kind() == Datum::SCALAR) {
+        const auto& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          KERNEL_ASSIGN_OR_RAISE(
+              auto nulls, ctx,
+              MakeArrayOfNull(list.value_type(), list.length(), ctx->memory_pool()));
+          *output = *nulls->data();
+          output->type = list.value_type();
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i))) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      }
+      std::shared_ptr<Array> string_array;
+
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
+      *output = *string_array->data();
+      // correct the output type based on the input
+      output->type = list.value_type();
+    }
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join list of strings together with a `seperator` to form a single string",
+    ("Insert `seperator` between each list element, and concatenate them."),
+    {"list", "separator"});
+
+void AddJoin(FunctionRegistry* registry) {
+  auto func =
+      std::make_shared<ScalarFunction>("binary_join", Arity::Binary(), &binary_join_doc);
+  for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
+    auto exec = GenerateTypeAgnosticVarBinaryBase<Join>(*ty);
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list(ty)), InputType::Scalar(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list(ty)), InputType::Array(ty)}, ty, exec));
+    // do we want to support scalar[list[str]] with array[str] ?

Review comment:
       It doesn't sound very useful. Either way, if it's not supported, then simply remove the commented out code.

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            const auto separator = separator_array.GetView(i);
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      } else if (batch[1].kind() == Datum::SCALAR) {
+        const auto& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          KERNEL_ASSIGN_OR_RAISE(
+              auto nulls, ctx,
+              MakeArrayOfNull(list.value_type(), list.length(), ctx->memory_pool()));
+          *output = *nulls->data();
+          output->type = list.value_type();
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i))) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      }
+      std::shared_ptr<Array> string_array;
+
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
+      *output = *string_array->data();
+      // correct the output type based on the input
+      output->type = list.value_type();
+    }
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join list of strings together with a `seperator` to form a single string",

Review comment:
       "separator"

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());

Review comment:
       Use `checked_cast`

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {

Review comment:
       Note that the computation logic doesn't depend on utf8 vs. binary, hence it would be nice to only instantiate two kernel routines (one per offset type), not four.

##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       I'm ok with this behaviour. We can add an option later if we want to override it.

##########
File path: docs/source/cpp/compute.rst
##########
@@ -575,6 +575,22 @@ when a positive ``max_splits`` is given.
   as separator.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+The inverse of string splitting:
+
++--------------------------+------------+-------------------------+-------------------+---------+
+| Function name            | Arity      | Input types             | Output type       | Notes   |
++==========================+============+=========================+===================+=========+
+| binary_join              | Binary     | List-like               | String-like       | \(1)    |

Review comment:
       Since the two inputs must be of different types, it would be nice to make this explicit here (perhaps by making two columns "first input type" and "second input type"?).

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            const auto separator = separator_array.GetView(i);
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      } else if (batch[1].kind() == Datum::SCALAR) {
+        const auto& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          KERNEL_ASSIGN_OR_RAISE(
+              auto nulls, ctx,
+              MakeArrayOfNull(list.value_type(), list.length(), ctx->memory_pool()));
+          *output = *nulls->data();
+          output->type = list.value_type();
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i))) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      }
+      std::shared_ptr<Array> string_array;
+
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
+      *output = *string_array->data();
+      // correct the output type based on the input
+      output->type = list.value_type();
+    }
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join list of strings together with a `seperator` to form a single string",
+    ("Insert `seperator` between each list element, and concatenate them."),

Review comment:
       Can you mention the null behaviour as well?

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,25 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// AppendCurrent does not add a new offset
+  Status AppendCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Call this `AppendToCurrent`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            const auto separator = separator_array.GetView(i);
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());

Review comment:
       or `UnsafeAppend("", 0)`...

##########
File path: docs/source/cpp/compute.rst
##########
@@ -575,6 +575,22 @@ when a positive ``max_splits`` is given.
   as separator.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+The inverse of string splitting:
+
++--------------------------+------------+-------------------------+-------------------+---------+
+| Function name            | Arity      | Input types             | Output type       | Notes   |
++==========================+============+=========================+===================+=========+
+| binary_join              | Binary     | List-like               | String-like       | \(1)    |
++--------------------------+------------+-------------------------+-------------------+---------+
+
+* \(1) First argument must be an array, while the second argument (`separator`)
+can be a scalar or array. When the `separator` argument is an array, the join is
+performed by each corresponding element.

Review comment:
       Be careful with indentation in bullet lists, you need the continuation lines to be indented as the first line after the `*`:
   ```
   * \(1) First argument must be an array, while the second argument (`separator`)
     can be a scalar or array. When the `separator` argument is an array, the join is
     performed by each corresponding element.
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());

Review comment:
       Can be `UnsafeAppendNull`, since you reserved the space above.

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);
+          const ArrayType* strings = static_cast<const ArrayType*>(slice.get());
+          if ((strings->null_count() > 0) || (list.IsNull(i)) ||
+              separator_array.IsNull(i)) {
+            KERNEL_RETURN_IF_ERROR(ctx, builder.AppendNull());
+          } else {
+            const auto separator = separator_array.GetView(i);
+            if (strings->length() > 0) {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.Append(strings->GetView(0)));
+              for (int64_t j = 1; j < strings->length(); j++) {
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(separator));
+                KERNEL_RETURN_IF_ERROR(ctx, builder.AppendCurrent(strings->GetView(j)));
+              }
+            } else {
+              KERNEL_RETURN_IF_ERROR(ctx, builder.AppendEmptyValue());
+            }
+          }
+        }
+      } else if (batch[1].kind() == Datum::SCALAR) {
+        const auto& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          KERNEL_ASSIGN_OR_RAISE(
+              auto nulls, ctx,
+              MakeArrayOfNull(list.value_type(), list.length(), ctx->memory_pool()));
+          *output = *nulls->data();
+          output->type = list.value_type();
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);

Review comment:
       Same comments as above wrt. efficiency.

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -1828,6 +1828,148 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+// binary join
+
+template <typename Type>
+struct Join {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using ListArrayType = ListArray;
+  using offset_type = typename Type::offset_type;
+  using BuilderType = typename TypeTraits<Type>::BuilderType;
+
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      const ListScalar& list = checked_cast<const ListScalar&>(*batch[0].scalar());
+      if (!list.is_valid) {
+        return;
+      }
+      if (batch[1].kind() == Datum::SCALAR) {
+        const BaseBinaryScalar& separator_scalar =
+            checked_cast<const BaseBinaryScalar&>(*batch[1].scalar());
+        if (!separator_scalar.is_valid) {
+          return;
+        }
+        util::string_view separator(*separator_scalar.value);
+
+        TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+        auto Append = [&](util::string_view value) {
+          return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                                static_cast<offset_type>(value.size()));
+        };
+
+        const ArrayType* strings = static_cast<const ArrayType*>(list.value.get());
+        if (strings->null_count() > 0) {
+          // since the input list is not null, the out datum needs to be assigned to
+          *out = MakeNullScalar(list.value->type());
+          return;
+        }
+        if (strings->length() > 0) {
+          KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(0)));
+          for (int64_t j = 1; j < strings->length(); j++) {
+            KERNEL_RETURN_IF_ERROR(ctx, Append(separator));
+            KERNEL_RETURN_IF_ERROR(ctx, Append(strings->GetView(j)));
+          }
+        }
+        std::shared_ptr<Buffer> string_buffer;
+
+        KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_buffer));
+        KERNEL_ASSIGN_OR_RAISE(auto scalar_right_type, ctx,
+                               MakeScalar<std::shared_ptr<Buffer>>(
+                                   list.value->type(), std::move(string_buffer)));
+        *out = scalar_right_type;
+      }  // do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      const ListArrayType list(batch[0].array());
+      ArrayData* output = out->mutable_array();
+
+      BuilderType builder(ctx->memory_pool());
+      KERNEL_RETURN_IF_ERROR(ctx, builder.Reserve(list.length()));
+      if (batch[1].kind() == Datum::ARRAY) {
+        ArrayType separator_array(batch[1].array());
+        for (int64_t i = 0; i < list.length(); ++i) {
+          const std::shared_ptr<Array> slice = list.value_slice(i);

Review comment:
       I think it would be much faster to avoid instantiating this temporary array for each list element.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644890412



##########
File path: docs/source/cpp/compute.rst
##########
@@ -638,6 +638,23 @@ String extraction
   ``(?P<letter>[ab])(?P<digit>\\d)``.
 
 
+String joining
+~~~~~~~~~~~~~~
+
+This function does the inverse of string splitting.
+
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+| Function name   | Arity     | Input type 1         | Input type 2   | Output type       | Notes   |
++=================+===========+======================+================+===================+=========+
+| binary_join     | Binary    | List of string-like  | String-like    | String-like       | \(1)    |
++-----------------+-----------+----------------------+----------------+-------------------+---------+
+
+* \(1) The first input must be an array, while the second can be a scalar or array.
+  Each list of values in the first input is joined using each second input
+  as separator.  If there is null element in an input list, the corresponding

Review comment:
       ```suggestion
     as separator.  If any input list is null or contains a null, the corresponding
   ```

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Nit: I'd prefer if these were not named `Append` since every other `/Append.*/` method *does* modify built array length. Could we name it `UpdateLast(string_view suffix)`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       ```suggestion
       auto out_scalar = checked_cast<BaseBinaryScalar*>(out.scalar().get());
       return builder.Finish(&out_scalar->value);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }

Review comment:
       Might be worthwhile to extract this as a helper:
   ```suggestion
         if (ListContainsNull(list, i)) {
           continue;
         }
         const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
         total_data_length += string_offsets[j_end] - string_offsets[j_start];
         total_data_length += (j_end - j_start - 1) * separators.value_length(i);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());

Review comment:
       The output should already be a correctly typed but uninitialized scalar
   ```suggestion
         out->scalar()->valid = false;
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();

Review comment:
       We don't lose anything for string kernels which always do their own allocation, but it's worth noting that `*out` is always a correctly shaped ArrayData. We could also safely write
   ```suggestion
       RETURN_NOT_OK(builder->FinishInternal(const_cast<std::shared_ptr<ArrayData>*>(&out->array())));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Unfortunately yes since ScalarFunctions need to support all permutations of input shapes. Otherwise we would break if a projection expression like `binary_join(x, y)` had its first argument simplified to a scalar by partition information (for example).
   
   I think it'd be fine to defer this to a follow up, however
   ```suggestion
         return Status::NotImplemented("'list' was scalar but 'separator' was array");
   ```
   
   Alternatively you could manually broadcast it to an array with MakeArrayFromScalar and write the follow up as "improve performance of"

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {

Review comment:
       Would you mind moving this above the definition of Append?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();

Review comment:
       Redundant `else` block
   ```suggestion
       DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
       if (batch[1].kind() == Datum::SCALAR) {
         return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
       }
       DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
       return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
   ```

##########
File path: cpp/src/arrow/compute/kernels/test_util.h
##########
@@ -113,6 +113,11 @@ void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,
                        std::shared_ptr<Array> expected,
                        const FunctionOptions* options = nullptr);
 
+void CheckScalarBinary(std::string func_name, std::shared_ptr<Array> left_input,

Review comment:
       To keep the public overloads to a minimum, these should probably just take `Datum`s. Added https://issues.apache.org/jira/browse/ARROW-12953

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();
+    return Status::OK();
+  }
+};
+
+const FunctionDoc binary_join_doc(
+    "Join a list of strings together with a `separator` to form a single string",
+    ("Insert `separator` between `list` elements, and concatenate them.\n"
+     "Any null input and any null `list` element emits a null output.\n"),
+    {"list", "separator"});
+
+template <typename ListType>
+void AddBinaryJoinForListType(ScalarFunction* func) {
+  for (const std::shared_ptr<DataType>& ty : BaseBinaryTypes()) {
+    auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
+    auto list_ty = std::make_shared<ListType>(ty);
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Scalar(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Array(list_ty), InputType::Array(ty)}, ty, exec));
+    DCHECK_OK(
+        func->AddKernel({InputType::Scalar(list_ty), InputType::Scalar(ty)}, ty, exec));

Review comment:
       Since BinaryJoin::Exec is handling dispatch on shape we shouldn't produce multiple kernels
   ```suggestion
       auto exec = GenerateTypeAgnosticVarBinaryBase<BinaryJoin, ListType>(*ty);
       auto list_ty = std::make_shared<ListType>(ty);
       DCHECK_OK(
           func->AddKernel({InputType::Any(list_ty), InputType::Any(ty)}, ty, exec));
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);

Review comment:
       Nit:
   ```suggestion
       const ListArrayType lists(left);
   ```

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Why would this happen? It seems Exec could `DCHECK(list.value_type()->Equals(separator.type())` or so based on the kernel signatures

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       ah, understood

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       UpdateLastWithSuffix?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r601484145



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       Sorry for the late reply here. Using `fill_null` (with empty string I suppose) would not be *fully* equivalent compared to skipping nulls, I think (assuming `["a", null, "b"]` with separator `"-"`, the result could be `"a--b"` vs `a-b`). 
   
   That said, I am not sure myself what the best / most useful behaviour would be. So it's probably fine to pick whatever behaviour (not sure what's the easiest regarding implementation?) and potentially add an option for it later if there is demand for it (similar to how we are adding an option to the numeric reductions for skipping nulls or not).
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-853781608






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644952485



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Because the "TypeAgnostic" kernel generators dispatch to the physical type executor (e.g. Binary for String).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r645042875



##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       UpdateLastWithSuffix?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou edited a comment on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-853781705


   Still a TODO to address (also support large_list inputs).
   
   Edit: done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] maartenbreddels commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
maartenbreddels commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r602143253



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       > So it's probably fine to pick whatever behaviour (not sure what's the easiest regarding implementation?) and potentially add an option for it later if there is demand for it (similar to how we are adding an option to the numeric reductions for skipping nulls or not)
   
   Yes, although you could say you'd want a drop_null to implement the `["a", null, "b"]` -> `"a-b"` behavior. So I still think the current behaviour gives the most flexibility (assuming the existence of a drop_null for lists)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r601492842



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       Looking at how pandas / R does this right now:
   
   Pandas actually results in a missing value:
   
   ```
   In [7]: s = pd.Series([["a", None, "b"], ["c", "d", "e"]])
   
   In [8]: s.str.join("-")
   Out[8]: 
   0      NaN
   1    c-d-e
   dtype: object
   ```
   
   but I doubt that this is necessarily intentional (when it comes to missing values). The docstring basically says that if there any non-string element in a list, the result is NaN for that list. And I assume this is just because in that case the stdlib `"-".join(..)` which is used under the hood fails.
   
   For R, I am not fully sure what's the most "typical" way to do this, but with the basic `paste`, you actually get the string representation of NA:
   
   ```
   > paste("a", NA, "b", sep="-")
   [1] "a-NA-b"
   ```
   
   (but `paste` is more general in converting input to strings first). 
   Using a more specific string function from `stringr`, it actually results in NA:
   
   ```
   > str_c("a", "b", sep="-")
   [1] "a-b"
   > str_c("a", NA, "b", sep="-")
   [1] NA
   ```
   
   where the docstring explicitly mentions this (*"Like most other R functions, missing values are "infectious": whenever a missing value is combined with another string the result will always be missing"*). So that could be an argument for returning null. But since in Arrow the default is that nulls are *not* infectious, it could also be an argument to skip it ;)
   
   cc @ianmcook 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r551292722



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       Another option could also be to skip nulls from the list, instead of skipping the list entirely if there is a null present? 
   (so eg `["eee", null]` could give `"eee"` instead of ``null``? 
   
   (don't really know what is most sensible to do, but generally for aggregations (and you can see a single join as a kind of aggregation?) nulls get skipped instead of propagated)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] maartenbreddels commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
maartenbreddels commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r551295838



##########
File path: cpp/src/arrow/compute/kernels/scalar_string_test.cc
##########
@@ -428,6 +433,26 @@ TYPED_TEST(TestStringKernels, StrptimeDoesNotProvideDefaultOptions) {
   ASSERT_RAISES(Invalid, CallFunction("strptime", {input}));
 }
 
+TYPED_TEST(TestStringKernels, Join) {
+  auto separator = this->scalar("--");
+  CheckScalarBinary(
+      "binary_join", list(this->type()),
+      R"([["a", "bb", "ccc"], [], null, ["dd"], ["eee", null], ["ff", ""]])", separator,
+      this->type(), R"(["a--bb--ccc", "", null, "dd", null, "ff--"])");

Review comment:
       My reasoning was that we can get that behavior using `fill_null`, while the current behavior would otherwise be difficult to get (we don't have any kernel that will replace list value with null if any of its list elements is null). I'm not sure if that's a good reason to have the current behavior though.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorisvandenbossche commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809670982


   In SQL it's called `CONCAT`? (https://www.w3schools.com/sql/func_mysql_concat.asp, although this doesn't have the concept of a join separator)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r646632021



##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       SGTM, thank you




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644950543



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?

Review comment:
       Hmm, well, a list scalar is already a wrapper around a list array so I should be able to add it in this PR :-)

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       Because the "TypeAgnostic" kernel generators dispatch to the physical type executor (e.g. Binary for String).

##########
File path: cpp/src/arrow/array/builder_binary.h
##########
@@ -77,6 +77,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
     return Append(value.data(), static_cast<offset_type>(value.size()));
   }
 
+  /// Append to the last appended value
+  ///
+  /// Unlike Append, this does not create a new offset.
+  Status AppendToCurrent(const uint8_t* value, offset_type length) {

Review comment:
       Probably, though `UpdateLast` doesn't convey the idea of actually appending to the last value. Any other idea?

##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       Much nicer, thank you!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] maartenbreddels commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
maartenbreddels commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-808268875


   Failures seem unrelated.
   @pitrou this is ready for review assuming @jorisvandenbossche agrees with the current behavior


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #8990:
URL: https://github.com/apache/arrow/pull/8990


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-749537303


   https://issues.apache.org/jira/browse/ARROW-10959


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ianmcook commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
ianmcook commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809650002


   Regarding "join" in the name:
   +1 for consistency with Python's join function
   -1 because as Arrow gains more database-like features, the word "join" is likely to confuse people. "concat" might be better.
   
   
   Regarding "binary" in the name:
   IMO that is likely to cause users to get the impression that this is _not_ for strings.
   
   It would also be good for the name to reflect that this operates on a list array.
   
   How about renaming it `list_concat`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-836874807


   @maartenbreddels Do you want to update this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r645035690



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();
+  }
+
+  // Array, scalar -> array
+  static Status ExecArrayScalar(KernelContext* ctx,
+                                const std::shared_ptr<ArrayData>& left,
+                                const Scalar& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+
+    if (!separator_scalar.is_valid) {
+      ARROW_ASSIGN_OR_RAISE(auto nulls, MakeArrayOfNull(list.value_type(), list.length(),
+                                                        ctx->memory_pool()));
+      *out = *nulls->data();
+      return Status::OK();
+    }
+
+    util::string_view separator(*separator_scalar.value);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = strings.total_values_length();
+    for (int64_t i = 0; i < list.length(); ++i) {
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += (j_end - j_start - 1) * separator.length();
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const util::string_view separator;
+
+      bool IsNull(int64_t i) { return false; }
+      util::string_view GetView(int64_t i) { return separator; }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separator}, &builder, out);
+  }
+
+  // Array, array -> array
+  static Status ExecArrayArray(KernelContext* ctx, const std::shared_ptr<ArrayData>& left,
+                               const std::shared_ptr<ArrayData>& right, Datum* out) {
+    const ListArrayType list(left);
+    const auto& strings = checked_cast<const ArrayType&>(*list.values());
+    const auto list_offsets = list.raw_value_offsets();
+    const auto string_offsets = strings.raw_value_offsets();
+    const ArrayType separators(right);
+
+    BuilderType builder(ctx->memory_pool());
+    RETURN_NOT_OK(builder.Reserve(list.length()));
+
+    // Presize data to avoid multiple reallocations when joining strings
+    int64_t total_data_length = 0;
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (separators.IsNull(i)) {
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (!has_null_string && j_end > j_start) {
+        total_data_length += string_offsets[j_end] - string_offsets[j_start];
+        total_data_length += (j_end - j_start - 1) * separators.value_length(i);
+      }
+    }
+    RETURN_NOT_OK(builder.ReserveData(total_data_length));
+
+    struct SeparatorLookup {
+      const ArrayType& separators;
+
+      bool IsNull(int64_t i) { return separators.IsNull(i); }
+      util::string_view GetView(int64_t i) { return separators.GetView(i); }
+    };
+    return JoinStrings(list, strings, SeparatorLookup{separators}, &builder, out);
+  }
+
+  template <typename SeparatorLookup>
+  static Status JoinStrings(const ListArrayType& list, const ArrayType& strings,
+                            SeparatorLookup&& separators, BuilderType* builder,
+                            Datum* out) {
+    const auto list_offsets = list.raw_value_offsets();
+
+    for (int64_t i = 0; i < list.length(); ++i) {
+      if (list.IsNull(i) || separators.IsNull(i)) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      const auto j_start = list_offsets[i], j_end = list_offsets[i + 1];
+      if (j_start == j_end) {
+        builder->UnsafeAppendEmptyValue();
+        continue;
+      }
+      bool has_null_string = false;
+      for (int64_t j = j_start; !has_null_string && j < j_end; ++j) {
+        has_null_string = strings.IsNull(j);
+      }
+      if (has_null_string) {
+        builder->UnsafeAppendNull();
+        continue;
+      }
+      builder->UnsafeAppend(strings.GetView(j_start));
+      for (int64_t j = j_start + 1; j < j_end; ++j) {
+        builder->UnsafeAppendToCurrent(separators.GetView(i));
+        builder->UnsafeAppendToCurrent(strings.GetView(j));
+      }
+    }
+
+    std::shared_ptr<Array> string_array;
+    RETURN_NOT_OK(builder->Finish(&string_array));
+    *out = *string_array->data();
+    // Correct the output type based on the input
+    out->mutable_array()->type = list.value_type();

Review comment:
       ah, understood




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-853781608






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ianmcook commented on pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
ianmcook commented on pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#issuecomment-809664771


   > I'll just point out that "join" is not a python-ism. There is a string join in Java, Rust, C#, JavaScript, etc. and it is consistently called join. I think R is the only language I can find that doesn't call it "join".
   
   SQL—which probably has more users than all those languages combined 😁


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8990: ARROW-10959: [C++] Add scalar string join kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8990:
URL: https://github.com/apache/arrow/pull/8990#discussion_r644966074



##########
File path: cpp/src/arrow/compute/kernels/scalar_string.cc
##########
@@ -2427,6 +2427,221 @@ void AddUtf8Length(FunctionRegistry* registry) {
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }
 
+template <typename BinaryType, typename ListType>
+struct BinaryJoin {
+  using ArrayType = typename TypeTraits<BinaryType>::ArrayType;
+  using ListArrayType = typename TypeTraits<ListType>::ArrayType;
+  using ListScalarType = typename TypeTraits<ListType>::ScalarType;
+  using BuilderType = typename TypeTraits<BinaryType>::BuilderType;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (batch[0].kind() == Datum::SCALAR) {
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecScalarScalar(ctx, *batch[0].scalar(), *batch[1].scalar(), out);
+      }
+      // XXX do we want to support scalar[list[str]] with array[str] ?
+    } else {
+      DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
+      if (batch[1].kind() == Datum::SCALAR) {
+        return ExecArrayScalar(ctx, batch[0].array(), *batch[1].scalar(), out);
+      }
+      DCHECK_EQ(batch[1].kind(), Datum::ARRAY);
+      return ExecArrayArray(ctx, batch[0].array(), batch[1].array(), out);
+    }
+    return Status::OK();
+  }
+
+  // Scalar, scalar -> scalar
+  static Status ExecScalarScalar(KernelContext* ctx, const Scalar& left,
+                                 const Scalar& right, Datum* out) {
+    const auto& list = checked_cast<const ListScalarType&>(left);
+    const auto& separator_scalar = checked_cast<const BaseBinaryScalar&>(right);
+    if (!list.is_valid || !separator_scalar.is_valid) {
+      return Status::OK();
+    }
+    util::string_view separator(*separator_scalar.value);
+
+    TypedBufferBuilder<uint8_t> builder(ctx->memory_pool());
+    auto Append = [&](util::string_view value) {
+      return builder.Append(reinterpret_cast<const uint8_t*>(value.data()),
+                            static_cast<int64_t>(value.size()));
+    };
+
+    const auto& strings = checked_cast<const ArrayType&>(*list.value);
+    if (strings.null_count() > 0) {
+      // Since the input list is not null, the out datum needs to be assigned to
+      *out = MakeNullScalar(list.value->type());
+      return Status::OK();
+    }
+    if (strings.length() > 0) {
+      auto data_length =
+          strings.total_values_length() + (strings.length() - 1) * separator.length();
+      RETURN_NOT_OK(builder.Reserve(data_length));
+      RETURN_NOT_OK(Append(strings.GetView(0)));
+      for (int64_t j = 1; j < strings.length(); j++) {
+        RETURN_NOT_OK(Append(separator));
+        RETURN_NOT_OK(Append(strings.GetView(j)));
+      }
+    }
+    std::shared_ptr<Buffer> string_buffer;
+    RETURN_NOT_OK(builder.Finish(&string_buffer));
+    ARROW_ASSIGN_OR_RAISE(auto joined, MakeScalar<std::shared_ptr<Buffer>>(
+                                           list.value->type(), std::move(string_buffer)));
+    *out = std::move(joined);
+    return Status::OK();

Review comment:
       Much nicer, thank you!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org