You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/27 10:52:58 UTC
[2/5] carbondata git commit: Added set/reset commands in carbon to
update/reset properties dynamically
Added set/reset commands in carbon to update/reset properties dynamically
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/95ce1da1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/95ce1da1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/95ce1da1
Branch: refs/heads/master
Commit: 95ce1da1e6a828255ca6385ae5ab16706e66483f
Parents: 28e2e17
Author: Manohar <ma...@gmail.com>
Authored: Mon Jun 12 18:06:25 2017 +0530
Committer: Manohar <ma...@gmail.com>
Committed: Tue Jun 27 14:39:51 2017 +0530
----------------------------------------------------------------------
.../core/util/ThreadLocalSessionParams.java | 34 +++++++++++++++
.../spark/rdd/AlterTableAddColumnRDD.scala | 9 ++--
.../spark/rdd/AlterTableDropColumnRDD.scala | 10 ++---
.../spark/rdd/CarbonCleanFilesRDD.scala | 8 +---
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 9 +---
.../spark/rdd/CarbonDeleteLoadRDD.scala | 9 +---
.../spark/rdd/CarbonDropTableRDD.scala | 12 ++---
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 34 ++++-----------
.../spark/rdd/CarbonIUDMergerRDD.scala | 3 --
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 8 +---
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 46 ++++++++++++++++++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 8 +---
.../spark/rdd/DataLoadCoalescedRDD.scala | 15 +++----
.../spark/rdd/NewCarbonDataLoadRDD.scala | 42 +++++++-----------
.../spark/rdd/UpdateCoalescedRDD.scala | 10 ++---
.../carbondata/spark/rdd/UpdateDataLoad.scala | 4 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 7 ++-
.../spark/sql/hive/CarbonStrategies.scala | 4 +-
.../execution/command/CarbonHiveCommands.scala | 16 +------
.../spark/rdd/CarbonDataRDDFactory.scala | 16 +++----
.../sql/CarbonDatasourceHadoopRelation.scala | 3 ++
.../spark/sql/CarbonDictionaryDecoder.scala | 5 ++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 12 ++++-
.../org/apache/spark/sql/CarbonSource.scala | 5 +--
.../execution/CarbonLateDecodeStrategy.scala | 3 +-
.../execution/CastExpressionOptimization.scala | 6 +--
.../execution/command/CarbonHiveCommands.scala | 26 ++++++++---
.../sql/execution/command/DDLStrategy.scala | 4 +-
.../execution/command/carbonTableSchema.scala | 16 +++----
.../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++++---
.../spark/sql/hive/CarbonSessionState.scala | 2 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 15 ++++---
32 files changed, 217 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
new file mode 100644
index 0000000..354a0ee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * This class maintains ThreadLocal session params
+ */
+public class ThreadLocalSessionParams {
+ static final InheritableThreadLocal<SessionParams> threadLocal =
+ new InheritableThreadLocal<SessionParams>();
+
+ public static void setSessionParams(SessionParams sessionParams) {
+ threadLocal.set(sessionParams);
+ }
+
+ public static SessionParams getSessionParams() {
+ return threadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 61e1e61..7eea95d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,24 +50,21 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
class AlterTableAddColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: CarbonTableIdentifier,
- carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
+ carbonStorePath: String)
+ extends CarbonRDD[(Int, String)](sc, Nil) {
val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
new AddColumnPartition(id, column._2, column._1)
}.toArray
}
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, String)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
val iter = new Iterator[(Int, String)] {
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index ba91673..fde5cd6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
/**
* This is a partitioner class for dividing the newly added columns into partitions
@@ -49,9 +48,8 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
class AlterTableDropColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: CarbonTableIdentifier,
- carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ carbonStorePath: String)
+ extends CarbonRDD[(Int, String)](sc, Nil) {
override def getPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
@@ -59,11 +57,9 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
}.toArray
}
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, String)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
val iter = new Iterator[(Int, String)] {
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index c1a30b7..b63fc48 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.Value
import org.apache.carbondata.spark.util.CarbonQueryUtil
@@ -34,21 +33,18 @@ class CarbonCleanFilesRDD[V: ClassTag](
databaseName: String,
tableName: String,
partitioner: Partitioner)
- extends RDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
override def getPartitions: Array[Partition] = {
val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
}
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
val iter = new Iterator[(V)] {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val split = theSplit.asInstanceOf[CarbonLoadPartition]
logInfo("Input split: " + split.serializableHadoopSplit.value)
// TODO call CARBON delete API
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index f7bed59..da391cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.DeletedLoadResult
import org.apache.carbondata.spark.util.CarbonQueryUtil
@@ -40,12 +39,10 @@ class CarbonDeleteLoadByDateRDD[K, V](
dimTableName: String,
storePath: String,
loadMetadataDetails: List[LoadMetadataDetails])
- extends RDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map {s =>
@@ -53,10 +50,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
}
}
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
new Iterator[(K, V)] {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val split = theSplit.asInstanceOf[CarbonLoadPartition]
logInfo("Input split: " + split.serializableHadoopSplit.value)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 3ef9cef..9e43d0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.Value
import org.apache.carbondata.spark.util.CarbonQueryUtil
@@ -35,11 +34,9 @@ class CarbonDeleteLoadRDD[V: ClassTag](
databaseName: String,
tableName: String,
partitioner: Partitioner)
- extends RDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map {f =>
@@ -47,10 +44,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
}
}
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
val iter = new Iterator[V] {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val split = theSplit.asInstanceOf[CarbonLoadPartition]
logInfo("Input split: " + split.serializableHadoopSplit.value)
// TODO call CARBON delete API
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 54f8ea5..d1d49b9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.Value
import org.apache.carbondata.spark.util.CarbonQueryUtil
@@ -31,12 +30,10 @@ class CarbonDropTableRDD[V: ClassTag](
valueClass: Value[V],
databaseName: String,
tableName: String)
- extends RDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
splits.zipWithIndex.map { s =>
@@ -44,12 +41,9 @@ class CarbonDropTableRDD[V: ClassTag](
}
}
- override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
- val iter = new Iterator[V] {
+ val iter = new Iterator[V] {
// TODO: Clear Btree from memory
var havePair = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 434fb3c..d0f9362 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -176,19 +176,15 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
class CarbonAllDictionaryCombineRDD(
prev: RDD[(String, Iterable[String])],
model: DictionaryLoadModel)
- extends RDD[(Int, ColumnDistinctValues)](prev) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
override def getPartitions: Array[Partition] = {
firstParent[(String, Iterable[String])].partitions
}
- override def compute(split: Partition, context: TaskContext
+ override def internalCompute(split: Partition, context: TaskContext
): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
/*
* for all dictionary, all columns need to encoding and checking
@@ -273,17 +269,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
class CarbonBlockDistinctValuesCombineRDD(
prev: RDD[Row],
model: DictionaryLoadModel)
- extends RDD[(Int, ColumnDistinctValues)](prev) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
model.hdfsLocation)
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
@@ -338,16 +329,13 @@ class CarbonBlockDistinctValuesCombineRDD(
class CarbonGlobalDictionaryGenerateRDD(
prev: RDD[(Int, ColumnDistinctValues)],
model: DictionaryLoadModel)
- extends RDD[(Int, String, Boolean)](prev) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ extends CarbonRDD[(Int, String, Boolean)](prev) {
override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
+ override def internalCompute(split: Partition,
+ context: TaskContext): Iterator[(Int, String, Boolean)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
model.hdfsLocation)
val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
@@ -544,9 +532,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
dimensions: Array[CarbonDimension],
hdfsLocation: String,
dictFolderPath: String)
- extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
override def getPartitions: Array[Partition] = {
val primDimensions = dictionaryLoadModel.primDimensions
@@ -558,10 +544,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
result
}
- override def compute(split: Partition, context: TaskContext)
+ override def internalCompute(split: Partition, context: TaskContext)
: Iterator[(Int, ColumnDistinctValues)] = {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
val primDimension = theSplit.preDefDictDimension
// read the column dict data
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 38e3680..277005b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -51,8 +50,6 @@ class CarbonIUDMergerRDD[K, V](
carbonMergerMapping,
confExecutorsTemp) {
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index dec3ee3..908043a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -59,7 +59,7 @@ class CarbonMergerRDD[K, V](
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends RDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
@@ -74,12 +74,8 @@ class CarbonMergerRDD[K, V](
val factTableName = carbonMergerMapping.factTableName
val tableId = carbonMergerMapping.tableId
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val iter = new Iterator[(K, V)] {
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
new file mode 100644
index 0000000..e00dd0f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+
+/**
+ * This RDD maintains session level ThreadLocal
+ */
+abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
+ @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+
+ val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams
+
+ /** Construct an RDD with just a one-to-one dependency on one parent */
+ def this(@transient oneParent: RDD[_]) =
+ this (oneParent.context, List(new OneToOneDependency(oneParent)))
+
+ // RDD compute logic should be here
+ def internalCompute(split: Partition, context: TaskContext): Iterator[T]
+
+ final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ ThreadLocalSessionParams.setSessionParams(sessionParams)
+ internalCompute(split, context)
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2c10e65..3868342 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -54,7 +54,7 @@ class CarbonScanRDD(
filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient carbonTable: CarbonTable)
- extends RDD[InternalRow](sc, Nil) {
+ extends CarbonRDD[InternalRow](sc, Nil) {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
@@ -67,8 +67,6 @@ class CarbonScanRDD(
private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
@transient private val jobId = new JobID(jobTrackerId, id)
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -175,15 +173,13 @@ class CarbonScanRDD(
result.toArray(new Array[Partition](result.size()))
}
- override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
)
}
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
index 5da0835..b2d04ac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -21,26 +21,21 @@ import scala.reflect.ClassTag
import org.apache.spark._
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
class DataLoadCoalescedRDD[T: ClassTag](
- @transient var prev: RDD[T],
- nodeList: Array[String])
- extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ @transient var prev: RDD[T],
+ nodeList: Array[String])
+ extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
override def getPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
}
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
-
new Iterator[DataLoadPartitionWrap[T]] {
val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
def hasNext = iter.hasNext
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 5790369..129c642 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.spark.rdd
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
-import java.util
import java.util.{Date, UUID}
import scala.collection.JavaConverters._
@@ -127,16 +126,12 @@ class SparkPartitionLoader(model: CarbonLoadModel,
var storeLocation: String = ""
- def initialize(addedProperies: util.Map[String, String]): Unit = {
+ def initialize(): Unit = {
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
}
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
@@ -177,7 +172,7 @@ class NewCarbonDataLoadRDD[K, V](
loadCount: Integer,
blocksGroupBy: Array[(String, Array[BlockDetails])],
isTableSplitPartition: Boolean)
- extends RDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -190,8 +185,6 @@ class NewCarbonDataLoadRDD[K, V](
private val confBroadcast =
sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
override def getPartitions: Array[Partition] = {
if (isTableSplitPartition) {
// for table split partition
@@ -222,7 +215,7 @@ class NewCarbonDataLoadRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
var partitionID = "0"
@@ -246,7 +239,7 @@ class NewCarbonDataLoadRDD[K, V](
String.valueOf(loadCount),
loadMetadataDetails)
// Intialize to set carbon properties
- loader.initialize(addedProperies)
+ loader.initialize()
new DataLoadExecutor().execute(model,
loader.storeLocation,
recordReaders)
@@ -391,17 +384,16 @@ class NewCarbonDataLoadRDD[K, V](
* @see org.apache.carbondata.processing.newflow.DataLoadExecutor
*/
class NewDataFrameLoaderRDD[K, V](
- sc: SparkContext,
- result: DataLoadResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- loadCount: Integer,
- tableCreationTime: Long,
- schemaLastUpdatedTime: Long,
- prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val partitionID = "0"
@@ -438,7 +430,7 @@ class NewDataFrameLoaderRDD[K, V](
String.valueOf(loadCount),
loadMetadataDetails)
// Intialize to set carbon properties
- loader.initialize(addedProperies)
+ loader.initialize()
new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
} catch {
case e: BadRecordFoundException =>
@@ -593,11 +585,9 @@ class PartitionTableDataLoaderRDD[K, V](
loadCount: Integer,
tableCreationTime: Long,
schemaLastUpdatedTime: Long,
- prev: RDD[Row]) extends RDD[(K, V)](prev) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val partitionID = "0"
@@ -625,7 +615,7 @@ class PartitionTableDataLoaderRDD[K, V](
String.valueOf(loadCount),
loadMetadataDetails)
// Intialize to set carbon properties
- loader.initialize(addedProperies)
+ loader.initialize()
new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
} catch {
case e: BadRecordFoundException =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index 30050f7..1025da7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -22,25 +22,21 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
// This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
class UpdateCoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
nodeList: Array[String])
- extends RDD[T](prev.context, Nil) {
-
- private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+ extends CarbonRDD[T](prev.context, Nil) {
override def getPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
}
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[T] = {
- // Add the properties added in driver to executor.
- CarbonProperties.getInstance().setProperties(addedProperies)
// This iterator combines data from all the parent partitions
new Iterator[T] {
val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 6b94894..bcfc096 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -17,8 +17,6 @@
package org.apache.carbondata.spark.rdd
-import java.util
-
import scala.collection.mutable
import org.apache.spark.TaskContext
@@ -54,7 +52,7 @@ object UpdateDataLoad {
segId,
loadMetadataDetails)
// Intialize to set carbon properties
- loader.initialize(new util.HashMap)
+ loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
new DataLoadExecutor().execute(carbonLoadModel,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0e6153f..2fc93e6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
+import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
private[sql] case class CarbonDatasourceHadoopRelation(
@@ -94,7 +95,6 @@ private[sql] case class CarbonDatasourceHadoopRelation(
requiredColumns.foreach(projection.addColumn)
CarbonInputFormat.setColumnProjection(conf, projection)
CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
-
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
new SerializableConfiguration(conf),
absIdentifier,
@@ -120,7 +120,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
identifier: AbsoluteTableIdentifier,
inputFormatClass: Class[_ <: CarbonInputFormat[V]],
valueClass: Class[V])
- extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
+ extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -128,8 +128,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
}
@transient protected val jobId = new JobID(jobTrackerId, id)
- @DeveloperApi
- override def compute(split: Partition,
+ override def internalCompute(split: Partition,
context: TaskContext): Iterator[V] = {
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 7bfd742..f0cd33b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan}
+import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
@@ -316,8 +316,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
} else {
ExecutedCommand(HiveNativeCommand(sql)) :: Nil
}
- case set@SetCommand(kv) =>
- ExecutedCommand(CarbonSetCommand(set)) :: Nil
case _ =>
Nil
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index d047b20..0f42940 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -18,12 +18,10 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.{RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.execution.command.DropTableCommand
import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.carbondata.core.util.CarbonProperties
-
private[hive] case class CreateDatabaseCommand(dbName: String,
command: HiveNativeCommand) extends RunnableCommand {
def run(sqlContext: SQLContext): Seq[Row] = {
@@ -55,15 +53,3 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String,
rows
}
}
-
-case class CarbonSetCommand(command: SetCommand)
- extends RunnableCommand {
-
- override val output = command.output
-
- override def run(sparkSession: SQLContext): Seq[Row] = {
- val rows = command.run(sparkSession)
- CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1))
- rows
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2b77654..48af516 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -128,7 +128,7 @@ object CarbonDataRDDFactory {
isCompactionTriggerByDDl
)
- val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+ val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
)
@@ -275,8 +275,8 @@ object CarbonDataRDDFactory {
exception = e
}
// continue in case of exception also, check for all the tables.
- val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).
- sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
).equalsIgnoreCase("true")
@@ -397,8 +397,8 @@ object CarbonDataRDDFactory {
}
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
- val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession)
- .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+ val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
)
.equalsIgnoreCase("true")
@@ -1042,8 +1042,7 @@ object CarbonDataRDDFactory {
val timeStampFormat = if (specificFormat.isDefined) {
new SimpleDateFormat(specificFormat.get)
} else {
- val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
- .sessionParams.getProperty(CarbonCommonConstants
+ val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
new SimpleDateFormat(timestampFormatString)
}
@@ -1051,8 +1050,7 @@ object CarbonDataRDDFactory {
val dateFormat = if (specificFormat.isDefined) {
new SimpleDateFormat(specificFormat.get)
} else {
- val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
- .sessionParams.getProperty(CarbonCommonConstants
+ val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
new SimpleDateFormat(dateFormatString)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b0044d7..7c096d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.processing.merger.TableMeta
@@ -52,6 +53,8 @@ case class CarbonDatasourceHadoopRelation(
absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
+ val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+ ThreadLocalSessionParams.setSessionParams(sessionParams)
override def sqlContext: SQLContext = sparkSession.sqlContext
override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 49cf54f..bd1c8b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonRDD
/**
* It decodes the data.
@@ -444,7 +445,7 @@ class CarbonDecoderRDD(
prev: RDD[InternalRow],
output: Seq[Attribute],
sparkSession: SparkSession)
- extends RDD[InternalRow](prev) {
+ extends CarbonRDD[InternalRow](prev) {
private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
@@ -513,7 +514,7 @@ class CarbonDecoderRDD(
dictIds
}
- override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0851ec2..78820ea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -21,10 +21,11 @@ import java.util.Map
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
+import org.apache.spark.sql.internal.CarbonSQLConf
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -48,11 +49,18 @@ class CarbonEnv {
sparkSession.udf.register("getTupleId", () => "")
if (!initialized) {
sessionParams = new SessionParams()
+ ThreadLocalSessionParams.setSessionParams(sessionParams)
+ val config = new CarbonSQLConf(sparkSession)
+ if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
+ config.addDefaultCarbonParams()
+ }
+ // add session params after adding DefaultCarbonParams
+ config.addDefaultCarbonSessionParams()
carbonMetastore = {
val storePath =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
LOGGER.info(s"carbon env initial: $storePath")
- new CarbonMetastore(sparkSession.conf, storePath, sessionParams)
+ new CarbonMetastore(sparkSession.conf, storePath)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
initialized = true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3079c84..1c16143 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
None)
case _ =>
val options = new CarbonOption(parameters)
- val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+ val storePath = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
@@ -77,8 +77,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
"specified when creating CarbonContext")
val options = new CarbonOption(parameters)
- val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
- .getProperty(CarbonCommonConstants.STORE_LOCATION)
+ val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 8d0b4ea..4605914 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -520,8 +520,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
} else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
} else {
- CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
- .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index 805a4df..a8985b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -24,7 +24,8 @@ import java.util.{Locale, TimeZone}
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
-import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources}
+import org.apache.spark.sql.CastExpr
+import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -34,8 +35,7 @@ object CastExpressionOptimization {
def typeCastStringToLong(v: Any): Any = {
- val parser: SimpleDateFormat = new SimpleDateFormat(
- CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams
+ val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 627de02..a4feead 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand}
import org.apache.carbondata.core.util.CarbonProperties
@@ -49,10 +49,26 @@ case class CarbonSetCommand(command: SetCommand)
override val output = command.output
override def run(sparkSession: SparkSession): Seq[Row] = {
- val rows = command.run(sparkSession)
- CarbonEnv.getInstance(sparkSession).sessionParams
- .addProperty(rows.head.getString(0), rows.head.getString(1))
- rows
+ val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams
+ command.kv match {
+ case Some((key, Some(value))) =>
+ val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
+ if (isCarbonProperty) {
+ sessionParms.addProperty(key, value)
+ }
+ case _ =>
+
+ }
+ command.run(sparkSession)
}
}
+case class CarbonResetCommand()
+ extends RunnableCommand {
+ override val output = ResetCommand.output
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ CarbonEnv.getInstance(sparkSession).sessionParams.clear()
+ ResetCommand.run(sparkSession)
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 35be543..7d0215f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand}
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
case set@SetCommand(kv) =>
ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
+ case reset@ResetCommand =>
+ ExecutedCommandExec(CarbonResetCommand()) :: Nil
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f1fd05b..0064c21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams
+ var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir")
)
@@ -359,8 +359,7 @@ case class LoadTable(
sys.error(s"Data loading failed. table not found: $dbName.$tableName")
}
- CarbonEnv.getInstance(sparkSession).sessionParams
- .addProperty("zookeeper.enable.lock", "false")
+ CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val carbonLock = CarbonLockFactory
.getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier,
@@ -409,7 +408,7 @@ case class LoadTable(
val columnDict = options.getOrElse("columndict", null)
val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
- val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams
+ val badRecordActionValue = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
@@ -429,12 +428,11 @@ case class LoadTable(
carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession)
- .sessionParams.getProperty(
+ carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
- carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams.
- getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel
.setSerializationNullFormat(
@@ -536,7 +534,7 @@ case class LoadTable(
allDictionaryPath)
}
// dictionaryServerClient dictionary generator
- val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams
+ val dictionaryServerPort = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
val sparkDriverHost = sparkSession.sqlContext.sparkContext.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 54cffc2..04a94ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.combinator.RegexParsers
-import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
import org.apache.carbondata.core.reader.ThriftReader
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
}
}
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
@transient
val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -201,15 +201,18 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams:
// if zookeeper is configured as carbon lock type.
val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
if (null != zookeeperurl) {
- sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
}
if (metadataPath == null) {
return null
}
// if no locktype is configured and store type is HDFS set HDFS lock as default
- if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+ if (null == CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
FileType.HDFS == FileFactory.getFileType(metadataPath)) {
- sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE,
+ CarbonProperties.getInstance
+ .addProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
)
LOGGER.info("Default lock type HDFSLOCK is configured")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 156a12e..4aef118 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -115,7 +115,7 @@ class CarbonSessionCatalog(
*/
class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
experimentalMethods.extraStrategies =
Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 258920b..3412fb0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,30 +18,33 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext,
-TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
-PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
- * Concrete parser for Spark SQL statements and carbon specific statements
+ * Concrete parser for Spark SQL stateENABLE_INMEMORY_MERGE_SORT_DEFAULTments and carbon specific
+ * statements
*/
-class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
val astBuilder = new CarbonSqlAstBuilder(conf)
private val substitutor = new VariableSubstitution(conf)
override def parsePlan(sqlText: String): LogicalPlan = {
+ val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+ ThreadLocalSessionParams.setSessionParams(sessionParams)
try {
super.parsePlan(sqlText)
} catch {