Posted to commits@kudu.apache.org by gr...@apache.org on 2018/07/31 22:26:09 UTC
[3/5] kudu git commit: [Java] Minor fixes to the Scalafmt changes
[Java] Minor fixes to the Scalafmt changes
- Fixed the Maven plugin usage
- Added a comment about the UTF8 encoding
- Adjusted the format file layout
- Added a few rewrite rules
- Bumped maxColumn to 100
- Fixed files that weren’t formatted
Change-Id: Ied557d2ab501f43288e7edae2874ba123ab036bc
Reviewed-on: http://gerrit.cloudera.org:8080/11089
Reviewed-by: Mike Percy <mp...@apache.org>
Reviewed-by: Tony Foerster <an...@gmail.com>
Tested-by: Grant Henke <gr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a417127a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a417127a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a417127a
Branch: refs/heads/master
Commit: a417127add0c0b07da9bfead2b2acf55212153bd
Parents: 6a432c6
Author: Grant Henke <gr...@apache.org>
Authored: Tue Jul 31 11:31:54 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Tue Jul 31 21:37:55 2018 +0000
----------------------------------------------------------------------
java/.scalafmt.conf | 29 +-
java/gradle.properties | 4 +-
.../org/apache/kudu/backup/KuduBackup.scala | 15 +-
.../apache/kudu/backup/KuduBackupOptions.scala | 6 +-
.../org/apache/kudu/backup/KuduBackupRDD.scala | 22 +-
.../org/apache/kudu/backup/KuduRestore.scala | 30 +-
.../apache/kudu/backup/KuduRestoreOptions.scala | 9 +-
.../org/apache/kudu/backup/TableMetadata.scala | 81 ++--
.../org/apache/kudu/backup/TestKuduBackup.scala | 94 +++--
.../kudu/spark/tools/ImportExportFiles.scala | 6 +-
.../tools/IntegrationTestBigLinkedList.scala | 54 ++-
.../kudu/spark/tools/ITBigLinkedListTest.scala | 29 +-
.../spark/tools/TestImportExportFiles.scala | 39 +-
.../apache/kudu/spark/kudu/DefaultSource.scala | 61 +--
.../apache/kudu/spark/kudu/KuduContext.scala | 52 ++-
.../org/apache/kudu/spark/kudu/KuduRDD.scala | 21 +-
.../apache/kudu/spark/kudu/OperationType.scala | 3 +-
.../org/apache/kudu/spark/kudu/SparkUtil.scala | 19 +-
.../org/apache/kudu/spark/kudu/package.scala | 4 +-
.../kudu/spark/kudu/DefaultSourceTest.scala | 411 ++++++++++++-------
.../kudu/spark/kudu/KuduContextTest.scala | 62 ++-
.../apache/kudu/spark/kudu/KuduTestSuite.scala | 61 ++-
java/pom.xml | 8 +-
23 files changed, 653 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/.scalafmt.conf
----------------------------------------------------------------------
diff --git a/java/.scalafmt.conf b/java/.scalafmt.conf
index 7247dc9..dc8ad39 100644
--- a/java/.scalafmt.conf
+++ b/java/.scalafmt.conf
@@ -1,10 +1,23 @@
style = defaultWithAlign
-rewrite.rules = [prefercurlyfors, AvoidInfix]
+
align = false
-docstrings=JavaDoc
-maxColumn=80
-spaces.inImportCurlyBraces=false
-unindentTopLevelOperators =true
-newlines.alwaysBeforeTopLevelStatements=true
-newlines.penalizeSingleSelectMultiArgList=false
-lineEndings=unix
+binPack {
+ literalArgumentLists = true
+ parentConstructors = true
+}
+docstrings = JavaDoc
+lineEndings = unix
+maxColumn = 100
+newlines {
+ alwaysBeforeTopLevelStatements = true
+ penalizeSingleSelectMultiArgList = false
+}
+rewrite.rules = [
+ avoidinfix,
+ expandimportselectors,
+ prefercurlyfors,
+]
+spaces {
+ inImportCurlyBraces = false
+}
+unindentTopLevelOperators = true
\ No newline at end of file
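For context on the rewrite.rules block above: Scalafmt applies these rewrites before formatting, and they account for most of the churn in the Scala files below. A minimal before/after sketch of the three rules (rule names are case-insensitive in Scalafmt; the snippet is illustrative and not taken from the commit):

  // Before rewriting:
  import org.apache.spark.sql.{SaveMode, SparkSession}
  val evens = (1 to 10) filter (_ % 2 == 0)
  for (x <- evens; y <- evens) yield x * y

  // After expandimportselectors, avoidinfix, and prefercurlyfors:
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.SparkSession
  val evens = (1 to 10).filter(_ % 2 == 0)
  for {
    x <- evens
    y <- evens
  } yield x * y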
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/gradle.properties
----------------------------------------------------------------------
diff --git a/java/gradle.properties b/java/gradle.properties
index 64a26bf..000e26f 100755
--- a/java/gradle.properties
+++ b/java/gradle.properties
@@ -38,6 +38,8 @@ mavenPublishUrl = https://repository.apache.org/service/local/staging/deploy/mav
javaCompatibility = 7
encoding = UTF-8
+# Used by the scalafmt plugin because the encoding property can't be passed.
+systemProp.file.encoding = UTF-8
# gpg-agent configuration for artifact signing.
# See https://docs.gradle.org/current/userguide/signing_plugin.html#sec:using_gpg_agent
@@ -58,5 +60,3 @@ org.gradle.daemon = true
# org.gradle.configureondemand = true
# org.gradle.parallel = true
# org.gradle.workers.max = 4
-
-systemProp.file.encoding = UTF-8
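For context on the relocated systemProp.file.encoding line: Gradle passes gradle.properties entries prefixed with systemProp. to the build JVM as system properties, so file.encoding reaches the Scalafmt process even though the plugin exposes no encoding option of its own; the plain encoding property above is only a project-level value. A minimal sketch of how any JVM process, the Scalafmt plugin included, observes the setting (illustrative, not part of the commit):

  object EncodingCheck {
    def main(args: Array[String]): Unit = {
      // file.encoding is read once at JVM startup and drives the default charset.
      println(System.getProperty("file.encoding"))       // expected: UTF-8
      println(java.nio.charset.Charset.defaultCharset()) // expected: UTF-8
    }
  }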
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
index db16055..bec3111 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackup.scala
@@ -18,16 +18,20 @@ package org.apache.kudu.backup
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
-import java.nio.file.{Path, Paths}
+import java.nio.file.Path
+import java.nio.file.Paths
import com.google.protobuf.util.JsonFormat
import org.apache.hadoop.fs.{Path => HPath}
import org.apache.kudu.backup.Backup.TableMetadataPB
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.spark.kudu.SparkUtil._
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -78,8 +82,7 @@ object KuduBackup {
def main(args: Array[String]): Unit = {
val options = KuduBackupOptions
.parse(args)
- .getOrElse(
- throw new IllegalArgumentException("could not parse the arguments"))
+ .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
val session = SparkSession
.builder()
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
index 4d343ed..02bfa28 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
@@ -19,7 +19,8 @@ package org.apache.kudu.backup
import java.net.InetAddress
import org.apache.kudu.client.AsyncKuduClient
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
import scopt.OptionParser
@InterfaceAudience.Private
@@ -69,8 +70,7 @@ object KuduBackupOptions {
opt[Int]("scanBatchSize")
.action((v, o) => o.copy(scanBatchSize = v))
- .text(
- "The maximum number of bytes returned by the scanner, on each batch.")
+ .text("The maximum number of bytes returned by the scanner, on each batch.")
.optional()
opt[Int]("scanRequestTimeout")
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index d116d35..3b75236 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -25,8 +25,11 @@ import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.util.HybridTimeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.spark.Partition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
import scala.collection.JavaConverters._
@@ -75,9 +78,7 @@ class KuduBackupRDD private[kudu] (
// TODO: Do we need a custom spark partitioner for any guarantees?
// override val partitioner = None
- override def compute(
- part: Partition,
- taskContext: TaskContext): Iterator[Row] = {
+ override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
val client: KuduClient = kuduContext.syncClient
val partition: KuduBackupPartition = part.asInstanceOf[KuduBackupPartition]
// TODO: Get deletes and updates for incremental backups.
@@ -91,10 +92,7 @@ class KuduBackupRDD private[kudu] (
}
}
-private case class KuduBackupPartition(
- index: Int,
- scanToken: Array[Byte],
- locations: Array[String])
+private case class KuduBackupPartition(index: Int, scanToken: Array[Byte], locations: Array[String])
extends Partition
/**
@@ -105,8 +103,7 @@ private case class KuduBackupPartition(
* that takes the job partitions and task context and expects to return an Iterator[Row].
* This implementation facilitates that.
*/
-private class RowIterator(private val scanner: KuduScanner)
- extends Iterator[Row] {
+private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
private var currentIterator: RowResultIterator = _
@@ -138,8 +135,7 @@ private class RowIterator(private val scanner: KuduScanner)
case Type.BINARY => rowResult.getBinaryCopy(i)
case Type.DECIMAL => rowResult.getDecimal(i)
case _ =>
- throw new RuntimeException(
- s"Unsupported column type: ${rowResult.getColumnType(i)}")
+ throw new RuntimeException(s"Unsupported column type: ${rowResult.getColumnType(i)}")
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 8555215..695d704 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -19,16 +19,21 @@ package org.apache.kudu.backup
import java.io.InputStreamReader
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
-import java.nio.file.{Path, Paths}
+import java.nio.file.Path
+import java.nio.file.Paths
import com.google.common.io.CharStreams
import com.google.protobuf.util.JsonFormat
-import org.apache.hadoop.fs.{FileSystem, Path => HPath}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{Path => HPath}
import org.apache.kudu.backup.Backup.TableMetadataPB
-import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
+import org.apache.kudu.spark.kudu.KuduContext
+import org.apache.kudu.spark.kudu.KuduWriteOptions
import org.apache.spark.sql.SparkSession
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -62,26 +67,20 @@ object KuduRestore {
val df = session.sqlContext.read
.format(metadata.getDataFormat)
.load(tablePath.toString)
- val writeOptions = new KuduWriteOptions(
- ignoreDuplicateRowErrors = false,
- ignoreNull = false)
+ val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors = false, ignoreNull = false)
// TODO: Use client directly for more control?
// (session timeout, consistency mode, flush interval, mutation buffer space)
context.insertRows(df, restoreName, writeOptions)
}
}
- private def getMetadataPath(
- tableName: String,
- options: KuduRestoreOptions): Path = {
+ private def getMetadataPath(tableName: String, options: KuduRestoreOptions): Path = {
val rootPath =
if (options.metadataPath.isEmpty) options.path else options.metadataPath
Paths.get(rootPath).resolve(tableName)
}
- private def readTableMetadata(
- path: Path,
- session: SparkSession): TableMetadataPB = {
+ private def readTableMetadata(path: Path, session: SparkSession): TableMetadataPB = {
val conf = session.sparkContext.hadoopConfiguration
val hPath = new HPath(path.resolve(TableMetadata.MetadataFileName).toString)
val fs = hPath.getFileSystem(conf)
@@ -96,8 +95,7 @@ object KuduRestore {
def main(args: Array[String]): Unit = {
val options = KuduRestoreOptions
.parse(args)
- .getOrElse(
- throw new IllegalArgumentException("could not parse the arguments"))
+ .getOrElse(throw new IllegalArgumentException("could not parse the arguments"))
val session = SparkSession
.builder()
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
index 418c365..66de017 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestoreOptions.scala
@@ -18,7 +18,8 @@ package org.apache.kudu.backup
import java.net.InetAddress
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
import scopt.OptionParser
@InterfaceAudience.Private
@@ -41,8 +42,7 @@ object KuduRestoreOptions {
new OptionParser[KuduRestoreOptions]("KuduRestore") {
opt[String]("path")
.action((v, o) => o.copy(path = v))
- .text(
- "The root path to the backup data. Accepts any Spark compatible path.")
+ .text("The root path to the backup data. Accepts any Spark compatible path.")
.optional()
opt[String]("kuduMasterAddresses")
@@ -52,8 +52,7 @@ object KuduRestoreOptions {
opt[Boolean]("createTables")
.action((v, o) => o.copy(createTables = v))
- .text(
- "true to create tables during restore, false if they already exist.")
+ .text("true to create tables during restore, false if they already exist.")
.optional()
opt[String]("tableSuffix")
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 68e6f49..77cf919 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -21,15 +21,19 @@ import java.math.BigDecimal
import com.google.protobuf.StringValue
import org.apache.commons.net.util.Base64
import org.apache.kudu.backup.Backup._
-import org.apache.kudu.ColumnSchema.{
- ColumnSchemaBuilder,
- CompressionAlgorithm,
- Encoding
-}
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm
+import org.apache.kudu.ColumnSchema.Encoding
import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
-import org.apache.kudu.client.{Bytes, CreateTableOptions, KuduTable, PartialRow}
-import org.apache.kudu.{ColumnSchema, Schema, Type}
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.kudu.client.Bytes
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.PartialRow
+import org.apache.kudu.ColumnSchema
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
import scala.collection.JavaConverters._
@@ -39,9 +43,7 @@ object TableMetadata {
val MetadataFileName = ".kudu-metadata.json"
- def getTableMetadata(
- table: KuduTable,
- options: KuduBackupOptions): TableMetadataPB = {
+ def getTableMetadata(table: KuduTable, options: KuduBackupOptions): TableMetadataPB = {
val columns = table.getSchema.getColumns.asScala.map { col =>
val builder = ColumnMetadataPB
.newBuilder()
@@ -56,8 +58,7 @@ object TableMetadata {
builder.setTypeAttributes(getTypeAttributesMetadata(col))
}
if (col.getDefaultValue != null) {
- builder.setDefaultValue(
- StringValue.of(valueToString(col.getDefaultValue, col.getType)))
+ builder.setDefaultValue(StringValue.of(valueToString(col.getDefaultValue, col.getType)))
}
builder.build()
}
@@ -74,8 +75,7 @@ object TableMetadata {
.build()
}
- private def getTypeAttributesMetadata(
- col: ColumnSchema): ColumnTypeAttributesMetadataPB = {
+ private def getTypeAttributesMetadata(col: ColumnSchema): ColumnTypeAttributesMetadataPB = {
val attributes = col.getTypeAttributes
ColumnTypeAttributesMetadataPB
.newBuilder()
@@ -94,8 +94,7 @@ object TableMetadata {
.build()
}
- private def getHashPartitionsMetadata(
- table: KuduTable): Seq[HashPartitionMetadataPB] = {
+ private def getHashPartitionsMetadata(table: KuduTable): Seq[HashPartitionMetadataPB] = {
val tableSchema = table.getSchema
val partitionSchema = table.getPartitionSchema
partitionSchema.getHashBucketSchemas.asScala.map { hs =>
@@ -111,27 +110,19 @@ object TableMetadata {
}
}
- private def getRangePartitionMetadata(
- table: KuduTable): RangePartitionMetadataPB = {
+ private def getRangePartitionMetadata(table: KuduTable): RangePartitionMetadataPB = {
val tableSchema = table.getSchema
val partitionSchema = table.getPartitionSchema
- val columnNames = partitionSchema.getRangeSchema.getColumnIds.asScala.map {
- id =>
- getColumnById(tableSchema, id).getName
+ val columnNames = partitionSchema.getRangeSchema.getColumnIds.asScala.map { id =>
+ getColumnById(tableSchema, id).getName
}
val bounds = table
.getRangePartitions(table.getAsyncClient.getDefaultOperationTimeoutMs)
.asScala
.map { p =>
- val lowerValues = getBoundValues(
- p.getDecodedRangeKeyStart(table),
- columnNames,
- tableSchema)
- val upperValues = getBoundValues(
- p.getDecodedRangeKeyEnd(table),
- columnNames,
- tableSchema)
+ val lowerValues = getBoundValues(p.getDecodedRangeKeyStart(table), columnNames, tableSchema)
+ val upperValues = getBoundValues(p.getDecodedRangeKeyEnd(table), columnNames, tableSchema)
RangeBoundsMetadataPB
.newBuilder()
.addAllUpperBounds(upperValues.asJava)
@@ -164,17 +155,11 @@ object TableMetadata {
}
}
- private def getPartialRow(
- values: Seq[ColumnValueMetadataPB],
- schema: Schema): PartialRow = {
+ private def getPartialRow(values: Seq[ColumnValueMetadataPB], schema: Schema): PartialRow = {
val row = schema.newPartialRow()
values.foreach { v =>
val colType = schema.getColumn(v.getColumnName).getType
- addValue(
- valueFromString(v.getValue, colType),
- row,
- v.getColumnName,
- colType)
+ addValue(valueFromString(v.getValue, colType), row, v.getColumnName, colType)
}
row
}
@@ -208,10 +193,7 @@ object TableMetadata {
new Schema(columns.asJava)
}
- private def getValue(
- row: PartialRow,
- columnName: String,
- colType: Type): Any = {
+ private def getValue(row: PartialRow, columnName: String, colType: Type): Any = {
colType match {
case Type.BOOL => row.getBoolean(columnName)
case Type.INT8 => row.getByte(columnName)
@@ -228,11 +210,7 @@ object TableMetadata {
}
}
- private def addValue(
- value: Any,
- row: PartialRow,
- columnName: String,
- colType: Type): Any = {
+ private def addValue(value: Any, row: PartialRow, columnName: String, colType: Type): Any = {
colType match {
case Type.BOOL => row.addBoolean(columnName, value.asInstanceOf[Boolean])
case Type.INT8 => row.addByte(columnName, value.asInstanceOf[Byte])
@@ -309,11 +287,10 @@ object TableMetadata {
val rangePartitionColumns =
metadata.getPartitions.getRangePartitions.getColumnNamesList
options.setRangePartitionColumns(rangePartitionColumns)
- metadata.getPartitions.getRangePartitions.getBoundsList.asScala.foreach {
- b =>
- val lower = getPartialRow(b.getLowerBoundsList.asScala, schema)
- val upper = getPartialRow(b.getUpperBoundsList.asScala, schema)
- options.addRangePartition(lower, upper)
+ metadata.getPartitions.getRangePartitions.getBoundsList.asScala.foreach { b =>
+ val lower = getPartialRow(b.getLowerBoundsList.asScala, schema)
+ val upper = getPartialRow(b.getUpperBoundsList.asScala, schema)
+ options.addRangePartition(lower, upper)
}
options
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
----------------------------------------------------------------------
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index b077b31..9ff9960 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -22,17 +22,26 @@ import java.util
import com.google.common.base.Objects
import org.apache.commons.io.FileUtils
-import org.apache.kudu.ColumnSchema.{ColumnSchemaBuilder, CompressionAlgorithm, Encoding}
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm
+import org.apache.kudu.ColumnSchema.Encoding
import org.apache.kudu.client.PartitionSchema.HashBucketSchema
-import org.apache.kudu.client.{CreateTableOptions, KuduTable, PartialRow, PartitionSchema, TestUtils}
-import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.PartialRow
+import org.apache.kudu.client.PartitionSchema
+import org.apache.kudu.client.TestUtils
+import org.apache.kudu.ColumnSchema
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
import org.apache.kudu.spark.kudu._
import org.apache.kudu.util.DecimalUtil
import org.junit.Assert._
import org.junit.Test
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
import scala.util.Random
@@ -47,7 +56,8 @@ class TestKuduBackup extends KuduTestSuite {
backupAndRestore(tableName)
- val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
+ val rdd =
+ kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore", List("key"))
assert(rdd.collect.length == 100)
val tA = kuduClient.openTable(tableName)
@@ -70,7 +80,8 @@ class TestKuduBackup extends KuduTestSuite {
backupAndRestore(impalaTableName)
- val rdd = kuduContext.kuduRDD(ss.sparkContext, s"$impalaTableName-restore", List("key"))
+ val rdd = kuduContext
+ .kuduRDD(ss.sparkContext, s"$impalaTableName-restore", List("key"))
// Only verifying the file contents could be read, the contents are expected to be empty.
assert(rdd.isEmpty())
}
@@ -86,7 +97,8 @@ class TestKuduBackup extends KuduTestSuite {
backupAndRestore(tableName)
val backupRows = kuduContext.kuduRDD(ss.sparkContext, s"$tableName").collect
- val restoreRows = kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore").collect
+ val restoreRows =
+ kuduContext.kuduRDD(ss.sparkContext, s"$tableName-restore").collect
assertEquals(backupRows.length, restoreRows.length)
val tA = kuduClient.openTable(tableName)
@@ -110,10 +122,12 @@ class TestKuduBackup extends KuduTestSuite {
backupAndRestore(table1Name, table2Name)
- val rdd1 = kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key"))
+ val rdd1 =
+ kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key"))
assertResult(numRows)(rdd1.count())
- val rdd2 = kuduContext.kuduRDD(ss.sparkContext, s"$table2Name-restore", List("key"))
+ val rdd2 =
+ kuduContext.kuduRDD(ss.sparkContext, s"$table2Name-restore", List("key"))
assertResult(numRows)(rdd2.count())
}
@@ -130,14 +144,15 @@ class TestKuduBackup extends KuduTestSuite {
def columnsMatch(before: ColumnSchema, after: ColumnSchema): Boolean = {
if (before eq after) return true
Objects.equal(before.getName, after.getName) &&
- Objects.equal(before.getType, after.getType) &&
- Objects.equal(before.isKey, after.isKey) &&
- Objects.equal(before.isNullable, after.isNullable) &&
- defaultValuesMatch(before.getDefaultValue, after.getDefaultValue) &&
- Objects.equal(before.getDesiredBlockSize, after.getDesiredBlockSize) &&
- Objects.equal(before.getEncoding, after.getEncoding) &&
- Objects.equal(before.getCompressionAlgorithm, after.getCompressionAlgorithm) &&
- Objects.equal(before.getTypeAttributes, after.getTypeAttributes)
+ Objects.equal(before.getType, after.getType) &&
+ Objects.equal(before.isKey, after.isKey) &&
+ Objects.equal(before.isNullable, after.isNullable) &&
+ defaultValuesMatch(before.getDefaultValue, after.getDefaultValue) &&
+ Objects.equal(before.getDesiredBlockSize, after.getDesiredBlockSize) &&
+ Objects.equal(before.getEncoding, after.getEncoding) &&
+ Objects
+ .equal(before.getCompressionAlgorithm, after.getCompressionAlgorithm) &&
+ Objects.equal(before.getTypeAttributes, after.getTypeAttributes)
}
// Special handling because default values can be a byte array which is not
@@ -161,14 +176,14 @@ class TestKuduBackup extends KuduTestSuite {
HashBucketSchemasMatch(beforeBuckets(i), afterBuckets(i))
}
hashBucketsMatch &&
- Objects.equal(before.getRangeSchema.getColumnIds, after.getRangeSchema.getColumnIds)
+ Objects.equal(before.getRangeSchema.getColumnIds, after.getRangeSchema.getColumnIds)
}
def HashBucketSchemasMatch(before: HashBucketSchema, after: HashBucketSchema): Boolean = {
if (before eq after) return true
Objects.equal(before.getColumnIds, after.getColumnIds) &&
- Objects.equal(before.getNumBuckets, after.getNumBuckets) &&
- Objects.equal(before.getSeed, after.getSeed)
+ Objects.equal(before.getNumBuckets, after.getNumBuckets) &&
+ Objects.equal(before.getSeed, after.getSeed)
}
// TODO: Move to a test utility in kudu-client since it's generally useful.
@@ -177,8 +192,11 @@ class TestKuduBackup extends KuduTestSuite {
val keyCount = Random.nextInt(columnCount) + 1 // At least one key.
val types = Type.values()
- val keyTypes = types.filter { t => !Array(Type.BOOL, Type.FLOAT, Type.DOUBLE).contains(t)}
- val compressions = CompressionAlgorithm.values().filter(_ != CompressionAlgorithm.UNKNOWN)
+ val keyTypes = types.filter { t =>
+ !Array(Type.BOOL, Type.FLOAT, Type.DOUBLE).contains(t)
+ }
+ val compressions =
+ CompressionAlgorithm.values().filter(_ != CompressionAlgorithm.UNKNOWN)
val blockSizes = Array(0, 4096, 524288, 1048576) // Default, min, middle, max.
val columns = (0 until columnCount).map { i =>
@@ -189,18 +207,22 @@ class TestKuduBackup extends KuduTestSuite {
types(Random.nextInt(types.length))
}
val precision = Random.nextInt(DecimalUtil.MAX_DECIMAL_PRECISION) + 1
- val scale = Random.nextInt(precision)
+ val scale = Random.nextInt(precision)
val typeAttributes = DecimalUtil.typeAttributes(precision, scale)
val nullable = Random.nextBoolean() && !key
val compression = compressions(Random.nextInt(compressions.length))
val blockSize = blockSizes(Random.nextInt(blockSizes.length))
val encodings = t match {
- case Type.INT8 | Type.INT16 | Type.INT32 |Type.INT64 | Type.UNIXTIME_MICROS =>
+ case Type.INT8 | Type.INT16 | Type.INT32 | Type.INT64 | Type.UNIXTIME_MICROS =>
Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE, Encoding.RLE)
- case Type.FLOAT | Type.DOUBLE |Type.DECIMAL =>
+ case Type.FLOAT | Type.DOUBLE | Type.DECIMAL =>
Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.BIT_SHUFFLE)
case Type.STRING | Type.BINARY =>
- Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.PREFIX_ENCODING, Encoding.DICT_ENCODING)
+ Array(
+ Encoding.AUTO_ENCODING,
+ Encoding.PLAIN_ENCODING,
+ Encoding.PREFIX_ENCODING,
+ Encoding.DICT_ENCODING)
case Type.BOOL =>
Array(Encoding.AUTO_ENCODING, Encoding.PLAIN_ENCODING, Encoding.RLE)
case _ => throw new IllegalArgumentException(s"Unsupported type $t")
@@ -223,15 +245,18 @@ class TestKuduBackup extends KuduTestSuite {
t match {
case Type.BOOL => Random.nextBoolean()
case Type.INT8 => Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]
- case Type.INT16 => Random.nextInt(Short.MaxValue).asInstanceOf[Short]
+ case Type.INT16 =>
+ Random.nextInt(Short.MaxValue).asInstanceOf[Short]
case Type.INT32 => Random.nextInt()
case Type.INT64 | Type.UNIXTIME_MICROS => Random.nextLong()
case Type.FLOAT => Random.nextFloat()
case Type.DOUBLE => Random.nextDouble()
case Type.DECIMAL =>
- DecimalUtil.minValue(typeAttributes.getPrecision, typeAttributes.getScale)
+ DecimalUtil
+ .minValue(typeAttributes.getPrecision, typeAttributes.getScale)
case Type.STRING => Random.nextString(Random.nextInt(100))
- case Type.BINARY => Random.nextString(Random.nextInt(100)).getBytes()
+ case Type.BINARY =>
+ Random.nextString(Random.nextInt(100)).getBytes()
case _ => throw new IllegalArgumentException(s"Unsupported type $t")
}
builder.defaultValue(defaultValue)
@@ -249,7 +274,7 @@ class TestKuduBackup extends KuduTestSuite {
val hashColumn = keyColumns(level)
val hashBuckets = Random.nextInt(8) + 2 // Minimum of 2 hash buckets.
val hashSeed = Random.nextInt()
- options.addHashPartitions(List(hashColumn.getName).asJava, hashBuckets, hashSeed)
+ options.addHashPartitions(List(hashColumn.getName).asJava, hashBuckets, hashSeed)
}
val hasRangePartition = Random.nextBoolean() && keyColumns.exists(_.getType == Type.INT64)
if (hasRangePartition) {
@@ -307,7 +332,8 @@ class TestKuduBackup extends KuduTestSuite {
row.addDouble(col.getName, Random.nextDouble())
case Type.DECIMAL =>
val attributes = col.getTypeAttributes
- val max = DecimalUtil.maxValue(attributes.getPrecision, attributes.getScale)
+ val max = DecimalUtil
+ .maxValue(attributes.getPrecision, attributes.getScale)
row.addDecimal(col.getName, max)
case Type.STRING =>
row.addString(col.getName, Random.nextString(Random.nextInt(100)))
@@ -327,10 +353,12 @@ class TestKuduBackup extends KuduTestSuite {
val dir = Files.createTempDirectory("backup")
val path = dir.toUri.toString
- val backupOptions = new KuduBackupOptions(tableNames, path, miniCluster.getMasterAddresses)
+ val backupOptions =
+ new KuduBackupOptions(tableNames, path, miniCluster.getMasterAddresses)
KuduBackup.run(backupOptions, ss)
- val restoreOptions = new KuduRestoreOptions(tableNames, path, miniCluster.getMasterAddresses)
+ val restoreOptions =
+ new KuduRestoreOptions(tableNames, path, miniCluster.getMasterAddresses)
KuduRestore.run(restoreOptions, ss)
FileUtils.deleteDirectory(dir.toFile)
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
index 0625db4..7c8bb66 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
@@ -23,9 +23,11 @@ import org.apache.kudu.client.KuduClient
import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.apache.kudu.spark.kudu._
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
@InterfaceAudience.Public
@InterfaceStability.Unstable //TODO: Unstable due to KUDU-2454
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
index 47161da..e5fa1c0 100644
--- a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/IntegrationTestBigLinkedList.scala
@@ -20,16 +20,19 @@ package org.apache.kudu.spark.tools
import java.net.InetAddress
import org.apache.kudu.client.SessionConfiguration.FlushMode
-import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
-import org.apache.kudu.mapreduce.tools.BigLinkedListCommon.{
- Xoroshiro128PlusRandom,
- _
-}
+import org.apache.kudu.client.KuduClient
+import org.apache.kudu.client.KuduSession
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.mapreduce.tools.BigLinkedListCommon.Xoroshiro128PlusRandom
+import org.apache.kudu.mapreduce.tools.BigLinkedListCommon._
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
-import org.apache.spark.{SparkConf, TaskContext}
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.SparkConf
+import org.apache.spark.TaskContext
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import scala.util.Try
@@ -107,13 +110,11 @@ object IntegrationTestBigLinkedList {
}
object Generator {
- import IntegrationTestBigLinkedList.{
- LOG,
- defaultMasterAddrs,
- fail,
- nanosToHuman,
- parseIntFlag
- }
+ import IntegrationTestBigLinkedList.LOG
+ import IntegrationTestBigLinkedList.defaultMasterAddrs
+ import IntegrationTestBigLinkedList.fail
+ import IntegrationTestBigLinkedList.nanosToHuman
+ import IntegrationTestBigLinkedList.parseIntFlag
def usage: String =
s"""
@@ -180,11 +181,8 @@ object Generator {
val client: KuduClient = kc.syncClient
if (!client.tableExists(args.tableName)) {
val schema = getTableSchema
- val options = getCreateTableOptions(
- schema,
- args.replicas,
- args.rangePartitions,
- args.hashPartitions)
+ val options =
+ getCreateTableOptions(schema, args.replicas, args.rangePartitions, args.hashPartitions)
client.createTable(args.tableName, getTableSchema, options)
}
@@ -296,7 +294,9 @@ object Generator {
}
object Verifier {
- import IntegrationTestBigLinkedList.{defaultMasterAddrs, fail, parseLongFlag}
+ import IntegrationTestBigLinkedList.defaultMasterAddrs
+ import IntegrationTestBigLinkedList.fail
+ import IntegrationTestBigLinkedList.parseLongFlag
def usage: String =
s"""
@@ -337,11 +337,7 @@ object Verifier {
}
}
- case class Counts(
- referenced: Long,
- unreferenced: Long,
- extrareferences: Long,
- undefined: Long)
+ case class Counts(referenced: Long, unreferenced: Long, extrareferences: Long, undefined: Long)
/**
* Verifies the expected count against the count of nodes from a verification run.
@@ -456,7 +452,8 @@ object Verifier {
}
object Looper {
- import IntegrationTestBigLinkedList.{LOG, fail}
+ import IntegrationTestBigLinkedList.LOG
+ import IntegrationTestBigLinkedList.fail
def main(args: Array[String]): Unit = {
val conf =
@@ -473,8 +470,7 @@ object Looper {
val count = Verifier.run(verifyArgs, ss)
val expected = verifyArgs.nodes.map(_ + nodesPerLoop)
Verifier.verify(expected, count).map(fail)
- verifyArgs =
- verifyArgs.copy(nodes = Some(expected.getOrElse(nodesPerLoop)))
+ verifyArgs = verifyArgs.copy(nodes = Some(expected.getOrElse(nodesPerLoop)))
LOG.info("*************************************************")
LOG.info(s"Completed $n loops. Nodes verified: ${count.referenced}")
LOG.info("*************************************************")
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala
index 8caf2c2..01ef8e6 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/ITBigLinkedListTest.scala
@@ -29,14 +29,18 @@ class ITBigLinkedListTest extends KuduTestSuite {
@Test
def testSparkITBLL() {
- Generator.testMain(Array("--tasks=2",
- "--lists=2",
- "--nodes=10000",
- "--hash-partitions=2",
- "--range-partitions=2",
- "--replicas=1",
- s"--master-addrs=${miniCluster.getMasterAddresses}"),
- ss)
+ Generator.testMain(
+ Array(
+ "--tasks=2",
+ "--lists=2",
+ "--nodes=10000",
+ "--hash-partitions=2",
+ "--range-partitions=2",
+ "--replicas=1",
+ s"--master-addrs=${miniCluster.getMasterAddresses}"
+ ),
+ ss
+ )
// Insert bad nodes in order to test the verifier:
//
@@ -51,9 +55,7 @@ class ITBigLinkedListTest extends KuduTestSuite {
val session = kuduClient.newSession()
session.setFlushMode(FlushMode.MANUAL_FLUSH)
- for ((key1, key2, prev1, prev2) <- List((0, 0, -1, -1),
- (0, 1, 0, 0),
- (0, 2, 0, 0))) {
+ for ((key1, key2, prev1, prev2) <- List((0, 0, -1, -1), (0, 1, 0, 0), (0, 2, 0, 0))) {
val insert = table.newInsert()
insert.getRow.addLong(COLUMN_KEY_ONE_IDX, key1)
insert.getRow.addLong(COLUMN_KEY_TWO_IDX, key2)
@@ -73,10 +75,11 @@ class ITBigLinkedListTest extends KuduTestSuite {
}
}
- val counts = Verifier.testMain(Array(s"--master-addrs=${miniCluster.getMasterAddresses}"), ss)
+ val counts = Verifier
+ .testMain(Array(s"--master-addrs=${miniCluster.getMasterAddresses}"), ss)
assertEquals(2 * 2 * 10000, counts.referenced)
assertEquals(1, counts.extrareferences)
assertEquals(2, counts.unreferenced)
assertEquals(1, counts.undefined)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
index 1bde85d..d491682 100644
--- a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -20,7 +20,8 @@ package org.apache.kudu.spark.tools
import java.nio.file.Paths
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
-import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.junit.Assert._
@@ -40,29 +41,39 @@ class TestImportExportFiles extends KuduTestSuite {
val columns = ImmutableList.of(
new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
- new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(),
+ new ColumnSchemaBuilder("column2_d", Type.STRING)
+ .nullable(true)
+ .build(),
new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
- new ColumnSchemaBuilder("column4_b", Type.STRING).build())
+ new ColumnSchemaBuilder("column4_b", Type.STRING).build()
+ )
new Schema(columns)
}
- val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+ val tableOptions = new CreateTableOptions()
+ .setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1)
kuduClient.createTable(TABLE_NAME, schema, tableOptions)
// Get the absolute path of the resource file.
- val schemaResource = classOf[TestImportExportFiles].getResource(TABLE_DATA_PATH)
+ val schemaResource =
+ classOf[TestImportExportFiles].getResource(TABLE_DATA_PATH)
val dataPath = Paths.get(schemaResource.toURI).toAbsolutePath
- ImportExportFiles.testMain(Array("--operation=import",
- "--format=csv",
- s"--master-addrs=${miniCluster.getMasterAddresses}",
- s"--path=$dataPath",
- s"--table-name=$TABLE_NAME",
- "--delimiter=,",
- "--header=true",
- "--inferschema=true"), ss)
+ ImportExportFiles.testMain(
+ Array(
+ "--operation=import",
+ "--format=csv",
+ s"--master-addrs=${miniCluster.getMasterAddresses}",
+ s"--path=$dataPath",
+ s"--table-name=$TABLE_NAME",
+ "--delimiter=,",
+ "--header=true",
+ "--inferschema=true"
+ ),
+ ss
+ )
val rdd = kuduContext.kuduRDD(ss.sparkContext, TABLE_NAME, List("key"))
assert(rdd.collect.length == 4)
- assertEquals(rdd.collect().mkString(","),"[1],[2],[3],[4]")
+ assertEquals(rdd.collect().mkString(","), "[1],[2],[3],[4]")
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 7970459..3f9ae0a 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -26,7 +26,10 @@ import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SaveMode
import org.apache.yetus.audience.InterfaceStability
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client._
@@ -42,9 +45,7 @@ import org.apache.kudu.spark.kudu.SparkUtil._
*/
@InterfaceStability.Unstable
class DefaultSource
- extends RelationProvider
- with CreatableRelationProvider
- with SchemaRelationProvider {
+ extends RelationProvider with CreatableRelationProvider with SchemaRelationProvider {
val TABLE_KEY = "kudu.table"
val KUDU_MASTER = "kudu.master"
@@ -81,15 +82,13 @@ class DefaultSource
throw new IllegalArgumentException(
s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
- val operationType = getOperationType(
- parameters.getOrElse(OPERATION, "upsert"))
+ val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
val faultTolerantScanner =
Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
.getOrElse(false)
- val scanLocality = getScanLocalityType(
- parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
- val ignoreDuplicateRowErrors = Try(
- parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
+ val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
+ val ignoreDuplicateRowErrors = Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean)
+ .getOrElse(false) ||
Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
val ignoreNull =
Try(parameters.getOrElse(IGNORE_NULL, "false").toBoolean).getOrElse(false)
@@ -129,8 +128,7 @@ class DefaultSource
case SaveMode.Append =>
kuduRelation.asInstanceOf[KuduRelation].insert(data, false)
case _ =>
- throw new UnsupportedOperationException(
- "Currently, only Append is supported")
+ throw new UnsupportedOperationException("Currently, only Append is supported")
}
kuduRelation
@@ -146,13 +144,11 @@ class DefaultSource
s"Kudu table name must be specified in create options " +
s"using key '$TABLE_KEY'"))
val kuduMaster = parameters.getOrElse(KUDU_MASTER, defaultMasterAddrs)
- val operationType = getOperationType(
- parameters.getOrElse(OPERATION, "upsert"))
+ val operationType = getOperationType(parameters.getOrElse(OPERATION, "upsert"))
val faultTolerantScanner =
Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
.getOrElse(false)
- val scanLocality = getScanLocalityType(
- parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
+ val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, "closest_replica"))
new KuduRelation(
tableName,
@@ -174,8 +170,7 @@ class DefaultSource
case "update" => Update
case "delete" => Delete
case _ =>
- throw new IllegalArgumentException(
- s"Unsupported operation type '$opParam'")
+ throw new IllegalArgumentException(s"Unsupported operation type '$opParam'")
}
}
@@ -184,8 +179,7 @@ class DefaultSource
case "leader_only" => ReplicaSelection.LEADER_ONLY
case "closest_replica" => ReplicaSelection.CLOSEST_REPLICA
case _ =>
- throw new IllegalArgumentException(
- s"Unsupported replica selection type '$opParam'")
+ throw new IllegalArgumentException(s"Unsupported replica selection type '$opParam'")
}
}
}
@@ -215,11 +209,8 @@ class KuduRelation(
private[kudu] val socketReadTimeoutMs: Option[Long],
private val operationType: OperationType,
private val userSchema: Option[StructType],
- private val writeOptions: KuduWriteOptions = new KuduWriteOptions)(
- val sqlContext: SQLContext)
- extends BaseRelation
- with PrunedFilteredScan
- with InsertableRelation {
+ private val writeOptions: KuduWriteOptions = new KuduWriteOptions)(val sqlContext: SQLContext)
+ extends BaseRelation with PrunedFilteredScan with InsertableRelation {
private val context: KuduContext =
new KuduContext(masterAddrs, sqlContext.sparkContext, socketReadTimeoutMs)
@@ -246,9 +237,7 @@ class KuduRelation(
* @param filters filters that are being applied by the requesting query
* @return RDD will all the results from Kudu
*/
- override def buildScan(
- requiredColumns: Array[String],
- filters: Array[Filter]): RDD[Row] = {
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val predicates = filters.flatMap(filterToPredicate)
new KuduRDD(
context,
@@ -287,8 +276,7 @@ class KuduRelation(
case StringStartsWith(column, prefix) =>
prefixInfimum(prefix) match {
case None =>
- Array(
- comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix))
+ Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix))
case Some(inf) =>
Array(
comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix),
@@ -362,12 +350,8 @@ class KuduRelation(
* @param values the values
* @return the in list predicate
*/
- private def inListPredicate(
- column: String,
- values: Array[Any]): KuduPredicate = {
- KuduPredicate.newInListPredicate(
- table.getSchema.getColumn(column),
- values.toList.asJava)
+ private def inListPredicate(column: String, values: Array[Any]): KuduPredicate = {
+ KuduPredicate.newInListPredicate(table.getSchema.getColumn(column), values.toList.asJava)
}
/**
@@ -416,9 +400,8 @@ private[spark] object KuduRelation {
*/
// formatter: off
private def supportsFilter(filter: Filter): Boolean = filter match {
- case EqualTo(_, _) | GreaterThan(_, _) | GreaterThanOrEqual(_, _) |
- LessThan(_, _) | LessThanOrEqual(_, _) | In(_, _) |
- StringStartsWith(_, _) | IsNull(_) | IsNotNull(_) =>
+ case EqualTo(_, _) | GreaterThan(_, _) | GreaterThanOrEqual(_, _) | LessThan(_, _) |
+ LessThanOrEqual(_, _) | In(_, _) | StringStartsWith(_, _) | IsNull(_) | IsNotNull(_) =>
true
case And(left, right) => supportsFilter(left) && supportsFilter(right)
case _ => false
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index d155270..b0fb257 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -17,29 +17,35 @@
package org.apache.kudu.spark.kudu
-import java.security.{AccessController, PrivilegedAction}
+import java.security.AccessController
+import java.security.PrivilegedAction
import javax.security.auth.Subject
-import javax.security.auth.login.{
- AppConfigurationEntry,
- Configuration,
- LoginContext
-}
+import javax.security.auth.login.AppConfigurationEntry
+import javax.security.auth.login.Configuration
+import javax.security.auth.login.LoginContext
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructType}
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
import org.apache.spark.util.AccumulatorV2
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.SparkUtil._
-import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
/**
* KuduContext is a serializable container for Kudu client connections.
@@ -49,10 +55,7 @@ import org.apache.kudu.{Schema, Type}
* as a serializable field.
*/
@InterfaceStability.Unstable
-class KuduContext(
- val kuduMaster: String,
- sc: SparkContext,
- val socketReadTimeoutMs: Option[Long])
+class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeoutMs: Option[Long])
extends Serializable {
def this(kuduMaster: String, sc: SparkContext) = this(kuduMaster, sc, None)
@@ -190,10 +193,7 @@ class KuduContext(
* @param options replication and partitioning options for the table
* @return the KuduTable that was created
*/
- def createTable(
- tableName: String,
- schema: Schema,
- options: CreateTableOptions): KuduTable = {
+ def createTable(tableName: String, schema: Schema, options: CreateTableOptions): KuduTable = {
syncClient.createTable(tableName, schema, options)
}
@@ -372,8 +372,7 @@ class KuduContext(
case DecimalType() =>
operation.getRow.addDecimal(kuduIdx, row.getDecimal(sparkIdx))
case t =>
- throw new IllegalArgumentException(
- s"No support for Spark SQL type $t")
+ throw new IllegalArgumentException(s"No support for Spark SQL type $t")
}
}
}
@@ -419,8 +418,7 @@ private object KuduContext {
Log.info(s"Logging in as principal $principal with keytab $keytab")
val conf = new Configuration {
- override def getAppConfigurationEntry(
- name: String): Array[AppConfigurationEntry] = {
+ override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
val options = Map(
"principal" -> principal,
"keyTab" -> keytab,
@@ -445,9 +443,7 @@ private object KuduContext {
}
private object KuduClientCache {
- private case class CacheKey(
- kuduMaster: String,
- socketReadTimeoutMs: Option[Long])
+ private case class CacheKey(kuduMaster: String, socketReadTimeoutMs: Option[Long])
/**
* Set to
@@ -463,9 +459,7 @@ private object KuduClientCache {
// Visible for testing.
private[kudu] def clearCacheForTests() = clientCache.clear()
- def getAsyncClient(
- kuduMaster: String,
- socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
+ def getAsyncClient(kuduMaster: String, socketReadTimeoutMs: Option[Long]): AsyncKuduClient = {
val cacheKey = CacheKey(kuduMaster, socketReadTimeoutMs)
clientCache.synchronized {
if (!clientCache.contains(cacheKey)) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 022670b..ccc60fd 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -20,10 +20,13 @@ import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
-import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.Partition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
import org.apache.kudu.client._
-import org.apache.kudu.{Type, client}
+import org.apache.kudu.Type
+import org.apache.kudu.client
/**
* A Resilient Distributed Dataset backed by a Kudu table.
@@ -79,16 +82,13 @@ class KuduRDD private[kudu] (
if (scanLocality == ReplicaSelection.LEADER_ONLY) {
locations = Array(token.getTablet.getLeaderReplica.getRpcHost)
} else {
- locations =
- token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
+ locations = token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray
}
new KuduPartition(index, token.serialize(), locations)
}.toArray
}
- override def compute(
- part: Partition,
- taskContext: TaskContext): Iterator[Row] = {
+ override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
val client: KuduClient = kuduContext.syncClient
val partition: KuduPartition = part.asInstanceOf[KuduPartition]
val scanner =
@@ -115,9 +115,7 @@ private class KuduPartition(
* @param scanner the wrapped scanner
* @param kuduContext the kudu context
*/
-private class RowIterator(
- private val scanner: KuduScanner,
- private val kuduContext: KuduContext)
+private class RowIterator(private val scanner: KuduScanner, private val kuduContext: KuduContext)
extends Iterator[Row] {
private var currentIterator: RowResultIterator = null
@@ -131,8 +129,7 @@ private class RowIterator(
currentIterator = scanner.nextRows()
// Update timestampAccumulator with the client's last propagated
// timestamp on each executor.
- kuduContext.timestampAccumulator.add(
- kuduContext.syncClient.getLastPropagatedTimestamp)
+ kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
}
currentIterator.hasNext
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index 704ce42..143ea57 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -17,7 +17,8 @@
package org.apache.kudu.spark.kudu
-import org.apache.kudu.client.{KuduTable, Operation}
+import org.apache.kudu.client.KuduTable
+import org.apache.kudu.client.Operation
/**
* OperationType enumerates the types of Kudu write operations.
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
index f8394c2..69b6ca4 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/SparkUtil.scala
@@ -3,9 +3,13 @@ package org.apache.kudu.spark.kudu
import java.util
import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
-import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Schema, Type}
+import org.apache.kudu.ColumnSchema
+import org.apache.kudu.ColumnTypeAttributes
+import org.apache.kudu.Schema
+import org.apache.kudu.Type
import org.apache.spark.sql.types._
-import org.apache.yetus.audience.{InterfaceAudience, InterfaceStability}
+import org.apache.yetus.audience.InterfaceAudience
+import org.apache.yetus.audience.InterfaceStability
import scala.collection.JavaConverters._
@@ -66,9 +70,7 @@ object SparkUtil {
* @param fields an optional column projection
* @return the SparkSQL schema
*/
- def sparkSchema(
- kuduSchema: Schema,
- fields: Option[Seq[String]] = None): StructType = {
+ def sparkSchema(kuduSchema: Schema, fields: Option[Seq[String]] = None): StructType = {
val kuduColumns = fields match {
case Some(fieldNames) => fieldNames.map(kuduSchema.getColumn)
case None => kuduSchema.getColumns.asScala
@@ -96,8 +98,7 @@ object SparkUtil {
kuduCols.add(col)
}
// now add the non-key columns
- for (field <- sparkSchema.fields.filter(field =>
- !keys.contains(field.name))) {
+ for (field <- sparkSchema.fields.filter(field => !keys.contains(field.name))) {
val col = createColumnSchema(field, isKey = false)
kuduCols.add(col)
}
@@ -111,9 +112,7 @@ object SparkUtil {
* @param isKey true if the column is a key
* @return the Kudu column schema
*/
- private def createColumnSchema(
- field: StructField,
- isKey: Boolean): ColumnSchema = {
+ private def createColumnSchema(field: StructField, isKey: Boolean): ColumnSchema = {
val kt = sparkTypeToKuduType(field.dataType)
val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt)
.key(isKey)
http://git-wip-us.apache.org/repos/asf/kudu/blob/a417127a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
index 25d5464..4a27b65 100755
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -16,7 +16,9 @@
*/
package org.apache.kudu.spark
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.DataFrameReader
+import org.apache.spark.sql.DataFrameWriter
package object kudu {