You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/11/13 14:29:11 UTC

[2/2] kudu git commit: [java] Upgrade to Spark 2.4

[java] Upgrade to Spark 2.4

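In Spark 2.4, spark-avro is now part of Spark itself.
This change migrates Kudu's spark-avro dependency from
the com.databricks artifact to the org.apache.spark
artifact (versioned together with Spark) and adds a
round-trip test (Avro, CSV, Parquet) to ensure that
the functionality does not break.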

Change-Id: Id1f8de543276c4dc82a57c4a2228ae2374f2d87f
Reviewed-on: http://gerrit.cloudera.org:8080/11912
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Hao Hao <ha...@cloudera.com>
Tested-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c136efe4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c136efe4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c136efe4

Branch: refs/heads/master
Commit: c136efe47a0ebacebe6774701ffa06fc9d41a189
Parents: 51736f3
Author: Grant Henke <gr...@apache.org>
Authored: Thu Nov 8 16:04:28 2018 -0600
Committer: Grant Henke <gr...@apache.org>
Committed: Tue Nov 13 14:28:37 2018 +0000

----------------------------------------------------------------------
 java/gradle/dependencies.gradle                 |   5 +-
 .../spark/tools/TestImportExportFiles.scala     | 117 +++++++++++++++----
 2 files changed, 96 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 52a5079..71cfe16 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -53,8 +53,7 @@ versions += [
     scalatest      : "3.0.5",
     scopt          : "3.7.0",
     slf4j          : "1.7.25",
-    spark          : "2.3.2",
-    sparkAvro      : "4.0.0",
+    spark          : "2.4.0",
     spotBugs       : "3.1.6",
     yetus          : "0.8.0"
 ]
@@ -104,7 +103,7 @@ libs += [
     scopt                : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
     slf4jApi             : "org.slf4j:slf4j-api:$versions.slf4j",
     slf4jLog4j12         : "org.slf4j:slf4j-log4j12:$versions.slf4j",
-    sparkAvro            : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",
+    sparkAvro            : "org.apache.spark:spark-avro_$versions.scalaBase:$versions.spark",
     sparkCore            : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
     sparkSql             : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
     sparkSqlTest         : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",

http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
index 040c810..6034568 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -17,14 +17,18 @@
 
 package org.apache.kudu.spark.tools
 
+import java.io.File
+import java.nio.file.Files
 import java.nio.file.Paths
 
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
 import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.client.KuduTable
 import org.apache.kudu.spark.kudu._
 import org.junit.Assert._
+import org.junit.Before
 import org.junit.Test
 import org.spark_project.guava.collect.ImmutableList
 
@@ -32,31 +36,34 @@ import scala.collection.JavaConverters._
 
 class TestImportExportFiles extends KuduTestSuite {
 
-  private val TABLE_NAME: String = "TestImportExportFiles"
-  private val TABLE_DATA_PATH: String = "/TestImportExportFiles.csv"
+  private val TableDataPath = "/TestImportExportFiles.csv"
+  private val TableName = "TestImportExportFiles"
+  private val TableSchema = {
+    val columns = ImmutableList.of(
+      new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
+      new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
+      new ColumnSchemaBuilder("column2_d", Type.STRING)
+        .nullable(true)
+        .build(),
+      new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
+      new ColumnSchemaBuilder("column4_b", Type.STRING).build()
+    )
+    new Schema(columns)
+  }
+  private val options = new CreateTableOptions()
+    .setRangePartitionColumns(List("key").asJava)
+    .setNumReplicas(1)
 
-  @Test
-  def testSparkImportExport() {
-    val schema: Schema = {
-      val columns = ImmutableList.of(
-        new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
-        new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
-        new ColumnSchemaBuilder("column2_d", Type.STRING)
-          .nullable(true)
-          .build(),
-        new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
-        new ColumnSchemaBuilder("column4_b", Type.STRING).build()
-      )
-      new Schema(columns)
-    }
-    val tableOptions = new CreateTableOptions()
-      .setRangePartitionColumns(List("key").asJava)
-      .setNumReplicas(1)
-    kuduClient.createTable(TABLE_NAME, schema, tableOptions)
+  @Before
+  def setUp(): Unit = {
+    kuduClient.createTable(TableName, TableSchema, options)
+  }
 
+  @Test
+  def testCSVImport() {
     // Get the absolute path of the resource file.
     val schemaResource =
-      classOf[TestImportExportFiles].getResource(TABLE_DATA_PATH)
+      classOf[TestImportExportFiles].getResource(TableDataPath)
     val dataPath = Paths.get(schemaResource.toURI).toAbsolutePath
 
     ImportExportFiles.testMain(
@@ -65,15 +72,79 @@ class TestImportExportFiles extends KuduTestSuite {
         "--format=csv",
         s"--master-addrs=${harness.getMasterAddressesAsString}",
         s"--path=$dataPath",
-        s"--table-name=$TABLE_NAME",
+        s"--table-name=$TableName",
         "--delimiter=,",
         "--header=true",
         "--inferschema=true"
       ),
       ss
     )
-    val rdd = kuduContext.kuduRDD(ss.sparkContext, TABLE_NAME, List("key"))
+    val rdd = kuduContext.kuduRDD(ss.sparkContext, TableName, List("key"))
     assert(rdd.collect.length == 4)
     assertEquals(rdd.collect().mkString(","), "[1],[2],[3],[4]")
   }
+
+  @Test
+  def testRoundTrips(): Unit = {
+    val table = kuduClient.openTable(TableName)
+    loadSampleData(table, 50)
+    runRoundTripTest(TableName, s"$TableName-avro", "avro")
+    runRoundTripTest(TableName, s"$TableName-csv", "csv")
+    runRoundTripTest(TableName, s"$TableName-parquet", "parquet")
+  }
+
+  // TODO(KUDU-2454): Use random schemas and random data to ensure all type/values round-trip.
+  private def loadSampleData(table: KuduTable, numRows: Int): Unit = {
+    val session = kuduClient.newSession()
+    val rows = Range(0, numRows).map { i =>
+      val insert = table.newInsert
+      val row = insert.getRow
+      row.addString(0, i.toString)
+      row.addString(1, i.toString)
+      row.addString(3, i.toString)
+      row.addString(4, i.toString)
+      session.apply(insert)
+    }
+    session.close
+  }
+
+  private def runRoundTripTest(fromTable: String, toTable: String, format: String): Unit = {
+    val dir = Files.createTempDirectory("round-trip")
+    val path = new File(dir.toFile, s"$fromTable-$format").getAbsolutePath
+
+    // Export the data.
+    ImportExportFiles.testMain(
+      Array(
+        "--operation=export",
+        s"--format=$format",
+        s"--master-addrs=${harness.getMasterAddressesAsString}",
+        s"--path=$path",
+        s"--table-name=$fromTable",
+        s"--header=true"
+      ),
+      ss
+    )
+
+    // Create the target table.
+    kuduClient.createTable(toTable, TableSchema, options)
+
+    // Import the data.
+    ImportExportFiles.testMain(
+      Array(
+        "--operation=import",
+        s"--format=$format",
+        s"--master-addrs=${harness.getMasterAddressesAsString}",
+        s"--path=$path",
+        s"--table-name=$toTable",
+        s"--header=true"
+      ),
+      ss
+    )
+
+    // Verify the tables match.
+    // TODO(KUDU-2454): Verify every value to ensure all values round trip.
+    val rdd1 = kuduContext.kuduRDD(ss.sparkContext, fromTable, List("key"))
+    val rdd2 = kuduContext.kuduRDD(ss.sparkContext, toTable, List("key"))
+    assertResult(rdd1.count())(rdd2.count())
+  }
 }