You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2017/04/21 06:19:03 UTC

[1/3] phoenix git commit: PHOENIX-3751 spark 2.1 with Phoenix 4.10 load data as dataframe fail, NullPointerException

Repository: phoenix
Updated Branches:
  refs/heads/master 679ff21b7 -> 92b951e53


PHOENIX-3751 spark 2.1 with Phoenix 4.10 load data as dataframe fail, NullPointerException


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/28af89c4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/28af89c4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/28af89c4

Branch: refs/heads/master
Commit: 28af89c46fa54d7f60adc8be88fdf559cad811d2
Parents: 679ff21
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Apr 21 11:47:27 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Apr 21 11:47:27 2017 +0530

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/globalSetup.sql                   | 2 +-
 .../src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala     | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/28af89c4/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index 28eb0f7..dc24da7 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -60,4 +60,4 @@ UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
 CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR  CONSTRAINT pk PRIMARY KEY (TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
 CREATE TABLE IF NOT EXISTS GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
  CREATE TABLE IF NOT EXISTS OUTPUT_GIGANTIC_TABLE (ID INTEGER PRIMARY KEY,unsig_id UNSIGNED_INT,big_id BIGINT,unsig_long_id UNSIGNED_LONG,tiny_id TINYINT,unsig_tiny_id UNSIGNED_TINYINT,small_id SMALLINT,unsig_small_id UNSIGNED_SMALLINT,float_id FLOAT,unsig_float_id UNSIGNED_FLOAT,double_id DOUBLE,unsig_double_id UNSIGNED_DOUBLE,decimal_id DECIMAL,boolean_id BOOLEAN,time_id TIME,date_id DATE,timestamp_id TIMESTAMP,unsig_time_id UNSIGNED_TIME,unsig_date_id UNSIGNED_DATE,unsig_timestamp_id UNSIGNED_TIMESTAMP,varchar_id VARCHAR (30),char_id CHAR (30),binary_id BINARY (100),varbinary_id VARBINARY (100))
- upsert into GIGANTIC_TABLE values(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')
+ upsert into GIGANTIC_TABLE values(0,2,3,4,-5,6,7,8,9.3,10.4,11.5,12.6,13.7,true,null,null,CURRENT_TIME(),CURRENT_TIME(),CURRENT_DATE(),CURRENT_TIME(),'This is random textA','a','a','a')

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28af89c4/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 63547d2..2c2c6e1 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -134,9 +134,9 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
       val rowSeq = columns.map { case (name, sqlType) =>
         val res = pr.resultMap(name)
           // Special handling for data types
-          if (dateAsTimestamp && (sqlType == 91 || sqlType == 19)) { // 91 is the defined type for Date and 19 for UNSIGNED_DATE
+          if (dateAsTimestamp && (sqlType == 91 || sqlType == 19) && res!=null) { // 91 is the defined type for Date and 19 for UNSIGNED_DATE
             new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
-          } else if (sqlType == 92 || sqlType == 18) { // 92 is the defined type for Time and 18 for UNSIGNED_TIME
+          } else if ((sqlType == 92 || sqlType == 18) && res!=null) { // 92 is the defined type for Time and 18 for UNSIGNED_TIME
             new java.sql.Timestamp(res.asInstanceOf[java.sql.Time].getTime)
           } else {
             res


[2/3] phoenix git commit: PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration

Posted by an...@apache.org.
PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/90e32c01
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/90e32c01
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/90e32c01

Branch: refs/heads/master
Commit: 90e32c015207b39330ed7496db7a73dbc7b634f4
Parents: 28af89c
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Apr 21 11:48:16 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Apr 21 11:48:16 2017 +0530

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/globalSetup.sql  |  1 +
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 27 ++++++++++++++++++--
 .../phoenix/spark/DataFrameFunctions.scala      | 19 +++++++++++---
 .../apache/phoenix/spark/DefaultSource.scala    |  2 +-
 4 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index dc24da7..7ac0039 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -17,6 +17,7 @@
 CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+CREATE TABLE table3 (id BIGINT NOT NULL PRIMARY KEY, table3_id BIGINT, "t2col1" VARCHAR)
 UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index d53b5ee..b8e44fe 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,15 +20,38 @@ import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext, SaveMode}
 import org.joda.time.DateTime
-
+import org.apache.spark.{SparkConf, SparkContext}
 import scala.collection.mutable.ListBuffer
-
+import org.apache.hadoop.conf.Configuration
 /**
   * Note: If running directly from an IDE, these are the recommended VM parameters:
   * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
   */
 class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
+  test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.createDataFrame(
+      Seq(
+        (1, 1, "test_child_1"),
+        (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
+    df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress),skipNormalizingIdentifier=true)
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE3")
+
+    val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+
+  }
+  
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index ddf4fab..92f4c58 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -24,13 +24,16 @@ import scala.collection.JavaConversions._
 
 
 class DataFrameFunctions(data: DataFrame) extends Serializable {
-
+  def saveToPhoenix(parameters: Map[String, String]): Unit = {
+  		saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), 
+  		skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
+   }
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
-
+                    zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
 
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
-    val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
+    
 
     // Create a configuration object to use for saving
     @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
@@ -61,4 +64,12 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
       outConfig
     )
   }
+
+  def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
+    if (skipNormalizingIdentifier) {
+      data.schema.fieldNames.map(x => x)
+    } else {
+      data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 743d196..e000b74 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -44,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
     verifyParameters(parameters)
 
     // Save the DataFrame to Phoenix
-    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"))
+    data.saveToPhoenix(parameters)
 
     // Return a relation of the saved data
     createRelation(sqlContext, parameters)


[3/3] phoenix git commit: PHOENIX-3759 Dropping a local index causes NPE

Posted by an...@apache.org.
PHOENIX-3759 Dropping a local index causes NPE


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92b951e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92b951e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92b951e5

Branch: refs/heads/master
Commit: 92b951e5387768e084ed09729884a59160cd81d3
Parents: 90e32c0
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Apr 21 11:48:54 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Apr 21 11:48:54 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/end2end/index/LocalIndexIT.java   | 15 ++++++++++++---
 .../java/org/apache/phoenix/util/RepairUtil.java     | 11 +++++++----
 2 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b951e5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 8d3316b..ea4780b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -599,21 +599,30 @@ public class LocalIndexIT extends BaseLocalIndexIT {
             admin.disableTable(tableName);
             copyLocalIndexHFiles(config, tableRegions.get(0), tableRegions.get(1), false);
             copyLocalIndexHFiles(config, tableRegions.get(3), tableRegions.get(0), false);
-
             admin.enableTable(tableName);
 
             int count=getCount(conn, tableName, "L#0");
             assertTrue(count > 14);
-            admin.majorCompact(tableName);
+            admin.majorCompact(TableName.valueOf(tableName));
             int tryCount = 5;// need to wait for rebuilding of corrupted local index region
             while (tryCount-- > 0 && count != 14) {
-                Thread.sleep(30000);
+                Thread.sleep(15000);
                 count = getCount(conn, tableName, "L#0");
             }
             assertEquals(14, count);
             rs = statement.executeQuery("SELECT COUNT(*) FROM " + indexName1);
             assertTrue(rs.next());
             assertEquals(7, rs.getLong(1));
+            statement.execute("DROP INDEX " + indexName1 + " ON " + tableName);
+            admin.majorCompact(TableName.valueOf(tableName));
+            statement.execute("DROP INDEX " + indexName + " ON " + tableName);
+            admin.majorCompact(TableName.valueOf(tableName));
+            Thread.sleep(15000);
+            admin.majorCompact(TableName.valueOf(tableName));
+            Thread.sleep(15000);
+            rs = statement.executeQuery("SELECT COUNT(*) FROM " + tableName);
+            assertTrue(rs.next());
+            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92b951e5/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
index b9b7526..ea14715 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
@@ -29,10 +29,13 @@ public class RepairUtil {
         byte[] endKey = environment.getRegion().getRegionInfo().getEndKey();
         byte[] indexKeyEmbedded = startKey.length == 0 ? new byte[endKey.length] : startKey;
         for (StoreFile file : store.getStorefiles()) {
-            byte[] fileFirstRowKey = KeyValue.createKeyValueFromKey(file.getReader().getFirstKey()).getRow();;
-            if ((fileFirstRowKey != null && Bytes.compareTo(file.getReader().getFirstKey(), 0, indexKeyEmbedded.length,
-                    indexKeyEmbedded, 0, indexKeyEmbedded.length) != 0)
-                    /*|| (endKey.length > 0 && Bytes.compareTo(file.getLastKey(), endKey) < 0)*/) { return false; }
+            if (file.getReader() != null && file.getReader().getFirstKey() != null) {
+                byte[] fileFirstRowKey = KeyValue.createKeyValueFromKey(file.getReader().getFirstKey()).getRow();
+                ;
+                if ((fileFirstRowKey != null && Bytes.compareTo(file.getReader().getFirstKey(), 0,
+                        indexKeyEmbedded.length, indexKeyEmbedded, 0, indexKeyEmbedded.length) != 0)
+                /* || (endKey.length > 0 && Bytes.compareTo(file.getLastKey(), endKey) < 0) */) { return false; }
+            }
         }
         return true;
     }