Posted to commits@phoenix.apache.org by jm...@apache.org on 2017/01/03 15:00:08 UTC

phoenix git commit: PHOENIX-3333 Support Spark 2.0

Repository: phoenix
Updated Branches:
  refs/heads/master 07f92732f -> a0e5efcec


PHOENIX-3333 Support Spark 2.0

Note that the default Maven profile will compile against Spark 2.0.2
and Scala 2.11. To switch to the previous behavior (Spark 1.6.1 and
Scala 2.10), use the Maven profile 'spark16'.
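
As a usage sketch only (not part of this commit; the integration tests below
still go through SQLContext), reading and writing a Phoenix table under
Spark 2.0 looks like the following. The data source name and the 'table' /
'zkUrl' options come from the tests in this change; the table names and the
ZooKeeper quorum are placeholders.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object PhoenixSparkSketch {
      def main(args: Array[String]): Unit = {
        // SparkSession is the Spark 2.0 entry point (it wraps SQLContext).
        val spark = SparkSession.builder()
          .appName("phoenix-spark-2.0-sketch")
          .getOrCreate()

        // Read a Phoenix table through the DataFrameReader API.
        val df = spark.read
          .format("org.apache.phoenix.spark")
          .option("table", "TABLE1")          // placeholder table
          .option("zkUrl", "localhost:2181")  // placeholder quorum
          .load()

        // Write it back out under a different table name.
        df.write
          .format("org.apache.phoenix.spark")
          .mode(SaveMode.Overwrite)
          .option("table", "TABLE1_COPY")     // placeholder table
          .option("zkUrl", "localhost:2181")
          .save()

        spark.stop()
      }
    }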


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a0e5efce
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a0e5efce
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a0e5efce

Branch: refs/heads/master
Commit: a0e5efcec5a1a732b2dce9794251242c3d66eea6
Parents: 07f9273
Author: Josh Mahonin <jm...@gmail.com>
Authored: Tue Jan 3 09:57:46 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Tue Jan 3 09:57:46 2017 -0500

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 26 ++++++++++++++++----
 .../phoenix/spark/DataFrameFunctions.scala      |  6 ++---
 .../org/apache/phoenix/spark/PhoenixRDD.scala   |  6 ++---
 .../phoenix/spark/ProductRDDFunctions.scala     |  3 +--
 pom.xml                                         | 14 ++++++++---
 5 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0e5efce/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 9def354..d53b5ee 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -302,11 +302,21 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
     // Load TABLE1, save as TABLE1_COPY
     val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+    val df = sqlContext
+      .read
+      .format("org.apache.phoenix.spark")
+      .option("table", "TABLE1")
+      .option("zkUrl", quorumAddress)
+      .load()
 
     // Save to TABLE1_COPY
-    df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "TABLE1_COPY", "zkUrl" -> quorumAddress))
+    df
+      .write
+      .format("org.apache.phoenix.spark")
+      .mode(SaveMode.Overwrite)
+      .option("table", "TABLE1_COPY")
+      .option("zkUrl", quorumAddress)
+      .save()
 
     // Verify results
     stmt = conn.createStatement()
@@ -632,12 +642,18 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     val epoch = new Date().getTime
     assert(Math.abs(epoch - time) < 86400000)
   }
+
   test("can read all Phoenix data types") {
     val sqlContext = new SQLContext(sc)
     val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "GIGANTIC_TABLE",
       "zkUrl" -> quorumAddress))
-    df.save("org.apache.phoenix.spark",SaveMode.Overwrite, Map("table" -> "OUTPUT_GIGANTIC_TABLE",
-     "zkUrl" -> quorumAddress))
+
+    df.write
+      .format("org.apache.phoenix.spark")
+      .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", "zkUrl" -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
+
     df.count() shouldEqual 1
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0e5efce/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index bb2efd5..ddf4fab 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -18,12 +18,12 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.util.SchemaUtil
-import org.apache.spark.Logging
 import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConversions._
 
-class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
+
+class DataFrameFunctions(data: DataFrame) extends Serializable {
 
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
                     zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
@@ -39,7 +39,7 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
 
     // Map the row objects into PhoenixRecordWritable
-    val phxRDD = data.mapPartitions{ rows =>
+    val phxRDD = data.rdd.mapPartitions{ rows =>
  
        // Create a within-partition config to retrieve the ColumnInfo list
        @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)

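A note on the data.rdd.mapPartitions change above: in Spark 2.0 a DataFrame
is a Dataset[Row], and Dataset.mapPartitions needs an implicit Encoder for
its result type, which PhoenixRecordWritable does not have. Dropping to the
underlying RDD keeps the plain Iterator-based transformation. A minimal
sketch of the pattern (illustrative only; the Row layout and pair type here
are made up for the example):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    // Illustrative only: transform partitions at the RDD level, where no
    // Encoder is required for the result type.
    def toPairs(df: DataFrame): RDD[(String, Long)] =
      df.rdd.mapPartitions { rows =>
        rows.map(row => (row.getString(0), 1L))  // assumes column 0 is a string
      }
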
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0e5efce/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 63289a0..01a9077 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -37,7 +37,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
                  @transient conf: Configuration, dateAsTimestamp: Boolean = false,
                  tenantId: Option[String] = None
                 )
-  extends RDD[PhoenixRecordWritable](sc, Nil) with Logging {
+  extends RDD[PhoenixRecordWritable](sc, Nil) {
 
   // Make sure to register the Phoenix driver
   DriverManager.registerDriver(new PhoenixDriver)
@@ -163,7 +163,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
     case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
     // Use Spark system default precision for now (explicit to work with < 1.5)
     case t if t.isInstanceOf[PDecimal] => 
-      if (columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
     case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
     case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
     case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
@@ -180,7 +180,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
     case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
     case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
     case t if t.isInstanceOf[PDecimalArray] => ArrayType(
-      if (columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
     case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
     case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
     case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)

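On the precision handling above: columnInfo.getPrecision can evidently come
back null (hence the added check) as well as negative, and both cases now
fall back to Spark's default DecimalType(38, 18). A standalone sketch of the
rule, assuming a boxed Integer precision (hypothetical helper, not the
connector's own code):

    import org.apache.spark.sql.types.{DataType, DecimalType}

    // Mirrors the fallback in PhoenixRDD: a null or negative precision means
    // "unspecified", so use Spark's default DecimalType(38, 18).
    def decimalTypeFor(precision: java.lang.Integer, scale: java.lang.Integer): DataType =
      if (precision == null || precision < 0) DecimalType(38, 18)
      else DecimalType(precision, scale)
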
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0e5efce/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index b59592b..9b368b6 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -17,12 +17,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 
 import scala.collection.JavaConversions._
 
-class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Serializable {
+class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Serializable {
 
   def saveToPhoenix(tableName: String, cols: Seq[String],
                     conf: Configuration = new Configuration, zkUrl: Option[String] = None, tenantId: Option[String] = None)

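The dropped org.apache.spark.Logging trait is no longer part of Spark's
public API in 2.0, so user code can no longer mix it in; these classes
simply stop extending it. If logging were still wanted here, one option
(an assumption, not something this commit does) would be a plain SLF4J
logger, which Spark already ships with:

    import org.slf4j.{Logger, LoggerFactory}

    // Assumption/illustration only: a plain SLF4J logger in place of the
    // removed org.apache.spark.Logging mixin; kept transient and lazy so
    // the enclosing class stays serializable.
    class SaveToPhoenixExample extends Serializable {
      @transient private lazy val log: Logger = LoggerFactory.getLogger(getClass)

      def announce(tableName: String): Unit =
        log.info(s"Saving RDD to Phoenix table $tableName")
    }
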
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0e5efce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f27f2cc..4a0292d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,9 +99,9 @@
     <avatica.version>1.9.0</avatica.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
     <tephra.version>0.9.0-incubating</tephra.version>
-    <spark.version>1.6.1</spark.version>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
+    <spark.version>2.0.2</spark.version>
+    <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.11</scala.binary.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>
@@ -982,5 +982,13 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>spark16</id>
+      <properties>
+        <spark.version>1.6.1</spark.version>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.10</scala.binary.version>
+      </properties>
+    </profile>
   </profiles>
 </project>
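
With the pom changes above, the default build now targets Spark 2.0.2 and
Scala 2.11, and the new spark16 profile restores the previous Spark 1.6.1 /
Scala 2.10 combination through Maven's standard profile switch, e.g. (the
goals shown are illustrative):

    # Default build: Spark 2.0.2 / Scala 2.11
    mvn clean package

    # Previous behavior: Spark 1.6.1 / Scala 2.10
    mvn clean package -Pspark16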