You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/06 01:59:51 UTC

[incubator-seatunnel] branch dev updated: Fix the data output exception when accessing Hive using Spark JDBC Source (#2085)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 67613d1eb Fix the data output exception when accessing Hive using Spark JDBC So… (#2085)
67613d1eb is described below

commit 67613d1eb4edf4ce3889912bec45ced2b381150f
Author: Bingz2 <32...@users.noreply.github.com>
AuthorDate: Wed Jul 6 09:59:46 2022 +0800

    Fix the data output exception when accessing Hive using Spark JDBC So… (#2085)
    
    * Fix the data output exception when accessing Hive using Spark JDBC Source
    
    * add License
    
    * format code
    
    Co-authored-by: zhaobingzhi <zh...@gitv.cn>
---
 .../apache/seatunnel/spark/jdbc/source/Jdbc.scala  |  7 ++++-
 .../spark/jdbc/source/util/HiveDialet.scala        | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
index 6648e019c..df3d8c983 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/Jdbc.scala
@@ -18,11 +18,12 @@ package org.apache.seatunnel.spark.jdbc.source
 
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
-
 import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSource
+import org.apache.seatunnel.spark.jdbc.source.util.HiveDialet
+import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.{DataFrameReader, Dataset, Row, SparkSession}
 
 class Jdbc extends SparkBatchSource {
@@ -58,6 +59,10 @@ class Jdbc extends SparkBatchSource {
       case Failure(_) => // do nothing
     }
 
+    if (config.getString("url").startsWith("jdbc:hive2")) {
+      JdbcDialects.registerDialect(new HiveDialet)
+    }
+
     reader
   }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
new file mode 100644
index 000000000..c686cdfba
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/src/main/scala/org/apache/seatunnel/spark/jdbc/source/util/HiveDialet.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.jdbc.source.util
+
+import org.apache.spark.sql.jdbc.JdbcDialect
+
+/** Spark [[JdbcDialect]] for HiveServer2 URLs (those starting with `jdbc:hive2`). */
+class HiveDialet extends JdbcDialect {
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
+
+  /**
+   * Drops a leading `table.` qualifier (presumably from HiveServer2 column labels — TODO
+   * confirm) and wraps the rest in backticks, doubling any embedded backtick as Hive requires.
+   */
+  override def quoteIdentifier(colName: String): String = {
+    // indexOf is -1 when there is no '.', so substring(0) keeps the whole name unchanged.
+    val bareName = colName.substring(colName.indexOf('.') + 1)
+    s"`${bareName.replace("`", "``")}`"
+  }
+}