Posted to commits@kudu.apache.org by gr...@apache.org on 2018/11/13 14:29:11 UTC
[2/2] kudu git commit: [java] Upgrade to Spark 2.4
[java] Upgrade to Spark 2.4
As of Spark 2.4, spark-avro is part of Spark itself.
This change migrates the Kudu spark-avro
dependency accordingly and adds a test to
ensure that the functionality does not break.
Change-Id: Id1f8de543276c4dc82a57c4a2228ae2374f2d87f
Reviewed-on: http://gerrit.cloudera.org:8080/11912
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Hao Hao <ha...@cloudera.com>
Tested-by: Grant Henke <gr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c136efe4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c136efe4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c136efe4
Branch: refs/heads/master
Commit: c136efe47a0ebacebe6774701ffa06fc9d41a189
Parents: 51736f3
Author: Grant Henke <gr...@apache.org>
Authored: Thu Nov 8 16:04:28 2018 -0600
Committer: Grant Henke <gr...@apache.org>
Committed: Tue Nov 13 14:28:37 2018 +0000
----------------------------------------------------------------------
java/gradle/dependencies.gradle | 5 +-
.../spark/tools/TestImportExportFiles.scala | 117 +++++++++++++++----
2 files changed, 96 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
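The commit message above refers to spark-avro moving into Spark itself. A minimal sketch of the built-in data source follows, assuming Spark 2.4+ with the org.apache.spark:spark-avro module on the classpath; the SparkSession, DataFrame, and paths below are illustrative, not part of the commit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("avro-sketch")
      .master("local[*]")
      .getOrCreate()

    // Read any DataFrame; a CSV file with a header is used here purely
    // for illustration.
    val df = spark.read.format("csv")
      .option("header", "true")
      .load("/tmp/input.csv")

    // The Avro module bundled with Spark 2.4 registers the "avro" short
    // name, so no Databricks package is needed to write or read Avro.
    df.write.format("avro").save("/tmp/output-avro")
    val roundTripped = spark.read.format("avro").load("/tmp/output-avro")
    roundTripped.show()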
http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 52a5079..71cfe16 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -53,8 +53,7 @@ versions += [
scalatest : "3.0.5",
scopt : "3.7.0",
slf4j : "1.7.25",
- spark : "2.3.2",
- sparkAvro : "4.0.0",
+ spark : "2.4.0",
spotBugs : "3.1.6",
yetus : "0.8.0"
]
@@ -104,7 +103,7 @@ libs += [
scopt : "com.github.scopt:scopt_$versions.scalaBase:$versions.scopt",
slf4jApi : "org.slf4j:slf4j-api:$versions.slf4j",
slf4jLog4j12 : "org.slf4j:slf4j-log4j12:$versions.slf4j",
- sparkAvro : "com.databricks:spark-avro_$versions.scalaBase:$versions.sparkAvro",
+ sparkAvro : "org.apache.spark:spark-avro_$versions.scalaBase:$versions.spark",
sparkCore : "org.apache.spark:spark-core_$versions.scalaBase:$versions.spark",
sparkSql : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark",
sparkSqlTest : "org.apache.spark:spark-sql_$versions.scalaBase:$versions.spark:tests",
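Two things change in this hunk: the artifact coordinates move from com.databricks to org.apache.spark, and the separate sparkAvro version entry disappears because the module is now versioned in lockstep with Spark itself. At the source level the migration is equally small; a hedged before/after sketch (df is a placeholder DataFrame, not from the commit):

    // Before, with com.databricks:spark-avro_2.11:4.0.0 on the classpath:
    df.write.format("com.databricks.spark.avro").save("/tmp/out")

    // After, with org.apache.spark:spark-avro_2.11:2.4.0 (shipped as part
    // of the Spark project and sharing its version):
    df.write.format("avro").save("/tmp/out")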
http://git-wip-us.apache.org/repos/asf/kudu/blob/c136efe4/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
index 040c810..6034568 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -17,14 +17,18 @@
package org.apache.kudu.spark.tools
+import java.io.File
+import java.nio.file.Files
import java.nio.file.Paths
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.client.KuduTable
import org.apache.kudu.spark.kudu._
import org.junit.Assert._
+import org.junit.Before
import org.junit.Test
import org.spark_project.guava.collect.ImmutableList
@@ -32,31 +36,34 @@ import scala.collection.JavaConverters._
class TestImportExportFiles extends KuduTestSuite {
- private val TABLE_NAME: String = "TestImportExportFiles"
- private val TABLE_DATA_PATH: String = "/TestImportExportFiles.csv"
+ private val TableDataPath = "/TestImportExportFiles.csv"
+ private val TableName = "TestImportExportFiles"
+ private val TableSchema = {
+ val columns = ImmutableList.of(
+ new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
+ new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
+ new ColumnSchemaBuilder("column2_d", Type.STRING)
+ .nullable(true)
+ .build(),
+ new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
+ new ColumnSchemaBuilder("column4_b", Type.STRING).build()
+ )
+ new Schema(columns)
+ }
+ private val options = new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1)
- @Test
- def testSparkImportExport() {
- val schema: Schema = {
- val columns = ImmutableList.of(
- new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
- new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
- new ColumnSchemaBuilder("column2_d", Type.STRING)
- .nullable(true)
- .build(),
- new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
- new ColumnSchemaBuilder("column4_b", Type.STRING).build()
- )
- new Schema(columns)
- }
- val tableOptions = new CreateTableOptions()
- .setRangePartitionColumns(List("key").asJava)
- .setNumReplicas(1)
- kuduClient.createTable(TABLE_NAME, schema, tableOptions)
+ @Before
+ def setUp(): Unit = {
+ kuduClient.createTable(TableName, TableSchema, options)
+ }
+ @Test
+ def testCSVImport() {
// Get the absolute path of the resource file.
val schemaResource =
- classOf[TestImportExportFiles].getResource(TABLE_DATA_PATH)
+ classOf[TestImportExportFiles].getResource(TableDataPath)
val dataPath = Paths.get(schemaResource.toURI).toAbsolutePath
ImportExportFiles.testMain(
@@ -65,15 +72,79 @@ class TestImportExportFiles extends KuduTestSuite {
"--format=csv",
s"--master-addrs=${harness.getMasterAddressesAsString}",
s"--path=$dataPath",
- s"--table-name=$TABLE_NAME",
+ s"--table-name=$TableName",
"--delimiter=,",
"--header=true",
"--inferschema=true"
),
ss
)
- val rdd = kuduContext.kuduRDD(ss.sparkContext, TABLE_NAME, List("key"))
+ val rdd = kuduContext.kuduRDD(ss.sparkContext, TableName, List("key"))
assert(rdd.collect.length == 4)
assertEquals(rdd.collect().mkString(","), "[1],[2],[3],[4]")
}
+
+ @Test
+ def testRoundTrips(): Unit = {
+ val table = kuduClient.openTable(TableName)
+ loadSampleData(table, 50)
+ runRoundTripTest(TableName, s"$TableName-avro", "avro")
+ runRoundTripTest(TableName, s"$TableName-csv", "csv")
+ runRoundTripTest(TableName, s"$TableName-parquet", "parquet")
+ }
+
+ // TODO(KUDU-2454): Use random schemas and random data to ensure all type/values round-trip.
+ private def loadSampleData(table: KuduTable, numRows: Int): Unit = {
+ val session = kuduClient.newSession()
+ val rows = Range(0, numRows).map { i =>
+ val insert = table.newInsert
+ val row = insert.getRow
+ row.addString(0, i.toString)
+ row.addString(1, i.toString)
+ row.addString(3, i.toString)
+ row.addString(4, i.toString)
+ session.apply(insert)
+ }
+ session.close
+ }
+
+ private def runRoundTripTest(fromTable: String, toTable: String, format: String): Unit = {
+ val dir = Files.createTempDirectory("round-trip")
+ val path = new File(dir.toFile, s"$fromTable-$format").getAbsolutePath
+
+ // Export the data.
+ ImportExportFiles.testMain(
+ Array(
+ "--operation=export",
+ s"--format=$format",
+ s"--master-addrs=${harness.getMasterAddressesAsString}",
+ s"--path=$path",
+ s"--table-name=$fromTable",
+ s"--header=true"
+ ),
+ ss
+ )
+
+ // Create the target table.
+ kuduClient.createTable(toTable, TableSchema, options)
+
+ // Import the data.
+ ImportExportFiles.testMain(
+ Array(
+ "--operation=import",
+ s"--format=$format",
+ s"--master-addrs=${harness.getMasterAddressesAsString}",
+ s"--path=$path",
+ s"--table-name=$toTable",
+ s"--header=true"
+ ),
+ ss
+ )
+
+ // Verify the tables match.
+ // TODO(KUDU-2454): Verify every value to ensure all values round trip.
+ val rdd1 = kuduContext.kuduRDD(ss.sparkContext, fromTable, List("key"))
+ val rdd2 = kuduContext.kuduRDD(ss.sparkContext, toTable, List("key"))
+ assertResult(rdd1.count())(rdd2.count())
+ }
}
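The round-trip assertion above compares only row counts, and the TODO(KUDU-2454) comments note that per-value verification is still pending. A hypothetical sketch of a stronger check in the same style, reusing the test's kuduContext, ss, and table-name parameters (an assumption about a possible follow-up, not code from the commit):

    // Hypothetical follow-up to TODO(KUDU-2454): compare the projected
    // rows themselves rather than just their counts. Sorting the rendered
    // rows makes the comparison order-independent.
    val cols = List("key", "column1_i", "column2_d", "column3_s", "column4_b")
    val rows1 = kuduContext
      .kuduRDD(ss.sparkContext, fromTable, cols)
      .map(_.toString).collect().sorted.toSeq
    val rows2 = kuduContext
      .kuduRDD(ss.sparkContext, toTable, cols)
      .map(_.toString).collect().sorted.toSeq
    assertResult(rows1)(rows2)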