You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/10/31 10:13:36 UTC

carbondata git commit: [CARBONDATA-3054] Fix Dictionary file cannot be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen

Repository: carbondata
Updated Branches:
  refs/heads/master 0e39abf81 -> bcf3e0fd5


[CARBONDATA-3054] Fix Dictionary file cannot be read in S3a with CarbonDictionaryDecoder.doConsume() codeGen

problem: In an S3a environment, when querying data that has dictionary
files, the dictionary file cannot be read with CarbonDictionaryDecoder.doConsume() codeGen even though the file is present.

cause: CarbonDictionaryDecoder.doConsume() codeGen doesn't set the hadoop conf in the thread-local variable; only doExecute() sets it.
Hence, when getDictionaryWrapper() is called from doConsume() codeGen, AbstractDictionaryCache.getDictionaryMetaCarbonFile() returns false for the fileExists() operation.

solution:
In CarbonDictionaryDecoder.doConsume() codeGen, set the hadoop conf in the thread-local variable.

This closes #2876


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bcf3e0fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bcf3e0fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bcf3e0fd

Branch: refs/heads/master
Commit: bcf3e0fd595f612ee33a8ee2d9aa6197998f626e
Parents: 0e39abf
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Oct 29 17:56:29 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Oct 31 15:47:51 2018 +0530

----------------------------------------------------------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     | 25 +++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bcf3e0fd/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index f3d5bf0..c9434a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
@@ -31,6 +32,7 @@ import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -42,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.scan.executor.util.QueryUtil
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
 
@@ -69,6 +70,9 @@ case class CarbonDictionaryDecoder(
   val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] =
     CarbonDictionaryDecoder.getDictionaryColumnMapping(child.output, relations, profile, aliasMap)
 
+  val broadcastConf = SparkSQLUtil.broadCastHadoopConf(
+    sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
+
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
       val tableNameToCarbonTableMapping = relations.map { relation =>
@@ -76,12 +80,10 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getTableName, carbonTable)
       }.toMap
 
-      val conf = SparkSQLUtil
-        .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>
-          ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+          ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
           val cacheProvider: CacheProvider = CacheProvider.getInstance
           val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
             cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
@@ -137,7 +139,7 @@ case class CarbonDictionaryDecoder(
       val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
         cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
       val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryWrapper(tableNameToCarbonTableMapping,
-        forwardDictionaryCache)
+        forwardDictionaryCache, broadcastConf)
 
       val exprs = child.output.map { exp =>
         ExpressionCanonicalizer.execute(BindReferences.bindReference(exp, child.output))
@@ -269,7 +271,8 @@ case class CarbonDictionaryDecoder(
   }
 
   private def getDictionaryWrapper(atiMap: Map[String, CarbonTable],
-      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+      cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary],
+      broadcastConf: Broadcast[SerializableConfiguration]) = {
     val allDictIdentifiers = new ArrayBuffer[DictionaryColumnUniqueIdentifier]()
     val dicts: Seq[ForwardDictionaryWrapper] = getDictionaryColumnIds.map {
       case (tableName, columnIdentifier, carbonDimension) =>
@@ -295,7 +298,7 @@ case class CarbonDictionaryDecoder(
               newColumnIdentifier, carbonDimension.getDataType,
               dictionaryPath)
             allDictIdentifiers += dictionaryColumnUniqueIdentifier
-            new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier)
+            new ForwardDictionaryWrapper(dictionaryColumnUniqueIdentifier, broadcastConf)
           } catch {
             case _: Throwable => null
           }
@@ -552,9 +555,11 @@ class CarbonDecoderRDD(
  * It is a wrapper around Dictionary, it is a work around to keep the dictionary serializable in
  * case of codegen
  * @param dictIdentifier Dictionary column unique identifier
+ * @param broadcastConf hadoop broadcast conf for serialization, that contains carbon conf.
  */
 class ForwardDictionaryWrapper(
-    dictIdentifier: DictionaryColumnUniqueIdentifier) extends Serializable {
+    dictIdentifier: DictionaryColumnUniqueIdentifier,
+    broadcastConf: Broadcast[SerializableConfiguration]) extends Serializable {
 
   var dictionary: Dictionary = null
 
@@ -562,6 +567,10 @@ class ForwardDictionaryWrapper(
 
   def getDictionaryValueForKeyInBytes (surrogateKey: Int): Array[Byte] = {
     if (dictionary == null) {
+      // Note: from doConsume() codegen, this is the first method called.
+      // so setting conf to threadlocal here. If any new method added before calling this method.
+      // Need to set this in that method instead of here.
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(broadcastConf.value.value)
       dictionary = dictionaryLoader.getDictionary(dictIdentifier)
     }
     dictionary.getDictionaryValueForKeyInBytes(surrogateKey)