Posted to commits@kudu.apache.org by da...@apache.org on 2016/04/27 00:45:44 UTC
incubator-kudu git commit: Adding kudu datasource for spark
Repository: incubator-kudu
Updated Branches:
refs/heads/master 342b7a704 -> 938f7feaf
Adding kudu datasource for spark
Change-Id: I0f91772f58e9eee9de45901866867e9a5014cfbe
Reviewed-on: http://gerrit.cloudera.org:8080/2848
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Dan Burkert <da...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/938f7fea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/938f7fea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/938f7fea
Branch: refs/heads/master
Commit: 938f7feaf90d76bc7f4d2823c61808a9b140168f
Parents: 342b7a7
Author: cgeorge <ch...@rms.com>
Authored: Mon Apr 11 11:46:13 2016 -0600
Committer: Dan Burkert <da...@cloudera.com>
Committed: Tue Apr 26 22:45:10 2016 +0000
----------------------------------------------------------------------
docs/developing.adoc | 22 ++
docs/release_notes.adoc | 4 +
java/kudu-spark/pom.xml | 47 ++--
.../scala/org/kududb/spark/DefaultSource.scala | 154 ------------
.../scala/org/kududb/spark/KuduContext.scala | 91 -------
.../org/kududb/spark/kudu/DefaultSource.scala | 235 +++++++++++++++++++
.../org/kududb/spark/kudu/KuduContext.scala | 78 ++++++
.../scala/org/kududb/spark/kudu/KuduRDD.scala | 133 +++++++++++
.../scala/org/kududb/spark/kudu/package.scala | 30 +++
.../src/test/resources/log4j.properties | 4 +-
.../org/kududb/spark/DefaultSourceTest.scala | 44 ----
.../org/kududb/spark/KuduContextTest.scala | 35 ---
.../scala/org/kududb/spark/TestContext.scala | 89 -------
.../kududb/spark/kudu/DefaultSourceTest.scala | 171 ++++++++++++++
.../org/kududb/spark/kudu/KuduContextTest.scala | 35 +++
.../org/kududb/spark/kudu/TestContext.scala | 115 +++++++++
16 files changed, 847 insertions(+), 440 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index d0fe73e..89ee517 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -96,6 +96,28 @@ example Maven pom.xml files.
See link:kudu_impala_integration.html[Using Impala With Kudu] for guidance on installing
and using Impala with Kudu, including several `impala-shell` examples.
+== Kudu integration with Spark
+
+As of version 0.9, Kudu integrates with Spark through the Spark Data Source API.
+Include the kudu-spark jar using the `--jars` flag:
+[source]
+----
+spark-shell --jars /kudu-spark-0.9.0.jar
+----
+Then import kudu-spark and create a dataframe:
+[source]
+----
+// Import the kudu datasource
+import org.kududb.spark.kudu._
+val kuduDataFrame = sqlContext.read.options(Map("kudu.master" -> "your.kudu.master.here", "kudu.table" -> "your.kudu.table.here")).kudu
+// Query using the Spark API or register a temporary table and use Spark SQL
+kuduDataFrame.select("id").filter("id >= 5").show()
+// Register kuduDataFrame as a temporary table for Spark SQL
+kuduDataFrame.registerTempTable("kudu_table")
+// Select from the temporary table
+sqlContext.sql("select id from kudu_table where id >= 5").show()
+----
+
== Integration with MapReduce, YARN, and Other Frameworks
Kudu was designed to integrate with MapReduce, YARN, Spark, and other frameworks in
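The documentation above can also be exercised through the DataFrame API alone. A minimal sketch, assuming a spark-shell session (so `sqlContext` exists); the master address and table name are placeholders, not part of this commit:

[source]
----
import org.kududb.spark.kudu._

// Placeholders: "master.example.com" and "metrics" are illustrative only.
val df = sqlContext.read
  .options(Map("kudu.master" -> "master.example.com", "kudu.table" -> "metrics"))
  .kudu

// Only the "id" column is scanned, and the comparison is converted into a
// KuduPredicate (see KuduRelation.filterToPredicate below) before the scan runs.
df.select("id").filter("id >= 5").show()
----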
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 017cb6c..1f0a3d3 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -71,6 +71,10 @@ Hadoop storage technologies.
for creating partition-aware scan descriptors. Can be used by clients and
query engines to more easily execute parallel scans.
+- link:http://gerrit.cloudera.org:8080/#/c/2848/[Gerrit 2848] Added a Kudu datasource for Spark that uses the Kudu client directly
+  instead of the MapReduce API. It supports predicate pushdown for Spark SQL and Spark filters,
+  parallel retrieval from multiple tablets, and column projections. See link:developing.html#_kudu_integration_with_spark[Kudu integration with Spark] for an example.
+
[[rn_0.8.0]]
=== Release notes specific to 0.8.0
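The pushdown described in this entry splits Spark filters into a handled set (sent to Kudu) and an unhandled set (kept in Spark). A sketch of the rule, using Spark's public filter classes and mirroring KuduRelation.supportsFilter from this commit; the column names are illustrative:

[source]
----
import org.apache.spark.sql.sources._

// Comparisons, and ANDs of comparisons, are handled by Kudu.
val pushedToKudu: Array[Filter] = Array(
  GreaterThanOrEqual("key", 5),
  And(GreaterThan("key", 0), LessThan("key", 10)))

// Anything else is returned from unhandledFilters and evaluated by Spark.
val keptInSpark: Array[Filter] = Array(IsNotNull("c2_s"))
----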
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark/pom.xml b/java/kudu-spark/pom.xml
index 590b45d..cc0d7fe 100644
--- a/java/kudu-spark/pom.xml
+++ b/java/kudu-spark/pom.xml
@@ -24,7 +24,7 @@
<name>Kudu Spark Bindings</name>
<properties>
- <spark.version>1.3.0</spark.version>
+ <spark.version>1.6.1</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<top.dir>${project.basedir}/..</top.dir>
@@ -55,19 +55,6 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <type>test-jar</type>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.kududb</groupId>
@@ -88,17 +75,6 @@
</dependency>
<dependency>
- <groupId>org.kududb</groupId>
- <artifactId>kudu-mapreduce</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
@@ -164,6 +140,27 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
deleted file mode 100644
index 447b059..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/DefaultSource.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext}
-import org.kududb.Type
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client.RowResult
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable.HashMap
-
-/**
- * DefaultSource for integration with Spark's dataframe datasources.
- * This class will produce a relationProvider based on input given to it from spark.
- */
-@InterfaceStability.Unstable
-class DefaultSource extends RelationProvider {
-
- val TABLE_KEY = "kudu.table"
- val KUDU_MASTER = "kudu.master"
-
- /**
- * Construct a BaseRelation using the provided context and parameters.
- *
- * @param sqlContext SparkSQL context
- * @param parameters parameters given to us from SparkSQL
- * @return a BaseRelation Object
- */
- override def createRelation(sqlContext: SQLContext,
- parameters: Map[String, String]):
- BaseRelation = {
- val tableName = parameters.get(TABLE_KEY)
- if (tableName.isEmpty) {
- throw new IllegalArgumentException(s"Invalid value for $TABLE_KEY '$tableName'")
- }
-
- val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-
- new KuduRelation(tableName.get, kuduMaster)(sqlContext)
- }
-}
-
-/**
- * Implementation of Spark BaseRelation.
- *
- * @param tableName Kudu table that we plan to read from
- * @param kuduMaster Kudu master addresses
- * @param sqlContext SparkSQL context
- */
-@InterfaceStability.Unstable
-class KuduRelation(val tableName: String,
- val kuduMaster: String)(
- @transient val sqlContext: SQLContext)
- extends BaseRelation with PrunedFilteredScan with Serializable {
-
- val typesMapping = HashMap[Type, DataType](
- Type.INT16 -> IntegerType,
- Type.INT32 -> IntegerType,
- Type.INT64 -> LongType,
- Type.FLOAT -> FloatType,
- Type.DOUBLE -> DoubleType,
- Type.STRING -> StringType,
- Type.TIMESTAMP -> TimestampType,
- Type.BINARY -> BinaryType
- )
-
- // Using lazy val for the following because we can't serialize them but we need them once we
- // deserialize them.
- @transient lazy val kuduContext = new KuduContext(kuduMaster)
- @transient lazy val kuduTable = kuduContext.syncClient.openTable(tableName)
- @transient lazy val tableColumns = kuduTable.getSchema.getColumns.asScala
- @transient lazy val kuduSchemaColumnMap = tableColumns.map(c => (c.getName, c)).toMap
-
- /**
- * Generates a SparkSQL schema object so SparkSQL knows what is being
- * provided by this BaseRelation.
- *
- * @return schema generated from the Kudu table's schema
- */
- override def schema: StructType = {
- val metadataBuilder = new MetadataBuilder()
-
- val structFieldArray: Array[StructField] =
- tableColumns.map { columnSchema =>
- val columnSparkSqlType = typesMapping.getOrElse(
- columnSchema.getType,
- throw new IllegalArgumentException(s"Unsupported column type: ${columnSchema.getType}"))
-
- val metadata = metadataBuilder.putString("name", columnSchema.getName).build()
- new StructField(columnSchema.getName, columnSparkSqlType,
- nullable = columnSchema.isNullable, metadata)
- }.toArray
-
- new StructType(structFieldArray)
- }
-
- /**
- * Build the RDD to scan rows.
- *
- * @param requiredColumns columns that are being requested by the requesting query
- * @param filters filters that are being applied by the requesting query
- * @return RDD will all the results from Kudu
- */
- override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
- kuduContext.kuduRDD(sqlContext.sparkContext, tableName, requiredColumns).map { row =>
- // TODO use indexes instead of column names since it requires one less mapping.
- Row.fromSeq(requiredColumns.map(column => getKuduValue(row, column)))
- }
- }
-
- private def getKuduValue(row: RowResult, columnName: String): Any = {
- val columnSchema = kuduSchemaColumnMap.getOrElse(columnName,
- throw new IllegalArgumentException(s"Couldn't find column '$columnName'"))
-
- if (columnSchema.isNullable && row.isNull(columnName)) {
- return null
- }
-
- val columnType = columnSchema.getType
-
- columnType match {
- case Type.BINARY => row.getBinary(columnName)
- case Type.BOOL => row.getBoolean(columnName)
- case Type.DOUBLE => row.getDouble(columnName)
- case Type.FLOAT => row.getFloat(columnName)
- case Type.INT16 => row.getShort(columnName)
- case Type.INT32 => row.getInt(columnName)
- case Type.INT64 => row.getLong(columnName)
- case Type.INT8 => row.getByte(columnName)
- case Type.TIMESTAMP => row.getLong(columnName)
- case Type.STRING => row.getString(columnName)
- case _ => throw new IllegalArgumentException(s"Type not supported: '${columnType.getName}'")
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
deleted file mode 100644
index c7bfe3e..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/KuduContext.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client.{AsyncKuduClient, KuduClient, RowResult}
-import org.kududb.mapreduce.KuduTableInputFormat
-
-/**
- * KuduContext is a façade for Kudu operations.
- *
- * If a Kudu client connection is needed as part of a Spark application, a
- * [[KuduContext]] should used as a broadcast variable in the job in order to
- * share connections among the tasks in a JVM.
- */
-@InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
-
- /**
- * Set to
- * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
- * The client instances are closed through the JVM shutdown hook
- * mechanism in order to make sure that any unflushed writes are cleaned up
- * properly. Spark has no way of notifying the [[DefaultSource]] on shutdown.
- */
- private val ShutdownHookPriority = 100
-
- @transient lazy val syncClient = {
- val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
- ShutdownHookManager.get().addShutdownHook(new Runnable {
- override def run() = syncClient.close()
- }, ShutdownHookPriority)
- syncClient
- }
- @transient lazy val asyncClient = {
- val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
- ShutdownHookManager.get().addShutdownHook(
- new Runnable {
- override def run() = asyncClient.close()
- }, ShutdownHookPriority)
- asyncClient
- }
-
- /**
- * Create an RDD from a Kudu table.
- *
- * @param tableName table to read from
- * @param columnProjection list of columns to read
- *
- * Not specifying this at all (i.e. setting to null) or setting to the special string
- * '*' means to project all columns.
- * @return a new RDD that maps over the given table for the selected columns
- */
- def kuduRDD(sc: SparkContext,
- tableName: String,
- columnProjection: Seq[String] = Nil): RDD[RowResult] = {
-
- val conf = new Configuration
- conf.set("kudu.mapreduce.master.address", kuduMaster)
- conf.set("kudu.mapreduce.input.table", tableName)
- if (columnProjection.nonEmpty) {
- conf.set("kudu.mapreduce.column.projection", columnProjection.mkString(","))
- }
-
- val rdd = sc.newAPIHadoopRDD(conf, classOf[KuduTableInputFormat],
- classOf[NullWritable], classOf[RowResult])
-
- val columnNames = if (columnProjection.nonEmpty) columnProjection.mkString(", ") else "(*)"
- rdd.values.setName(s"KuduRDD { table=$tableName, columnProjection=$columnNames }")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
new file mode 100644
index 0000000..6226862
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import java.sql.Timestamp
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+import org.kududb.Type
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client.{KuduPredicate, KuduTable}
+import org.kududb.client.KuduPredicate.ComparisonOp
+
+import scala.collection.JavaConverters._
+
+/**
+ * DefaultSource for integration with Spark's dataframe datasources.
+ * This class produces a relation (KuduRelation) based on the parameters given to it by Spark.
+ */
+@InterfaceStability.Unstable
+class DefaultSource extends RelationProvider {
+
+ val TABLE_KEY = "kudu.table"
+ val KUDU_MASTER = "kudu.master"
+
+ /**
+ * Construct a BaseRelation using the provided context and parameters.
+ *
+ * @param sqlContext SparkSQL context
+ * @param parameters parameters given to us from SparkSQL
+ * @return a BaseRelation Object
+ */
+ override def createRelation(sqlContext: SQLContext,
+ parameters: Map[String, String]):
+ BaseRelation = {
+ val tableName = parameters.get(TABLE_KEY)
+ if (tableName.isEmpty) {
+ throw new IllegalArgumentException(s"Invalid value for $TABLE_KEY '$tableName'")
+ }
+
+ val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+
+ new KuduRelation(tableName.get, kuduMaster)(sqlContext)
+ }
+}
+
+/**
+ * Implementation of Spark BaseRelation.
+ *
+ * @param tableName Kudu table that we plan to read from
+ * @param kuduMaster Kudu master addresses
+ * @param sqlContext SparkSQL context
+ */
+@InterfaceStability.Unstable
+class KuduRelation(private val tableName: String,
+ private val kuduMaster: String)(
+ val sqlContext: SQLContext)
+extends BaseRelation
+with PrunedFilteredScan {
+ import KuduRelation._
+
+ private val context: KuduContext = new KuduContext(kuduMaster)
+ private val table: KuduTable = context.syncClient.openTable(tableName)
+
+ override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
+ filters.filterNot(supportsFilter)
+
+ /**
+ * Generates a SparkSQL schema object so SparkSQL knows what is being
+ * provided by this BaseRelation.
+ *
+ * @return schema generated from the Kudu table's schema
+ */
+ override def schema: StructType = {
+ val fields: Array[StructField] =
+ table.getSchema.getColumns.asScala.map { columnSchema =>
+ val sparkType = kuduTypeToSparkType(columnSchema.getType)
+ new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+ }.toArray
+
+ new StructType(fields)
+ }
+
+ /**
+ * Build the RDD to scan rows.
+ *
+ * @param requiredColumns columns that are being requested by the requesting query
+ * @param filters filters that are being applied by the requesting query
+ * @return RDD with all the results from Kudu
+ */
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ val predicates = filters.flatMap(filterToPredicate)
+ new KuduRDD(kuduMaster, 1024*1024*20, requiredColumns, predicates,
+ table, context, sqlContext.sparkContext)
+ }
+
+ /**
+ * Converts a Spark [[Filter]] to a Kudu [[KuduPredicate]].
+ *
+ * @param filter the filter to convert
+ * @return the converted predicates, or an empty array if the filter cannot be converted
+ */
+ private def filterToPredicate(filter : Filter) : Array[KuduPredicate] = {
+ filter match {
+ case EqualTo(column, value) =>
+ Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
+ case GreaterThan(column, value) =>
+ Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
+ case GreaterThanOrEqual(column, value) =>
+ Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
+ case LessThan(column, value) =>
+ Array(comparisonPredicate(column, ComparisonOp.LESS, value))
+ case LessThanOrEqual(column, value) =>
+ Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
+ case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right)
+ case _ => Array()
+ }
+ }
+
+ /**
+ * Creates a new comparison predicate for the column, comparison operator, and comparison value.
+ *
+ * @param column the column name
+ * @param operator the comparison operator
+ * @param value the comparison value
+ * @return the comparison predicate
+ */
+ private def comparisonPredicate(column: String,
+ operator: ComparisonOp,
+ value: Any): KuduPredicate = {
+ val columnSchema = table.getSchema.getColumn(column)
+ value match {
+ case value: Boolean => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Byte => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
+ case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+ }
+ }
+}
+
+private[spark] object KuduRelation {
+ /**
+ * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
+ *
+ * @param t the Kudu type
+ * @return the corresponding Spark SQL type
+ */
+ private def kuduTypeToSparkType(t: Type): DataType = t match {
+ case Type.BOOL => BooleanType
+ case Type.INT8 => ByteType
+ case Type.INT16 => ShortType
+ case Type.INT32 => IntegerType
+ case Type.INT64 => LongType
+ case Type.TIMESTAMP => TimestampType
+ case Type.FLOAT => FloatType
+ case Type.DOUBLE => DoubleType
+ case Type.STRING => StringType
+ case Type.BINARY => BinaryType
+ }
+
+ /**
+ * Returns `true` if the filter is able to be pushed down to Kudu.
+ *
+ * @param filter the filter to test
+ */
+ private def supportsFilter(filter: Filter): Boolean = filter match {
+ case EqualTo(_, _)
+ | GreaterThan(_, _)
+ | GreaterThanOrEqual(_, _)
+ | LessThan(_, _)
+ | LessThanOrEqual(_, _) => true
+ case And(left, right) => supportsFilter(left) && supportsFilter(right)
+ case _ => false
+ }
+
+ /**
+ * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
+ *
+ * @param timestamp the timestamp to convert to microseconds
+ * @return the microseconds since the Unix epoch
+ */
+ def timestampToMicros(timestamp: Timestamp): Long = {
+ // Number of whole milliseconds since the Unix epoch, in microseconds.
+ val millis = timestamp.getTime * 1000
+ // Sub millisecond time since the Unix epoch, in microseconds.
+ val micros = (timestamp.getNanos % 1000000) / 1000
+ if (micros >= 0) {
+ millis + micros
+ } else {
+ millis + 1000000 + micros
+ }
+ }
+
+ /**
+ * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
+ *
+ * @param micros the offset in microseconds since the Unix epoch
+ * @return the corresponding timestamp
+ */
+ def microsToTimestamp(micros: Long): Timestamp = {
+ var millis = micros / 1000
+ var nanos = (micros % 1000000) * 1000
+ if (nanos < 0) {
+ millis -= 1
+ nanos += 1000000000
+ }
+
+ val timestamp = new Timestamp(millis)
+ timestamp.setNanos(nanos.asInstanceOf[Int])
+ timestamp
+ }
+}
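The two timestamp helpers above convert between java.sql.Timestamp and Kudu's microseconds-since-the-epoch representation. A short worked example, assuming code inside the org.kududb.spark package (the KuduRelation object is private[spark]); the values match the "timestamp conversion" test added later in this commit:

[source]
----
import java.sql.Timestamp

// 1970-01-01T00:00:00.123456Z: getTime is 123 ms, getNanos is 123456000.
val t = new Timestamp(0)
t.setNanos(123456000)

// timestampToMicros: 123 * 1000 + (123456000 % 1000000) / 1000 = 123456.
assert(KuduRelation.timestampToMicros(t) == 123456L)

// microsToTimestamp: millis = 123456 / 1000 = 123,
// nanos = (123456 % 1000000) * 1000 = 123456000, recovering t.
assert(KuduRelation.microsToTimestamp(123456L) == t)
----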
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
new file mode 100644
index 0000000..47984b2
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client.{AsyncKuduClient, KuduClient}
+
+/**
+ * KuduContext is a serializable container for Kudu client connections.
+ *
+ * If a Kudu client connection is needed as part of a Spark application, a
+ * [[KuduContext]] should be used as a broadcast variable in the job in order to
+ * share connections among the tasks in a JVM.
+ */
+@InterfaceStability.Unstable
+class KuduContext(kuduMaster: String) extends Serializable {
+
+ /**
+ * Set to
+ * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
+ * The client instances are closed through the JVM shutdown hook
+ * mechanism in order to make sure that any unflushed writes are cleaned up
+ * properly. Spark has no shutdown notifications.
+ */
+ private val ShutdownHookPriority = 100
+
+ @transient lazy val syncClient = {
+ val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
+ ShutdownHookManager.get().addShutdownHook(new Runnable {
+ override def run() = syncClient.close()
+ }, ShutdownHookPriority)
+ syncClient
+ }
+
+ @transient lazy val asyncClient = {
+ val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
+ ShutdownHookManager.get().addShutdownHook(
+ new Runnable {
+ override def run() = asyncClient.close()
+ }, ShutdownHookPriority)
+ asyncClient
+ }
+
+ /**
+ * Create an RDD from a Kudu table.
+ *
+ * @param tableName table to read from
+ * @param columnProjection list of columns to read. Not specifying this at all
+ * (i.e. setting to null) or setting to the special
+ * string '*' means to project all columns.
+ * @return a new RDD that maps over the given table for the selected columns
+ */
+ def kuduRDD(sc: SparkContext,
+ tableName: String,
+ columnProjection: Seq[String] = Nil): RDD[Row] = {
+ new KuduRDD(kuduMaster, 1024*1024*20, columnProjection.toArray, Array(),
+ syncClient.openTable(tableName), this, sc)
+ }
+}
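A hedged sketch of the intended usage: construct one KuduContext per application and let each JVM build its clients lazily via the @transient lazy vals above. The master address and table name are placeholders, not taken from this commit:

[source]
----
import org.apache.spark.{SparkConf, SparkContext}
import org.kududb.spark.kudu.KuduContext

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("kudu-context-example"))
val kuduContext = new KuduContext("master.example.com")  // placeholder master

// kuduRDD returns RDD[Row]; project a single column and count the rows.
// The clients are closed by the shutdown hooks registered above.
val keys = kuduContext.kuduRDD(sc, "metrics", Seq("key")).map(_.getInt(0))
println(keys.count())
----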
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
new file mode 100644
index 0000000..5395d5a
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.kududb.client._
+import org.kududb.{Type, client}
+
+import scala.collection.JavaConverters._
+
+/**
+ * A Resilient Distributed Dataset backed by a Kudu table.
+ */
+class KuduRDD(val kuduMaster: String,
+ @transient batchSize: Integer,
+ @transient projectedCols: Array[String],
+ @transient predicates: Array[client.KuduPredicate],
+ @transient table: KuduTable,
+ @transient kc: KuduContext,
+ @transient sc: SparkContext) extends RDD[Row](sc, Nil) {
+
+ /**
+ * The [[KuduContext]] for this `KuduRDD`.
+ *
+ * The `KuduContext` manages the Kudu client instances for the `KuduRDD`.
+ * When the `KuduRDD` is first constructed it uses the context passed in as
+ * `kc`. After deserialization, a new `KuduContext` is created as necessary.
+ * The `kc` field should not be used, since it will not be rehydrated after
+ * serialization.
+ */
+ @transient private lazy val kuduContext: KuduContext = {
+ if (kc != null) kc else new KuduContext(kuduMaster)
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ val builder = kuduContext.syncClient
+ .newScanTokenBuilder(table)
+ .batchSizeBytes(batchSize)
+ .setProjectedColumnNames(projectedCols.toSeq.asJava)
+ .cacheBlocks(true)
+
+ for (predicate <- predicates) {
+ builder.addPredicate(predicate)
+ }
+ val tokens = builder.build().asScala
+ tokens.zipWithIndex.map {
+ case (token, index) =>
+ new KuduPartition(index, token.serialize(),
+ token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray)
+ }.toArray
+ }
+
+ override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
+ val client: KuduClient = kuduContext.syncClient
+ val partition: KuduPartition = part.asInstanceOf[KuduPartition]
+ val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
+ new RowResultIteratorScala(scanner)
+ }
+
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ partition.asInstanceOf[KuduPartition].locations
+ }
+}
+
+/**
+ * A Spark [[Partition]] which wraps a serialized [[KuduScanToken]].
+ */
+private[spark] class KuduPartition(val index: Int,
+ val scanToken: Array[Byte],
+ val locations : Array[String]) extends Partition {}
+
+/**
+ * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
+ * @param scanner the wrapped scanner
+ */
+private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] {
+
+ private var currentIterator: RowResultIterator = null
+
+ override def hasNext: Boolean = {
+ if ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
+ (scanner.hasMoreRows && currentIterator == null)) {
+ currentIterator = scanner.nextRows()
+ }
+ currentIterator.hasNext
+ }
+
+ override def next(): Row = new KuduRow(currentIterator.next())
+}
+
+/**
+ * A Spark SQL [[Row]] which wraps a Kudu [[RowResult]].
+ * @param rowResult the wrapped row result
+ */
+private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
+ override def length: Int = rowResult.getColumnProjection.getColumnCount
+
+ override def get(i: Int): Any = {
+ if (rowResult.isNull(i)) null
+ else rowResult.getColumnType(i) match {
+ case Type.BOOL => rowResult.getBoolean(i)
+ case Type.INT8 => rowResult.getByte(i)
+ case Type.INT16 => rowResult.getShort(i)
+ case Type.INT32 => rowResult.getInt(i)
+ case Type.INT64 => rowResult.getLong(i)
+ case Type.TIMESTAMP => KuduRelation.microsToTimestamp(rowResult.getLong(i))
+ case Type.FLOAT => rowResult.getFloat(i)
+ case Type.DOUBLE => rowResult.getDouble(i)
+ case Type.STRING => rowResult.getString(i)
+ case Type.BINARY => rowResult.getBinary(i)
+ }
+ }
+
+ override def copy(): Row = Row.fromSeq(Range(0, length).map(get))
+
+ override def toString(): String = rowResult.toString
+}
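The partition scheme above (one partition per scan token, computed by deserializing the token into a scanner) can be reproduced directly against the kudu-client API, using only the calls KuduRDD itself makes. A sketch assuming a reachable master at a placeholder address:

[source]
----
import org.kududb.client.{KuduClient, KuduScanToken}
import scala.collection.JavaConverters._

val client = new KuduClient.KuduClientBuilder("master.example.com").build()
val table = client.openTable("metrics")  // placeholder table

// One token per tablet; KuduRDD wraps each serialized token in a KuduPartition.
val tokens = client.newScanTokenBuilder(table)
  .setProjectedColumnNames(Seq("key").asJava)
  .build().asScala

for (token <- tokens) {
  val bytes = token.serialize()  // in KuduRDD this travels to the executor
  val scanner = KuduScanToken.deserializeIntoScanner(bytes, client)
  while (scanner.hasMoreRows) {
    val rows = scanner.nextRows()
    while (rows.hasNext) println(rows.next().getInt(0))
  }
}
client.close()
----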
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
new file mode 100755
index 0000000..29ba455
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader}
+
+package object kudu {
+
+ /**
+ * Adds a `kudu` method to [[DataFrameReader]] that allows reading Kudu tables
+ * through the Data Source API.
+ */
+ implicit class KuduDataFrameReader(reader: DataFrameReader) {
+ def kudu: DataFrame = reader.format("org.kududb.spark.kudu").load
+ }
+}
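With the implicit in scope, the two reads below are equivalent. A small sketch, assuming a spark-shell session (so `sqlContext` exists) and placeholder option values:

[source]
----
import org.apache.spark.sql.DataFrame
import org.kududb.spark.kudu._

val opts = Map("kudu.master" -> "master.example.com", "kudu.table" -> "metrics")
val viaFormat: DataFrame =
  sqlContext.read.options(opts).format("org.kududb.spark.kudu").load
val viaImplicit: DataFrame = sqlContext.read.options(opts).kudu
----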
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/resources/log4j.properties b/java/kudu-spark/src/test/resources/log4j.properties
index cb277ed..94321ff 100644
--- a/java/kudu-spark/src/test/resources/log4j.properties
+++ b/java/kudu-spark/src/test/resources/log4j.properties
@@ -15,9 +15,9 @@
# specific language governing permissions and limitations
# under the License.
-log4j.rootLogger = DEBUG, out
+log4j.rootLogger = WARN, out
log4j.appender.out = org.apache.log4j.ConsoleAppender
log4j.appender.out.layout = org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-log4j.logger.org.kududb = DEBUG
\ No newline at end of file
+log4j.logger.org.kududb = INFO
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
deleted file mode 100644
index 483b2e2..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/DefaultSourceTest.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import org.apache.spark.sql.SQLContext
-import org.junit.runner.RunWith
-import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class DefaultSourceTest extends FunSuite with TestContext {
-
- test("Test basic SparkSQL") {
- val rowCount = 10
-
- insertRows(rowCount)
-
- val sqlContext = new SQLContext(sc)
-
- sqlContext.load("org.kududb.spark",
- Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses))
- .registerTempTable(tableName)
-
- val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
- assert(results.size() == rowCount)
-
- assert(results.get(0).isNullAt(2))
- assert(!results.get(1).isNullAt(2))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
deleted file mode 100644
index 67aad7b..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/KuduContextTest.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import org.junit.runner.RunWith
-import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class KuduContextTest extends FunSuite with TestContext {
- test("Test basic kuduRDD") {
- val rowCount = 10
-
- insertRows(rowCount)
-
- val scanRdd = kuduContext.kuduRDD(sc, "test")
-
- val scanList = scanRdd.map(r => r.getInt(0)).collect()
- assert(scanList.length == rowCount)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
deleted file mode 100644
index 9876282..0000000
--- a/java/kudu-spark/src/test/scala/org/kududb/spark/TestContext.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import com.google.common.collect.ImmutableList
-import org.apache.spark.SparkContext
-import org.kududb.ColumnSchema.ColumnSchemaBuilder
-import org.kududb.client.KuduClient.KuduClientBuilder
-import org.kududb.client.MiniKuduCluster.MiniKuduClusterBuilder
-import org.kududb.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
-import org.kududb.{Schema, Type}
-import org.scalatest.{BeforeAndAfterAll, Suite}
-
-trait TestContext extends BeforeAndAfterAll { self: Suite =>
-
- var sc: SparkContext = null
- var miniCluster: MiniKuduCluster = null
- var kuduClient: KuduClient = null
- var table: KuduTable = null
- var kuduContext: KuduContext = null
-
- val tableName = "test"
-
- lazy val schema: Schema = {
- val columns = ImmutableList.of(
- new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
- new ColumnSchemaBuilder("c1_i", Type.INT32).build(),
- new ColumnSchemaBuilder("c2_s", Type.STRING).nullable(true).build())
- new Schema(columns)
- }
-
- override def beforeAll() {
- miniCluster = new MiniKuduClusterBuilder()
- .numMasters(1)
- .numTservers(1)
- .build()
- val envMap = Map[String,String](("Xmx", "512m"))
-
- sc = new SparkContext("local[2]", "test", null, Nil, envMap)
-
- kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
- assert(miniCluster.waitForTabletServers(1))
-
- kuduContext = new KuduContext(miniCluster.getMasterAddresses)
-
- val tableOptions = new CreateTableOptions().setNumReplicas(1)
- table = kuduClient.createTable(tableName, schema, tableOptions)
- }
-
- override def afterAll() {
- if (kuduClient != null) kuduClient.shutdown()
- if (miniCluster != null) miniCluster.shutdown()
- if (sc != null) sc.stop()
- }
-
- def insertRows(rowCount: Integer) {
- val kuduSession = kuduClient.newSession()
-
- for (i <- 1 to rowCount) {
- val insert = table.newInsert
- val row = insert.getRow
- row.addInt(0, i)
- row.addInt(1, i)
-
- // Sprinkling some nulls so that queries see them.
- if (i % 2 == 0) {
- row.addString(2, i.toString)
- } else {
- row.setNull(2)
- }
-
- kuduSession.apply(insert)
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
new file mode 100644
index 0000000..7161ace
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.TimeZone
+
+import org.apache.spark.sql.SQLContext
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+import scala.collection.immutable.IndexedSeq
+
+@RunWith(classOf[JUnitRunner])
+class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
+
+ test("timestamp conversion") {
+ val epoch = new Timestamp(0)
+ assertEquals(0, KuduRelation.timestampToMicros(epoch))
+ assertEquals(epoch, KuduRelation.microsToTimestamp(0))
+
+ val t1 = new Timestamp(0)
+ t1.setNanos(123456000)
+ assertEquals(123456, KuduRelation.timestampToMicros(t1))
+ assertEquals(t1, KuduRelation.microsToTimestamp(123456))
+
+ val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
+ iso8601.setTimeZone(TimeZone.getTimeZone("UTC"))
+
+ val t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime)
+ t3.setNanos(876544000)
+ assertEquals(-1454368523123456L, KuduRelation.timestampToMicros(t3))
+ assertEquals(t3, KuduRelation.microsToTimestamp(-1454368523123456L))
+ }
+
+ val rowCount = 10
+ var sqlContext : SQLContext = _
+ var rows : IndexedSeq[(Int, Int, String)] = _
+ before {
+ val rowCount = 10
+ rows = insertRows(rowCount)
+
+ sqlContext = new SQLContext(sc)
+
+ sqlContext.read.options(
+ Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)).kudu
+ .registerTempTable(tableName)
+ }
+
+ test("table scan") {
+ val results = sqlContext.sql(s"SELECT * FROM $tableName").collectAsList()
+ assert(results.size() == rowCount)
+
+ assert(!results.get(0).isNullAt(2))
+ assert(results.get(1).isNullAt(2))
+ }
+
+ test("table scan with projection") {
+ assertEquals(10, sqlContext.sql(s"""SELECT key FROM $tableName""").count())
+ }
+
+ test("table scan with projection and predicate double") {
+ assertEquals(rows.count { case (key, i, s) => i != null && i > 5 },
+ sqlContext.sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""").count())
+ }
+
+ test("table scan with projection and predicate long") {
+ assertEquals(rows.count { case (key, i, s) => i != null && i > 5 },
+ sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""").count())
+
+ }
+ test("table scan with projection and predicate bool") {
+ assertEquals(rows.count { case (key, i, s) => i != null && i%2==0 },
+ sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""").count())
+
+ }
+ test("table scan with projection and predicate short") {
+ assertEquals(rows.count { case (key, i, s) => i != null && i > 5},
+ sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""").count())
+
+ }
+ test("table scan with projection and predicate float") {
+ assertEquals(rows.count { case (key, i, s) => i != null && i > 5},
+ sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count())
+
+ }
+
+ test("table scan with projection and predicate ") {
+ assertEquals(rows.count { case (key, i, s) => s != null && s > "5" },
+ sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count())
+
+ assertEquals(rows.count { case (key, i, s) => s != null },
+ sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""").count())
+ }
+
+
+ test("Test basic SparkSQL") {
+ val results = sqlContext.sql("SELECT * FROM " + tableName).collectAsList()
+ assert(results.size() == rowCount)
+
+ assert(results.get(1).isNullAt(2))
+ assert(!results.get(0).isNullAt(2))
+ }
+
+ test("Test basic SparkSQL projection") {
+ val results = sqlContext.sql("SELECT key FROM " + tableName).collectAsList()
+ assert(results.size() == rowCount)
+ assert(results.get(0).size.equals(1))
+ assert(results.get(0).getInt(0).equals(0))
+ }
+
+ test("Test basic SparkSQL with predicate") {
+ val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1").collectAsList()
+ assert(results.size() == 1)
+ assert(results.get(0).size.equals(1))
+ assert(results.get(0).getInt(0).equals(1))
+
+ }
+
+ test("Test basic SparkSQL with two predicates") {
+ val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=2 and c2_s='2'").collectAsList()
+ assert(results.size() == 1)
+ assert(results.get(0).size.equals(1))
+ assert(results.get(0).getInt(0).equals(2))
+ }
+
+ test("Test basic SparkSQL with two predicates negative") {
+ val results = sqlContext.sql("SELECT key FROM " + tableName + " where key=1 and c2_s='2'").collectAsList()
+ assert(results.size() == 0)
+ }
+
+ test("Test basic SparkSQL with two predicates including string") {
+ val results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s='2'").collectAsList()
+ assert(results.size() == 1)
+ assert(results.get(0).size.equals(1))
+ assert(results.get(0).getInt(0).equals(2))
+ }
+
+ test("Test basic SparkSQL with two predicates and projection") {
+ val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s='2'").collectAsList()
+ assert(results.size() == 1)
+ assert(results.get(0).size.equals(2))
+ assert(results.get(0).getInt(0).equals(2))
+ assert(results.get(0).getString(1).equals("2"))
+ }
+
+ test("Test basic SparkSQL with two predicates greater than") {
+ val results = sqlContext.sql("SELECT key, c2_s FROM " + tableName + " where c2_s>='2'").collectAsList()
+ assert(results.size() == 4)
+ assert(results.get(0).size.equals(2))
+ assert(results.get(0).getInt(0).equals(2))
+ assert(results.get(0).getString(1).equals("2"))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
new file mode 100644
index 0000000..fc2dff5
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/KuduContextTest.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import org.junit.runner.RunWith
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class KuduContextTest extends FunSuite with TestContext {
+ test("Test basic kuduRDD") {
+ val rowCount = 10
+
+ insertRows(rowCount)
+
+ val scanRdd = kuduContext.kuduRDD(sc, "test", Seq("key"))
+
+ val scanList = scanRdd.map(r => r.getInt(0)).collect()
+ assert(scanList.length == rowCount)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/938f7fea/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
new file mode 100644
index 0000000..97a4d39
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/TestContext.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import java.util.Date
+
+import com.google.common.collect.ImmutableList
+import org.apache.spark.{SparkConf, SparkContext}
+import org.kududb.ColumnSchema.ColumnSchemaBuilder
+import org.kududb.client.KuduClient.KuduClientBuilder
+import org.kududb.client.MiniKuduCluster.MiniKuduClusterBuilder
+import org.kududb.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
+import org.kududb.{Schema, Type}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+import scala.collection.immutable.IndexedSeq
+
+trait TestContext extends BeforeAndAfterAll { self: Suite =>
+
+ var sc: SparkContext = null
+ var miniCluster: MiniKuduCluster = null
+ var kuduClient: KuduClient = null
+ var table: KuduTable = null
+ var kuduContext: KuduContext = null
+
+ val tableName = "test"
+
+ lazy val schema: Schema = {
+ val columns = ImmutableList.of(
+ new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+ new ColumnSchemaBuilder("c1_i", Type.INT32).build(),
+ new ColumnSchemaBuilder("c2_s", Type.STRING).nullable(true).build(),
+ new ColumnSchemaBuilder("c3_double", Type.DOUBLE).build(),
+ new ColumnSchemaBuilder("c4_long", Type.INT64).build(),
+ new ColumnSchemaBuilder("c5_bool", Type.BOOL).build(),
+ new ColumnSchemaBuilder("c6_short", Type.INT16).build(),
+ new ColumnSchemaBuilder("c7_float", Type.FLOAT).build())
+ new Schema(columns)
+ }
+
+ val appID = new Date().toString + math.floor(math.random * 10E4).toLong.toString
+
+ val conf = new SparkConf().
+ setMaster("local[*]").
+ setAppName("test").
+ set("spark.ui.enabled", "false").
+ set("spark.app.id", appID)
+
+ override def beforeAll() {
+ miniCluster = new MiniKuduClusterBuilder()
+ .numMasters(1)
+ .numTservers(1)
+ .build()
+ val envMap = Map[String,String](("Xmx", "512m"))
+
+ sc = new SparkContext(conf)
+
+ kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
+ assert(miniCluster.waitForTabletServers(1))
+
+ kuduContext = new KuduContext(miniCluster.getMasterAddresses)
+
+ val tableOptions = new CreateTableOptions().setNumReplicas(1)
+ table = kuduClient.createTable(tableName, schema, tableOptions)
+ }
+
+ override def afterAll() {
+ if (kuduClient != null) kuduClient.shutdown()
+ if (miniCluster != null) miniCluster.shutdown()
+ if (sc != null) sc.stop()
+ }
+
+ def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String)] = {
+ val kuduSession = kuduClient.newSession()
+
+ val rows = Range(0, rowCount).map { i =>
+ val insert = table.newInsert
+ val row = insert.getRow
+ row.addInt(0, i)
+ row.addInt(1, i)
+ row.addDouble(3, i.toDouble)
+ row.addLong(4, i.toLong)
+ row.addBoolean(5, i%2==1)
+ row.addShort(6, i.toShort)
+ row.addFloat(7, i.toFloat)
+
+ // Sprinkling some nulls so that queries see them.
+ val s = if (i % 2 == 0) {
+ row.addString(2, i.toString)
+ i.toString
+ } else {
+ row.setNull(2)
+ null
+ }
+
+ kuduSession.apply(insert)
+ (i, i, s)
+ }
+ rows
+ }
+}
\ No newline at end of file