Posted to commits@kudu.apache.org by gr...@apache.org on 2018/09/26 15:46:11 UTC
kudu git commit: Supporting Spark streaming DataFrame in KuduContext.
Repository: kudu
Updated Branches:
refs/heads/master 9130bb0e1 -> 8020cbf27
KUDU-2539: Supporting Spark streaming DataFrame in KuduContext.
This solution follows the way other sinks (e.g. KafkaSink)
are implemented; for details see
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala#L87
where queryExecution.toRdd.foreachPartition is called on the DataFrame
to access the InternalRows, which are mapped to Rows by Catalyst
converters.
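
In outline, the conversion pattern described above looks like the
following sketch (forEachRow and handleRow are illustrative names, not
part of this change; the Spark APIs used are the ones the diff below
relies on):

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.catalyst.CatalystTypeConverters

  // Sketch only: iterate a DataFrame's InternalRows and convert each
  // back to an external Row with a Catalyst converter built from the
  // DataFrame's schema. handleRow is a hypothetical callback.
  def forEachRow(data: DataFrame)(handleRow: Row => Unit): Unit = {
    val schema = data.schema // captured on the driver
    data.queryExecution.toRdd.foreachPartition { iterator =>
      // Build one converter per partition rather than per row.
      val toRow = CatalystTypeConverters.createToScalaConverter(schema)
      iterator.foreach { internalRow =>
        handleRow(toRow(internalRow).asInstanceOf[Row])
      }
    }
  }

This mirrors the KuduContext change below, where writePartitionRows now
takes Iterator[InternalRow] and applies the same per-row conversion.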
Change-Id: Iead04539d3514920a5d6803c34715e5686124572
Reviewed-on: http://gerrit.cloudera.org:8080/11199
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <ab...@apache.org>
Reviewed-by: Grant Henke <gr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8020cbf2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8020cbf2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8020cbf2
Branch: refs/heads/master
Commit: 8020cbf2760483c46ed0766dfdebe3c12d0107f1
Parents: 9130bb0
Author: attilapiros <pi...@gmail.com>
Authored: Fri Aug 10 07:49:16 2018 -0700
Committer: Grant Henke <gr...@apache.org>
Committed: Wed Sep 26 15:45:21 2018 +0000
----------------------------------------------------------------------
java/gradle/dependencies.gradle | 2 +
java/kudu-spark/build.gradle | 4 +-
java/kudu-spark/pom.xml | 17 +++-
.../apache/kudu/spark/kudu/KuduContext.scala | 12 ++-
.../apache/kudu/spark/kudu/StreamingTest.scala | 97 ++++++++++++++++++++
5 files changed, 126 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 5384599..252769a 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -98,6 +98,7 @@ libs += [
protobufJavaUtil : "com.google.protobuf:protobuf-java-util:$versions.protobuf",
protoc : "com.google.protobuf:protoc:$versions.protobuf",
scalaLibrary : "org.scala-lang:scala-library:$versions.scala",
+ scalap : "org.scala-lang:scalap:$versions.scala",
scalatest : "org.scalatest:scalatest_$versions.scalaBase:$versions.scalatest",
scopt : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
slf4jApi : "org.slf4j:slf4j-api:$versions.slf4j",
@@ -105,5 +106,6 @@ libs += [
sparkAvro : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",
sparkCore : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
sparkSql : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
+ sparkSqlTest : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",
yetusAnnotations : "org.apache.yetus:audience-annotations:$versions.yetus"
]
http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/build.gradle
----------------------------------------------------------------------
diff --git a/java/kudu-spark/build.gradle b/java/kudu-spark/build.gradle
index c723902..aa2c774 100644
--- a/java/kudu-spark/build.gradle
+++ b/java/kudu-spark/build.gradle
@@ -24,6 +24,7 @@ dependencies {
compile libs.yetusAnnotations
provided libs.scalaLibrary
+ provided libs.scalap
provided libs.sparkCore
provided libs.sparkSql
provided libs.slf4jApi
@@ -32,7 +33,8 @@ dependencies {
testCompile project(path: ":kudu-client", configuration: "shadowTest")
testCompile libs.junit
testCompile libs.scalatest
+ testCompile libs.sparkSqlTest
}
// Adjust the artifact name to include the spark and scala base versions.
-archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"
\ No newline at end of file
+archivesBaseName = "kudu-spark${versions.sparkBase}_${versions.scalaBase}"
http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 7665c2e..07045ae 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -67,6 +67,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
@@ -79,8 +85,15 @@
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
- </dependency>
- <dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index b0fb257..a9975b6 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -20,6 +20,8 @@ package org.apache.kudu.spark.kudu
import java.security.AccessController
import java.security.PrivilegedAction
+import java.sql.Timestamp
+
import javax.security.auth.Subject
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.Configuration
@@ -36,6 +38,8 @@ import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.AccumulatorV2
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
@@ -298,7 +302,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
val schema = data.schema
// Get the client's last propagated timestamp on the driver.
val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
- data.foreachPartition(iterator => {
+ data.queryExecution.toRdd.foreachPartition(iterator => {
val pendingErrors = writePartitionRows(
iterator,
schema,
@@ -317,7 +321,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
}
private def writePartitionRows(
- rows: Iterator[Row],
+ rows: Iterator[InternalRow],
schema: StructType,
tableName: String,
operationType: OperationType,
@@ -334,8 +338,10 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
val session: KuduSession = syncClient.newSession
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
+ val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
try {
- for (row <- rows) {
+ for (internalRow <- rows) {
+ val row = typeConverter(internalRow).asInstanceOf[Row]
val operation = operationType.operation(table)
for ((sparkIdx, kuduIdx) <- indices) {
if (row.isNullAt(sparkIdx)) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/8020cbf2/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
new file mode 100644
index 0000000..246d00e
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/StreamingTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kudu.spark.kudu
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.StreamSinkProvider
+import org.apache.spark.sql.streaming.OutputMode
+import org.junit.Before
+import org.junit.Test
+
+class StreamingTest extends KuduTestSuite {
+
+ implicit var sqlContext: SQLContext = _
+ var kuduOptions: Map[String, String] = _
+
+ @Before
+ def setUp(): Unit = {
+ sqlContext = ss.sqlContext
+ kuduOptions =
+ Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddresses)
+ }
+
+ @Test
+ def testKuduContextWithSparkStreaming() {
+ val spark = ss
+ import spark.implicits._
+ val checkpointDir = java.nio.file.Files.createTempDirectory("spark_kudu")
+ val input = MemoryStream[Int]
+ val query = input
+ .toDS()
+ .map(v => (v + 1, v.toString))
+ .toDF("key", "val")
+ .writeStream
+ .format(classOf[KuduSinkProvider].getCanonicalName)
+ .option("kudu.master", miniCluster.getMasterAddresses)
+ .option("kudu.table", simpleTableName)
+ .option("checkpointLocation", checkpointDir.toFile.getCanonicalPath)
+ .outputMode(OutputMode.Update)
+ .start()
+
+ def verifyOutput(expectedData: Seq[(Int, String)]): Unit = {
+ val df = sqlContext.read.options(kuduOptions).kudu
+ val actual = df.rdd
+ .map { row =>
+ (row.get(0), row.getString(1))
+ }
+ .collect()
+ .toSet
+ assert(actual === expectedData.toSet)
+ }
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+ verifyOutput(expectedData = Seq((2, "1"), (3, "2"), (4, "3")))
+ query.stop()
+ }
+}
+
+class KuduSinkProvider extends StreamSinkProvider with DataSourceRegister {
+
+ override def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink =
+ new KuduSink(sqlContext, parameters)
+
+ override def shortName(): String = "kudu"
+}
+
+class KuduSink(sqlContext: SQLContext, parameters: Map[String, String]) extends Sink {
+
+ private val kuduContext =
+ new KuduContext(parameters("kudu.master"), sqlContext.sparkContext)
+
+ private val tablename = parameters("kudu.table")
+
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ kuduContext.upsertRows(data, tablename)
+ }
+}
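
Taken together, the change lets a Structured Streaming query write to
Kudu through a Sink that delegates to KuduContext.upsertRows. A minimal
sketch of how an application could wire this up, reusing a provider like
the KuduSinkProvider above (the master address, table name, and
checkpoint path are placeholders, not values from this commit):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.OutputMode

  // Sketch: stream rows from Spark's built-in "rate" test source into
  // Kudu via a StreamSinkProvider such as the KuduSinkProvider above.
  val spark = SparkSession.builder().appName("kudu-streaming").getOrCreate()
  import spark.implicits._

  val query = spark.readStream
    .format("rate") // emits (timestamp, value) rows at a fixed rate
    .load()
    .select($"value".as("key"), $"value".cast("string").as("val"))
    .writeStream
    .format(classOf[KuduSinkProvider].getCanonicalName)
    .option("kudu.master", "master-host:7051")      // placeholder master address
    .option("kudu.table", "my_table")               // placeholder table name
    .option("checkpointLocation", "/tmp/kudu-ckpt") // placeholder checkpoint dir
    .outputMode(OutputMode.Update)
    .start()

Because the sink upserts, replayed batches after a failure simply
overwrite the same rows, which is what makes Update mode safe here.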