You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2019/03/01 00:54:11 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)

This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 022362c  PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
022362c is described below

commit 022362cdccdd5d5e5470de84094dbaead55fa0c1
Author: Thomas D'Silva <td...@apache.org>
AuthorDate: Thu Feb 28 16:52:36 2019 -0800

    PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
---
 .../main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala   | 7 +++++--
 .../main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala  | 8 ++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index d555954..9377986 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
 import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.query.HBaseFactoryProvider
 import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
 
 import scala.collection.JavaConversions._
@@ -28,8 +29,8 @@ object ConfigurationUtil extends Serializable {
 
     // Create an HBaseConfiguration object from the passed in config, if present
     val config = conf match {
-      case Some(c) => HBaseConfiguration.create(c)
-      case _ => HBaseConfiguration.create()
+      case Some(c) => HBaseFactoryProvider.getConfigurationFactory.getConfiguration(c)
+      case _ => HBaseFactoryProvider.getConfigurationFactory.getConfiguration()
     }
 
     // Set the tenantId in the config if present
@@ -41,6 +42,8 @@ object ConfigurationUtil extends Serializable {
     // Set the table to save to
     PhoenixConfigurationUtil.setOutputTableName(config, tableName)
     PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
+    // disable property provider evaluation
+    PhoenixConfigurationUtil.setPropertyPolicyProviderDisabled(config);
 
     // Infer column names from the DataFrame schema
     PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*))
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 3b0289d..85a6d8a 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -28,7 +28,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
   		saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), 
   		skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
    }
-  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+  def saveToPhoenix(tableName: String, conf: Option[Configuration] = None,
                     zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
 
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
@@ -36,7 +36,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
     
 
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, conf)
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -45,9 +45,9 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
     val phxRDD = data.rdd.mapPartitions{ rows =>
  
        // Create a within-partition config to retrieve the ColumnInfo list
-       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
+       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
        @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
- 
+
        rows.map { row =>
          val rec = new PhoenixRecordWritable(columns)
          row.toSeq.foreach { e => rec.add(e) }