You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/07/31 22:26:08 UTC
[2/5] kudu git commit: [Java] Minor fixes to the Scalafmt changes
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index fc8576e..9bcd73d 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -19,23 +19,28 @@ package org.apache.kudu.spark.kudu
import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq
import scala.util.control.NonFatal
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.scalatest.Matchers
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.client.CreateTableOptions
-import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.junit.{Before, Test}
+import org.junit.Before
+import org.junit.Test
class DefaultSourceTest extends KuduTestSuite with Matchers {
val rowCount = 10
- var sqlContext : SQLContext = _
- var rows : IndexedSeq[(Int, Int, String, Long)] = _
- var kuduOptions : Map[String, String] = _
+ var sqlContext: SQLContext = _
+ var rows: IndexedSeq[(Int, Int, String, Long)] = _
+ var kuduOptions: Map[String, String] = _
@Before
def setUp(): Unit = {
@@ -43,9 +48,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
sqlContext = ss.sqlContext
- kuduOptions = Map(
- "kudu.table" -> tableName,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ kuduOptions = Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)
sqlContext.read.options(kuduOptions).kudu.createOrReplaceTempView(tableName)
}
@@ -57,15 +60,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
kuduContext.deleteTable(tableName)
}
val df = sqlContext.read.options(kuduOptions).kudu
- kuduContext.createTable(tableName, df.schema, Seq("key"),
- new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
- .setNumReplicas(1))
+ kuduContext.createTable(
+ tableName,
+ df.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1))
kuduContext.insertRows(df, tableName)
// now use new options to refer to the new table name
- val newOptions: Map[String, String] = Map(
- "kudu.table" -> tableName,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ val newOptions: Map[String, String] =
+ Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)
val checkDf = sqlContext.read.options(newOptions).kudu
assert(checkDf.schema === df.schema)
@@ -90,18 +96,20 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
val upper = kuduSchema.newPartialRow()
upper.addInt("key", Integer.MAX_VALUE)
- kuduContext.createTable(tableName, kuduSchema,
+ kuduContext.createTable(
+ tableName,
+ kuduSchema,
new CreateTableOptions()
.addHashPartitions(List("key").asJava, 2)
.setRangePartitionColumns(List("key").asJava)
.addRangePartition(lower, upper)
- .setNumReplicas(1))
+ .setNumReplicas(1)
+ )
kuduContext.insertRows(df, tableName)
// now use new options to refer to the new table name
- val newOptions: Map[String, String] = Map(
- "kudu.table" -> tableName,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ val newOptions: Map[String, String] =
+ Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)
val checkDf = sqlContext.read.options(newOptions).kudu
assert(checkDf.schema === df.schema)
@@ -115,7 +123,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testInsertion() {
val df = sqlContext.read.options(kuduOptions).kudu
- val changedDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+ val changedDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
val newDF = sqlContext.read.options(kuduOptions).kudu
@@ -128,7 +139,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testInsertionMultiple() {
val df = sqlContext.read.options(kuduOptions).kudu
- val changedDF = df.limit(2).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("abc"))
+ val changedDF = df
+ .limit(2)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
val newDF = sqlContext.read.options(kuduOptions).kudu
@@ -154,7 +168,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
// change the key and insert
- val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)
// read the data back
@@ -183,7 +200,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
updateDF.write.options(newOptions).mode("append").kudu
// change the key and insert
- val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
insertDF.write.options(newOptions).mode("append").kudu
// read the data back
@@ -211,7 +231,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
updateDF.write.options(newOptions).mode("append").kudu
// change the key and insert
- val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
insertDF.write.options(newOptions).mode("append").kudu
// read the data back
@@ -235,7 +258,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
kuduContext.insertIgnoreRows(updateDF, tableName)
// change the key and insert
- val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
kuduContext.insertIgnoreRows(insertDF, tableName)
// read the data back
@@ -259,7 +285,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
kuduContext.upsertRows(updateDF, tableName)
// change the key and insert
- val insertDF = df.limit(1).withColumn("key", df("key").plus(100)).withColumn("c2_s", lit("def"))
+ val insertDF = df
+ .limit(1)
+ .withColumn("key", df("key").plus(100))
+ .withColumn("c2_s", lit("def"))
kuduContext.upsertRows(insertDF, tableName)
// read the data back
@@ -276,13 +305,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testUpsertRowsIgnoreNulls() {
- val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+ val nonNullDF =
+ sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
- val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
- "kudu.table" -> simpleTableName)).kudu
+ val dataDF = sqlContext.read
+ .options(
+ Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> simpleTableName))
+ .kudu
- val nullDF = sqlContext.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
+ val nullDF = sqlContext
+ .createDataFrame(Seq((0, null.asInstanceOf[String])))
+ .toDF("key", "val")
val kuduWriteOptions = new KuduWriteOptions
kuduWriteOptions.ignoreNull = true
kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
@@ -303,13 +337,18 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testUpsertRowsIgnoreNullsUsingDefaultSource() {
- val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+ val nonNullDF =
+ sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
kuduContext.insertRows(nonNullDF, simpleTableName)
- val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
- "kudu.table" -> simpleTableName)).kudu
+ val dataDF = sqlContext.read
+ .options(
+ Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> simpleTableName))
+ .kudu
- val nullDF = sqlContext.createDataFrame(Seq((0, null.asInstanceOf[String]))).toDF("key", "val")
+ val nullDF = sqlContext
+ .createDataFrame(Seq((0, null.asInstanceOf[String])))
+ .toDF("key", "val")
val options_0: Map[String, String] = Map(
"kudu.table" -> simpleTableName,
"kudu.master" -> miniCluster.getMasterAddresses,
@@ -318,9 +357,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
assert(dataDF.collect.toList === nonNullDF.collect.toList)
kuduContext.updateRows(nonNullDF, simpleTableName)
- val options_1: Map[String, String] = Map(
- "kudu.table" -> simpleTableName,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ val options_1: Map[String, String] =
+ Map("kudu.table" -> simpleTableName, "kudu.master" -> miniCluster.getMasterAddresses)
nullDF.write.options(options_1).mode("append").kudu
assert(dataDF.collect.toList === nullDF.collect.toList)
@@ -346,7 +384,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testOutOfOrderSelection() {
- val df = sqlContext.read.options(kuduOptions).kudu.select( "c2_s", "c1_i", "key")
+ val df =
+ sqlContext.read.options(kuduOptions).kudu.select("c2_s", "c1_i", "key")
val collected = df.collect()
assert(collected(0).getString(0).equals("0"))
}
@@ -383,61 +422,91 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testTableScanWithProjectionAndPredicateDouble() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5 },
- sqlContext.sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateLong() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5 },
- sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateBool() {
- assertEquals(rows.count { case (key, i, s, ts) => i % 2==0 },
- sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i % 2 == 0 },
+ sqlContext
+ .sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateShort() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5},
- sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateFloat() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5},
- sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateDecimal32() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5},
- sqlContext.sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateDecimal64() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5},
- sqlContext.sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicateDecimal128() {
- assertEquals(rows.count { case (key, i, s, ts) => i > 5},
- sqlContext.sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => i > 5 },
+ sqlContext
+ .sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""")
+ .count())
}
@Test
def testTableScanWithProjectionAndPredicate() {
- assertEquals(rows.count { case (key, i, s, ts) => s != null && s > "5" },
- sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => s != null && s > "5" },
+ sqlContext
+ .sql(s"""SELECT key FROM $tableName where c2_s > "5"""")
+ .count())
- assertEquals(rows.count { case (key, i, s, ts) => s != null },
- sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""").count())
+ assertEquals(
+ rows.count { case (key, i, s, ts) => s != null },
+ sqlContext
+ .sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""")
+ .count())
}
@Test
@@ -459,7 +528,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testBasicSparkSQLWithPredicate() {
- val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where key=1")
+ .collectAsList()
assert(results.size() == 1)
assert(results.get(0).size.equals(1))
assert(results.get(0).getInt(0).equals(1))
@@ -468,7 +539,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testBasicSparkSQLWithTwoPredicates() {
- val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'")
+ .collectAsList()
assert(results.size() == 1)
assert(results.get(0).size.equals(1))
assert(results.get(0).getInt(0).equals(2))
@@ -477,45 +550,58 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testBasicSparkSQLWithInListPredicate() {
val keys = Array(1, 5, 7)
- val results = sqlContext.sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})").collectAsList()
+ val results = sqlContext
+ .sql(s"SELECT key FROM $tableName where key in (${keys.mkString(", ")})")
+ .collectAsList()
assert(results.size() == keys.length)
- keys.zipWithIndex.foreach { case (v, i) =>
- assert(results.get(i).size.equals(1))
- assert(results.get(i).getInt(0).equals(v))
+ keys.zipWithIndex.foreach {
+ case (v, i) =>
+ assert(results.get(i).size.equals(1))
+ assert(results.get(i).getInt(0).equals(v))
}
}
@Test
def testBasicSparkSQLWithInListPredicateOnString() {
val keys = Array(1, 4, 6)
- val results = sqlContext.sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})").collectAsList()
+ val results = sqlContext
+ .sql(s"SELECT key FROM $tableName where c2_s in (${keys.mkString("'", "', '", "'")})")
+ .collectAsList()
assert(results.size() == keys.count(_ % 2 == 0))
- keys.filter(_ % 2 == 0).zipWithIndex.foreach { case (v, i) =>
- assert(results.get(i).size.equals(1))
- assert(results.get(i).getInt(0).equals(v))
+ keys.filter(_ % 2 == 0).zipWithIndex.foreach {
+ case (v, i) =>
+ assert(results.get(i).size.equals(1))
+ assert(results.get(i).getInt(0).equals(v))
}
}
@Test
def testBasicSparkSQLWithInListAndComparisonPredicate() {
val keys = Array(1, 5, 7)
- val results = sqlContext.sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})").collectAsList()
- assert(results.size() == keys.count(_>2))
- keys.filter(_>2).zipWithIndex.foreach { case (v, i) =>
- assert(results.get(i).size.equals(1))
- assert(results.get(i).getInt(0).equals(v))
+ val results = sqlContext
+ .sql(s"SELECT key FROM $tableName where key>2 and key in (${keys.mkString(", ")})")
+ .collectAsList()
+ assert(results.size() == keys.count(_ > 2))
+ keys.filter(_ > 2).zipWithIndex.foreach {
+ case (v, i) =>
+ assert(results.get(i).size.equals(1))
+ assert(results.get(i).getInt(0).equals(v))
}
}
@Test
def testBasicSparkSQLWithTwoPredicatesNegative() {
- val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'")
+ .collectAsList()
assert(results.size() == 0)
}
@Test
def testBasicSparkSQLWithTwoPredicatesIncludingString() {
- val results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s='2'").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where c2_s='2'")
+ .collectAsList()
assert(results.size() == 1)
assert(results.get(0).size.equals(1))
assert(results.get(0).getInt(0).equals(2))
@@ -523,7 +609,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testBasicSparkSQLWithTwoPredicatesAndProjection() {
- val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'")
+ .collectAsList()
assert(results.size() == 1)
assert(results.get(0).size.equals(2))
assert(results.get(0).getInt(0).equals(2))
@@ -532,7 +620,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testBasicSparkSQLWithTwoPredicatesGreaterThan() {
- val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'").collectAsList()
+ val results = sqlContext
+ .sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'")
+ .collectAsList()
assert(results.size() == 4)
assert(results.get(0).size.equals(2))
assert(results.get(0).getInt(0).equals(2))
@@ -543,15 +633,21 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
def testSparkSQLStringStartsWithFilters() {
// This test requires a special table.
val testTableName = "startswith"
- val schema = new Schema(List(
- new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
- val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ val schema = new Schema(
+ List(new ColumnSchemaBuilder("key", Type.STRING).key(true).build()).asJava)
+ val tableOptions = new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1)
val testTable = kuduClient.createTable(testTableName, schema, tableOptions)
val kuduSession = kuduClient.newSession()
val chars = List('a', 'b', '乕', Char.MaxValue, '\u0000')
- val keys = for (x <- chars; y <- chars; z <- chars; w <- chars) yield Array(x, y, z, w).mkString
+ val keys = for {
+ x <- chars
+ y <- chars
+ z <- chars
+ w <- chars
+ } yield Array(x, y, z, w).mkString
keys.foreach { key =>
val insert = testTable.newInsert
val row = insert.getRow
@@ -559,9 +655,8 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
row.addString(0, key)
kuduSession.apply(insert)
}
- val options: Map[String, String] = Map(
- "kudu.table" -> testTableName,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ val options: Map[String, String] =
+ Map("kudu.table" -> testTableName, "kudu.master" -> miniCluster.getMasterAddresses)
sqlContext.read.options(options).kudu.createOrReplaceTempView(testTableName)
val checkPrefixCount = { prefix: String =>
@@ -575,26 +670,37 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
checkPrefixCount(Array(x).mkString)
}
// all two character combos
- for (x <- chars; y <- chars) {
+ for {
+ x <- chars
+ y <- chars
+ } {
checkPrefixCount(Array(x, y).mkString)
}
}
@Test
def testSparkSQLIsNullPredicate() {
- var results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NULL").collectAsList()
+ var results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where c2_s IS NULL")
+ .collectAsList()
assert(results.size() == 5)
- results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NULL").collectAsList()
+ results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where key IS NULL")
+ .collectAsList()
assert(results.isEmpty())
}
@Test
def testSparkSQLIsNotNullPredicate() {
- var results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL").collectAsList()
+ var results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL")
+ .collectAsList()
assert(results.size() == 5)
- results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NOT NULL").collectAsList()
+ results = sqlContext
+ .sql("SELECT key FROM " + tableName + " where key IS NOT NULL")
+ .collectAsList()
assert(results.size() == 10)
}
@@ -604,17 +710,24 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
// read 0 rows just to get the schema
val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
- kuduContext.createTable(insertTable, df.schema, Seq("key"),
- new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ kuduContext.createTable(
+ insertTable,
+ df.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1))
- val newOptions: Map[String, String] = Map(
- "kudu.table" -> insertTable,
- "kudu.master" -> miniCluster.getMasterAddresses)
- sqlContext.read.options(newOptions).kudu.createOrReplaceTempView(insertTable)
+ val newOptions: Map[String, String] =
+ Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddresses)
+ sqlContext.read
+ .options(newOptions)
+ .kudu
+ .createOrReplaceTempView(insertTable)
sqlContext.sql(s"INSERT INTO TABLE $insertTable SELECT * FROM $tableName")
- val results = sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList()
+ val results =
+ sqlContext.sql(s"SELECT key FROM $insertTable").collectAsList()
assertEquals(10, results.size())
}
@@ -624,14 +737,20 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
// read 0 rows just to get the schema
val df = sqlContext.sql(s"SELECT * FROM $tableName LIMIT 0")
- kuduContext.createTable(insertTable, df.schema, Seq("key"),
- new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ kuduContext.createTable(
+ insertTable,
+ df.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1))
- val newOptions: Map[String, String] = Map(
- "kudu.table" -> insertTable,
- "kudu.master" -> miniCluster.getMasterAddresses)
- sqlContext.read.options(newOptions).kudu.createOrReplaceTempView(insertTable)
+ val newOptions: Map[String, String] =
+ Map("kudu.table" -> insertTable, "kudu.master" -> miniCluster.getMasterAddresses)
+ sqlContext.read
+ .options(newOptions)
+ .kudu
+ .createOrReplaceTempView(insertTable)
try {
sqlContext.sql(s"INSERT OVERWRITE TABLE $insertTable SELECT * FROM $tableName")
@@ -648,13 +767,16 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
val df = sqlContext.read.options(kuduOptions).kudu
val newTable = "testwritedatasourcetable"
- kuduContext.createTable(newTable, df.schema, Seq("key"),
- new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
- .setNumReplicas(1))
+ kuduContext.createTable(
+ newTable,
+ df.schema,
+ Seq("key"),
+ new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
+ .setNumReplicas(1))
- val newOptions: Map[String, String] = Map(
- "kudu.table" -> newTable,
- "kudu.master" -> miniCluster.getMasterAddresses)
+ val newOptions: Map[String, String] =
+ Map("kudu.table" -> newTable, "kudu.master" -> miniCluster.getMasterAddresses)
df.write.options(newOptions).mode("append").kudu
val checkDf = sqlContext.read.options(newOptions).kudu
@@ -666,15 +788,17 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testCreateRelationWithSchema() {
// user-supplied schema that is compatible with actual schema, but with the key at the end
- val userSchema: StructType = StructType(List(
- StructField("c4_long", DataTypes.LongType),
- StructField("key", DataTypes.IntegerType)
- ))
+ val userSchema: StructType = StructType(
+ List(
+ StructField("c4_long", DataTypes.LongType),
+ StructField("key", DataTypes.IntegerType)
+ ))
val dfDefaultSchema = sqlContext.read.options(kuduOptions).kudu
assertEquals(14, dfDefaultSchema.schema.fields.length)
- val dfWithUserSchema = sqlContext.read.options(kuduOptions).schema(userSchema).kudu
+ val dfWithUserSchema =
+ sqlContext.read.options(kuduOptions).schema(userSchema).kudu
assertEquals(2, dfWithUserSchema.schema.fields.length)
dfWithUserSchema.limit(10).collect()
@@ -684,14 +808,15 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testCreateRelationWithInvalidSchema() {
// user-supplied schema that is NOT compatible with actual schema
- val userSchema: StructType = StructType(List(
- StructField("foo", DataTypes.LongType),
- StructField("bar", DataTypes.IntegerType)
- ))
+ val userSchema: StructType = StructType(
+ List(
+ StructField("foo", DataTypes.LongType),
+ StructField("bar", DataTypes.IntegerType)
+ ))
intercept[IllegalArgumentException] {
sqlContext.read.options(kuduOptions).schema(userSchema).kudu
- }.getMessage should include ("Unknown column: foo")
+ }.getMessage should include("Unknown column: foo")
}
@Test
@@ -715,10 +840,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testTimestampPropagation() {
val df = sqlContext.read.options(kuduOptions).kudu
- val insertDF = df.limit(1)
- .withColumn("key", df("key")
- .plus(100))
- .withColumn("c2_s", lit("abc"))
+ val insertDF = df
+ .limit(1)
+ .withColumn(
+ "key",
+ df("key")
+ .plus(100))
+ .withColumn("c2_s", lit("abc"))
// Initiate a write via KuduContext, and verify that the client should
// have propagated timestamp.
@@ -743,10 +871,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
// Initiate another write via KuduContext, and verify that the client should
// move the propagated timestamp further.
- val updateDF = df.limit(1)
- .withColumn("key", df("key")
- .plus(100))
- .withColumn("c2_s", lit("def"))
+ val updateDF = df
+ .limit(1)
+ .withColumn(
+ "key",
+ df("key")
+ .plus(100))
+ .withColumn("c2_s", lit("def"))
val kuduWriteOptions = new KuduWriteOptions
kuduWriteOptions.ignoreDuplicateRowErrors = true
kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
@@ -754,10 +885,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
/**
- * Assuming that the only part of the logical plan is a Kudu scan, this
- * function extracts the KuduRelation from the passed DataFrame for
- * testing purposes.
- */
+ * Assuming that the only part of the logical plan is a Kudu scan, this
+ * function extracts the KuduRelation from the passed DataFrame for
+ * testing purposes.
+ */
def kuduRelationFromDataFrame(dataFrame: DataFrame) = {
val logicalPlan = dataFrame.queryExecution.logical
val logicalRelation = logicalPlan.asInstanceOf[LogicalRelation]
@@ -766,10 +897,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
/**
- * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
- * DefaultSource and makes it into the KuduRelation as a configuration
- * parameter.
- */
+ * Verify that the kudu.scanRequestTimeoutMs parameter is parsed by the
+ * DefaultSource and makes it into the KuduRelation as a configuration
+ * parameter.
+ */
@Test
def testScanRequestTimeoutPropagation() {
kuduOptions = Map(
@@ -782,10 +913,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
}
/**
- * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
- * DefaultSource and makes it into the KuduRelation as a configuration
- * parameter.
- */
+ * Verify that the kudu.socketReadTimeoutMs parameter is parsed by the
+ * DefaultSource and makes it into the KuduRelation as a configuration
+ * parameter.
+ */
@Test
def testSocketReadTimeoutPropagation() {
kuduOptions = Map(
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 2329eff..968b6a6 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -16,7 +16,10 @@
*/
package org.apache.kudu.spark.kudu
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
import java.math.BigDecimal
import java.sql.Timestamp
@@ -61,10 +64,29 @@ class KuduContextTest extends KuduTestSuite with Matchers {
@Test
def testBasicKuduRDD() {
val rows = insertRows(table, rowCount)
- val scanList = kuduContext.kuduRDD(ss.sparkContext, "test", Seq("key", "c1_i", "c2_s", "c3_double",
- "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_unixtime_micros", "c10_byte",
- "c11_decimal32", "c12_decimal64", "c13_decimal128"))
- .map(r => r.toSeq).collect()
+ val scanList = kuduContext
+ .kuduRDD(
+ ss.sparkContext,
+ "test",
+ Seq(
+ "key",
+ "c1_i",
+ "c2_s",
+ "c3_double",
+ "c4_long",
+ "c5_bool",
+ "c6_short",
+ "c7_float",
+ "c8_binary",
+ "c9_unixtime_micros",
+ "c10_byte",
+ "c11_decimal32",
+ "c12_decimal64",
+ "c13_decimal128"
+ )
+ )
+ .map(r => r.toSeq)
+ .collect()
scanList.foreach(r => {
val index = r.apply(0).asInstanceOf[Int]
assert(r.apply(0).asInstanceOf[Int] == rows.apply(index)._1)
@@ -72,13 +94,14 @@ class KuduContextTest extends KuduTestSuite with Matchers {
assert(r.apply(2).asInstanceOf[String] == rows.apply(index)._3)
assert(r.apply(3).asInstanceOf[Double] == rows.apply(index)._2.toDouble)
assert(r.apply(4).asInstanceOf[Long] == rows.apply(index)._2.toLong)
- assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2%2==1))
+ assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2 % 2 == 1))
assert(r.apply(6).asInstanceOf[Short] == rows.apply(index)._2.toShort)
assert(r.apply(7).asInstanceOf[Float] == rows.apply(index)._2.toFloat)
val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq
assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes)
- assert(r.apply(9).asInstanceOf[Timestamp] ==
- TimestampUtil.microsToTimestamp(rows.apply(index)._4))
+ assert(
+ r.apply(9).asInstanceOf[Timestamp] ==
+ TimestampUtil.microsToTimestamp(rows.apply(index)._4))
assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
@@ -90,12 +113,23 @@ class KuduContextTest extends KuduTestSuite with Matchers {
def testKuduSparkDataFrame() {
insertRows(table, rowCount)
val sqlContext = ss.sqlContext
- val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses,
- "kudu.table" -> "test")).kudu
- dataDF.sort("key").select("c8_binary").first.get(0)
- .asInstanceOf[Array[Byte]].shouldBe("bytes 0".getBytes)
+ val dataDF = sqlContext.read
+ .options(Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> "test"))
+ .kudu
+ dataDF
+ .sort("key")
+ .select("c8_binary")
+ .first
+ .get(0)
+ .asInstanceOf[Array[Byte]]
+ .shouldBe("bytes 0".getBytes)
// decode the binary to string and compare
- dataDF.sort("key").withColumn("c8_binary", decode(dataDF("c8_binary"), "UTF-8"))
- .select("c8_binary").first.get(0).shouldBe("bytes 0")
+ dataDF
+ .sort("key")
+ .withColumn("c8_binary", decode(dataDF("c8_binary"), "UTF-8"))
+ .select("c8_binary")
+ .first
+ .get(0)
+ .shouldBe("bytes 0")
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index 65f8215..ed761cc 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -26,12 +26,18 @@ import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder
-import org.apache.kudu.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.client.KuduClient
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.MiniKuduCluster
import org.apache.kudu.junit.RetryRule
-import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
import org.apache.kudu.util.DecimalUtil
import org.apache.spark.sql.SparkSession
-import org.junit.{After, Before, Rule}
+import org.junit.After
+import org.junit.Before
+import org.junit.Rule
import org.scalatest.junit.JUnitSuite
// TODO (grant): Use BaseKuduTest for most of this.
@@ -56,21 +62,32 @@ trait KuduTestSuite extends JUnitSuite {
new ColumnSchemaBuilder("c6_short", Type.INT16).build(),
new ColumnSchemaBuilder("c7_float", Type.FLOAT).build(),
new ColumnSchemaBuilder("c8_binary", Type.BINARY).build(),
- new ColumnSchemaBuilder("c9_unixtime_micros", Type.UNIXTIME_MICROS).build(),
+ new ColumnSchemaBuilder("c9_unixtime_micros", Type.UNIXTIME_MICROS)
+ .build(),
new ColumnSchemaBuilder("c10_byte", Type.INT8).build(),
new ColumnSchemaBuilder("c11_decimal32", Type.DECIMAL)
.typeAttributes(
- new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL32_PRECISION).build()
- ).build(),
+ new ColumnTypeAttributesBuilder()
+ .precision(DecimalUtil.MAX_DECIMAL32_PRECISION)
+ .build()
+ )
+ .build(),
new ColumnSchemaBuilder("c12_decimal64", Type.DECIMAL)
.typeAttributes(
- new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL64_PRECISION).build()
- ).build(),
+ new ColumnTypeAttributesBuilder()
+ .precision(DecimalUtil.MAX_DECIMAL64_PRECISION)
+ .build()
+ )
+ .build(),
new ColumnSchemaBuilder("c13_decimal128", Type.DECIMAL)
.typeAttributes(
- new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL128_PRECISION).build()
- ).build())
- new Schema(columns.asJava)
+ new ColumnTypeAttributesBuilder()
+ .precision(DecimalUtil.MAX_DECIMAL128_PRECISION)
+ .build()
+ )
+ .build()
+ )
+ new Schema(columns.asJava)
}
lazy val simpleSchema: Schema = {
@@ -93,13 +110,16 @@ trait KuduTestSuite extends JUnitSuite {
.setNumReplicas(1)
}
- val appID: String = new Date().toString + math.floor(math.random * 10E4).toLong.toString
+ val appID: String = new Date().toString + math
+ .floor(math.random * 10E4)
+ .toLong
+ .toString
- val conf: SparkConf = new SparkConf().
- setMaster("local[*]").
- setAppName("test").
- set("spark.ui.enabled", "false").
- set("spark.app.id", appID)
+ val conf: SparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("test")
+ .set("spark.ui.enabled", "false")
+ .set("spark.app.id", appID)
// Add a rule to rerun tests. We use this with Gradle because it doesn't support
// Surefire/Failsafe rerunFailingTestsCount like Maven does.
@@ -121,7 +141,6 @@ trait KuduTestSuite extends JUnitSuite {
table = kuduClient.createTable(tableName, schema, tableOptions)
-
val simpleTableOptions = new CreateTableOptions()
.setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1)
@@ -143,7 +162,9 @@ trait KuduTestSuite extends JUnitSuite {
kuduSession.apply(delete)
}
- def insertRows(targetTable: KuduTable, rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
+ def insertRows(
+ targetTable: KuduTable,
+ rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
val kuduSession = kuduClient.newSession()
val rows = Range(0, rowCount).map { i =>
@@ -153,7 +174,7 @@ trait KuduTestSuite extends JUnitSuite {
row.addInt(1, i)
row.addDouble(3, i.toDouble)
row.addLong(4, i.toLong)
- row.addBoolean(5, i%2==1)
+ row.addBoolean(5, i % 2 == 1)
row.addShort(6, i.toShort)
row.addFloat(7, i.toFloat)
row.addBinary(8, s"bytes $i".getBytes())
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 8aec568..2373dbe 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -62,8 +62,7 @@
<maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
<maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
<scala-maven-plugin.version>3.3.2</scala-maven-plugin.version>
- <mvn-scalafmt-plugin.version>0.7</mvn-scalafmt-plugin.version>
- <scalafmt.version>1.4.0</scalafmt.version>
+ <mvn-scalafmt-plugin.version>0.7_1.4.0</mvn-scalafmt-plugin.version>
<!-- Library dependencies -->
<async.version>1.4.1</async.version>
@@ -184,10 +183,9 @@
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt</artifactId>
- <version>${mvn-scalafmt-plugin.version}_${scalafmt.version}</version>
+ <version>${mvn-scalafmt-plugin.version}</version>
<configuration>
- <parameters>--diff</parameters>
- <configLocation>${user.dir}/.scalafmt.conf</configLocation>
+ <configLocation>.scalafmt.conf</configLocation>
</configuration>
<executions>
<execution>