You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2019/03/01 00:53:57 UTC
[phoenix] branch 4.x-HBase-1.2 updated: PHOENIX-5141 Use
HBaseFactoryProvider.getConfigurationFactory to get the config in
PhoenixRDD (addendum)
This is an automated email from the ASF dual-hosted git repository.
tdsilva pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 7e89269 PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
7e89269 is described below
commit 7e892690c702a5465f4ddaebced0a0b018c7b629
Author: Thomas D'Silva <td...@apache.org>
AuthorDate: Thu Feb 28 16:52:36 2019 -0800
PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum)
---
.../main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala | 7 +++++--
.../main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala | 8 ++++----
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index d555954..9377986 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.query.HBaseFactoryProvider
import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
import scala.collection.JavaConversions._
@@ -28,8 +29,8 @@ object ConfigurationUtil extends Serializable {
// Create an HBaseConfiguration object from the passed in config, if present
val config = conf match {
- case Some(c) => HBaseConfiguration.create(c)
- case _ => HBaseConfiguration.create()
+ case Some(c) => HBaseFactoryProvider.getConfigurationFactory.getConfiguration(c)
+ case _ => HBaseFactoryProvider.getConfigurationFactory.getConfiguration()
}
// Set the tenantId in the config if present
@@ -41,6 +42,8 @@ object ConfigurationUtil extends Serializable {
// Set the table to save to
PhoenixConfigurationUtil.setOutputTableName(config, tableName)
PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
+ // disable property provider evaluation
+ PhoenixConfigurationUtil.setPropertyPolicyProviderDisabled(config);
// Infer column names from the DataFrame schema
PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*))
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 3b0289d..85a6d8a 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -28,7 +28,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier"))
}
- def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+ def saveToPhoenix(tableName: String, conf: Option[Configuration] = None,
zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
// Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
@@ -36,7 +36,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
// Create a configuration object to use for saving
- @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
+ @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, conf)
// Retrieve the zookeeper URL
val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -45,9 +45,9 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
val phxRDD = data.rdd.mapPartitions{ rows =>
// Create a within-partition config to retrieve the ColumnInfo list
- @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
+ @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
@transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
-
+
rows.map { row =>
val rec = new PhoenixRecordWritable(columns)
row.toSeq.foreach { e => rec.add(e) }