You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/10/31 10:13:36 UTC
carbondata git commit: [CARBONDATA-3054] Fix Dictionary file cannot
be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen
Repository: carbondata
Updated Branches:
refs/heads/master 0e39abf81 -> bcf3e0fd5
[CARBONDATA-3054] Fix Dictionary file cannot be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen
Problem: In an S3a environment, when querying data that has dictionary
files, the dictionary file cannot be read by the CarbonDictionaryDecoder.doConsume() codegen path, even though the file is present.
Cause: The CarbonDictionaryDecoder.doConsume() codegen path does not set the Hadoop conf in the thread-local variable; only doExecute() sets it.
Hence, when getDictionaryWrapper() is called from the doConsume() codegen, the fileExists() check in AbstractDictionaryCache.getDictionaryMetaCarbonFile() returns false.
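Solution:
In the CarbonDictionaryDecoder.doConsume() codegen path, set the Hadoop conf in the thread-local variable. The Hadoop conf is now broadcast once as a CarbonDictionaryDecoder member and passed to ForwardDictionaryWrapper, which sets it on the current thread before loading the dictionary.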
This closes #2876
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bcf3e0fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bcf3e0fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bcf3e0fd
Branch: refs/heads/master
Commit: bcf3e0fd595f612ee33a8ee2d9aa6197998f626e
Parents: 0e39abf
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Oct 29 17:56:29 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Oct 31 15:47:51 2018 +0530
----------------------------------------------------------------------
.../spark/sql/CarbonDictionaryDecoder.scala | 25 +++++++++++++-------
1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcf3e0fd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index f3d5bf0..c9434a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -31,6 +32,7 @@ import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.util.SerializableConfiguration
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -42,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.scan.executor.util.QueryUtil
import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
@@ -69,6 +70,9 @@ case class CarbonDictionaryDecoder(
val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)
+ val broadcastConf = SparkSQLUtil.broadCastHadoopConf(
+ sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
+
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
val tableNameToCarbonTableMapping = relations.map { relation =>
@@ -76,12 +80,10 @@ case class CarbonDictionaryDecoder(
(carbonTable.getTableName, carbonTable)
}.toMap
- val conf = SparkSQLUtil
- .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
- ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
val cacheProvider: CacheProvider = CacheProvider.getInstance
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
@@ -137,7 +139,7 @@ case class CarbonDictionaryDecoder(
val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(tableNameToCarbonTableMapping,
- forwardDictionaryCache)
+ forwardDictionaryCache, broadcastConf)
val exprs = child.output.map { exp =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
@@ -269,7 +271,8 @@ case class CarbonDictionaryDecoder(
}
private def getDictionaryWrapper(atiMap: Map[String, CarbonTable],
- cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+ cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary],
+ broadcastConf: Broadcast[SerializableConfiguration]) = {
val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
case (tableName, columnIdentifier, carbonDimension) =>
@@ -295,7 +298,7 @@ case class CarbonDictionaryDecoder(
newColumnIdentifier, carbonDimension.getDataType,
dictionaryPath)
allDictIdentifiers += dictionaryColumnUniqueIdentifier
- new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier)
+ new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier, broadcastConf)
} catch {
case _: Throwable => null
}
@@ -552,9 +555,11 @@ class CarbonDecoderRDD(
* It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
* case of codegen
* @param dictIdentifier Dictionary column unique identifier
+ * @param broadcastConf hadoop broadcast conf for serialization, that contains carbon conf.
*/
class ForwardDictionaryWrapper(
- dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
+ dictIdentifier: DictionaryColumnUniqueIdentifier,
+ broadcastConf: Broadcast[SerializableConfiguration]) extends Serializable {
var dictionary: Dictionary = null
@@ -562,6 +567,10 @@ class ForwardDictionaryWrapper(
def getDictionaryValueForKeyInBytes (surrogateKey: Int): Array[Byte] = {
if (dictionary == null) {
+ // Note: from doConsume() codegen, this is the first method called.
+ // so setting conf to threadlocal here. If any new method added before calling this method.
+ // Need to set this in that method instead of here.
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
dictionary = dictionaryLoader.getDictionary(dictIdentifier)
}
dictionary.getDictionaryValueForKeyInBytes(surrogateKey)