You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/09/26 15:46:11 UTC

kudu git commit: Supporting Spark streaming DataFrame in KuduContext.

Repository: kudu
Updated Branches:
  refs/heads/master 9130bb0e1 -> 8020cbf27


Supporting Spark streaming DataFrame in KuduContext.

KUDU-2539: Supporting Spark streaming DataFrame in KuduContext.

This solution follows the way other sinks (e.g. KafkaSink)
are implemented; for details see
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala#L87

Following that approach, queryExecution.toRdd.foreachPartition is called
on the DataFrame to access the InternalRows, which are then mapped to Rows
by Catalyst converters.

Change-Id: Iead04539d3514920a5d6803c34715e5686124572
Reviewed-on: http://gerrit.cloudera.org:8080/11199
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8020cbf2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8020cbf2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8020cbf2

Branch: refs/heads/master
Commit: 8020cbf2760483c46ed0766dfdebe3c12d0107f1
Parents: 9130bb0
Author: attilapiros <pi...@gmail.com>
Authored: Fri Aug 10 07:49:16 2018 -0700
Committer: Grant Henke <gr...@apache.org>
Committed: Wed Sep 26 15:45:21 2018 +0000

----------------------------------------------------------------------
 java/gradle/dependencies.gradle                 |  2 +
 java/kudu-spark/build.gradle                    |  4 +-
 java/kudu-spark/pom.xml                         | 17 +++-
 .../apache/kudu/spark/kudu/KuduContext.scala    | 12 ++-
 .../apache/kudu/spark/kudu/StreamingTest.scala  | 97 ++++++++++++++++++++
 5 files changed, 126 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 5384599..252769a 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -98,6 +98,7 @@ libs += [
     protobufJavaUtil     : "com.google.protobuf:protobuf-java-util:$versions.protobuf",
     protoc               : "com.google.protobuf:protoc:$versions.protobuf",
     scalaLibrary         : "org.scala-lang:scala-library:$versions.scala",
+    scalap               : "org.scala-lang:scalap:$versions.scala",
     scalatest            : "org.scalatest:scalatest_$versions.scalaBase:$versions.scalatest",
     scopt                : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
     slf4jApi             : "org.slf4j:slf4j-api:$versions.slf4j",
@@ -105,5 +106,6 @@ libs += [
     sparkAvro            : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",
     sparkCore            : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
     sparkSql             : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
+    sparkSqlTest         : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",
     yetusAnnotations     : "org.apache.yetus:audience-annotations:$versions.yetus"
 ]

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-spark/build.gradle b/java/kudu-spark/build.gradle
index c723902..aa2c774 100644
--- a/java/kudu-spark/build.gradle
+++ b/java/kudu-spark/build.gradle
@@ -24,6 +24,7 @@ dependencies {
   compile libs.yetusAnnotations
 
   provided libs.scalaLibrary
+  provided libs.scalap
   provided libs.sparkCore
   provided libs.sparkSql
   provided libs.slf4jApi
@@ -32,7 +33,8 @@ dependencies {
   testCompile project(path: ":kudu-client", configuration: "shadowTest")
   testCompile libs.junit
   testCompile libs.scalatest
+  testCompile libs.sparkSqlTest
 }
 
 // Adjust the artifact name to include the spark and scala base versions.
-archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"
\ No newline at end of file
+archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 7665c2e..07045ae 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -67,6 +67,12 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scalap</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>${slf4j.version}</version>
@@ -79,8 +85,15 @@
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
-        </dependency>
-        <dependency>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+          </dependency>
+          <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index b0fb257..a9975b6 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -20,6 +20,8 @@ package org.apache.kudu.spark.kudu
 import java.security.AccessController
 import java.security.PrivilegedAction
 
+import java.sql.Timestamp
+
 import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 import javax.security.auth.login.Configuration
@@ -36,6 +38,8 @@ import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.util.AccumulatorV2
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
@@ -298,7 +302,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val schema = data.schema
     // Get the client's last propagated timestamp on the driver.
     val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
-    data.foreachPartition(iterator => {
+    data.queryExecution.toRdd.foreachPartition(iterator => {
       val pendingErrors = writePartitionRows(
         iterator,
         schema,
@@ -317,7 +321,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   }
 
   private def writePartitionRows(
-      rows: Iterator[Row],
+      rows: Iterator[InternalRow],
       schema: StructType,
       tableName: String,
       operationType: OperationType,
@@ -334,8 +338,10 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
     session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
+    val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
     try {
-      for (row <- rows) {
+      for (internalRow <- rows) {
+        val row = typeConverter(internalRow).asInstanceOf[Row]
         val operation = operationType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
new file mode 100644
index 0000000..246d00e
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.spark.kudu
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.StreamSinkProvider
+import org.apache.spark.sql.streaming.OutputMode
+import org.junit.Before
+import org.junit.Test
+
+class StreamingTest extends KuduTestSuite {
+
+  implicit var sqlContext: SQLContext = _
+  var kuduOptions: Map[String, String] = _
+
+  @Before
+  def setUp(): Unit = {
+    sqlContext = ss.sqlContext
+    kuduOptions =
+      Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddresses)
+  }
+
+  @Test
+  def testKuduContextWithSparkStreaming() {
+    val spark = ss
+    import spark.implicits._
+    val checkpointDir = java.nio.file.Files.createTempDirectory("spark_kudu")
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(v => (v + 1, v.toString))
+      .toDF("key", "val")
+      .writeStream
+      .format(classOf[KuduSinkProvider].getCanonicalName)
+      .option("kudu.master", miniCluster.getMasterAddresses)
+      .option("kudu.table", simpleTableName)
+      .option("checkpointLocation", checkpointDir.toFile.getCanonicalPath)
+      .outputMode(OutputMode.Update)
+      .start()
+
+    def verifyOutput(expectedData: Seq[(Int, String)]): Unit = {
+      val df = sqlContext.read.options(kuduOptions).kudu
+      val actual = df.rdd
+        .map { row =>
+          (row.get(0), row.getString(1))
+        }
+        .collect()
+        .toSet
+      assert(actual === expectedData.toSet)
+    }
+    input.addData(1, 2, 3)
+    query.processAllAvailable()
+    verifyOutput(expectedData = Seq((2, "1"), (3, "2"), (4, "3")))
+    query.stop()
+  }
+}
+
+class KuduSinkProvider extends StreamSinkProvider with DataSourceRegister {
+
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink =
+    new KuduSink(sqlContext, parameters)
+
+  override def shortName(): String = "kudu"
+}
+
+class KuduSink(sqlContext: SQLContext, parameters: Map[String, String]) extends Sink {
+
+  private val kuduContext =
+    new KuduContext(parameters("kudu.master"), sqlContext.sparkContext)
+
+  private val tablename = parameters("kudu.table")
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    kuduContext.upsertRows(data, tablename)
+  }
+}