You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/19 20:54:11 UTC

[incubator-seatunnel] branch dev updated: [Bug][connectors] fix bug Spark "WrappedArray$ofRef cannot be cast to java.sql.Array" (#1711)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f4909f08 [Bug][connectors] fix bug Spark "WrappedArray$ofRef cannot be cast to java.sql.Array" (#1711)
f4909f08 is described below

commit f4909f08f95cbe879e83ca90c863d518282bd523
Author: v-wx-v <39...@users.noreply.github.com>
AuthorDate: Wed Apr 20 04:54:05 2022 +0800

    [Bug][connectors] fix bug Spark "WrappedArray$ofRef cannot be cast to java.sql.Array" (#1711)
    
    * fix bug Spark "WrappedArray$ofRef cannot be cast to java.sql.Array"
---
 .../org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index bb7dd890..0719ebb3 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -37,9 +37,9 @@ import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.seatunnel.spark.clickhouse.Config.{BULK_SIZE, DATABASE, FIELDS, HOST, PASSWORD, RETRY, RETRY_CODES, SHARDING_KEY, SPLIT_MODE, TABLE, USERNAME}
 import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
 import org.apache.spark.sql.{Dataset, Row}
-import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
+import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseArray, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
 import ru.yandex.clickhouse.except.ClickHouseException
-
+import ru.yandex.clickhouse.domain.ClickHouseDataType
 import java.nio.ByteBuffer
 import java.util.concurrent.ThreadLocalRandom
 import java.util.concurrent.atomic.AtomicLong
@@ -254,7 +254,8 @@ class Clickhouse extends SparkBatchSink {
             statement.setDouble(index + 1, value.asInstanceOf[Double])
         }
       case Clickhouse.arrayPattern(_) =>
-        statement.setArray(index + 1, item.getAs[java.sql.Array](fieldIndex))
+        val value = item.getAs[Seq[Any]](fieldIndex).toArray
+        statement.setArray(index + 1, new ClickHouseArray(ClickHouseDataType.String, value))
       case "Decimal" => statement.setBigDecimal(index + 1, item.getAs[BigDecimal](fieldIndex))
       case _ => statement.setString(index + 1, item.getAs[String](fieldIndex))
     }