You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:30 UTC

[23/50] [abbrv] carbondata git commit: [CARBONDATA-2062] Configure the temp directory to be used for streaming handoff

[CARBONDATA-2062] Configure the temp directory to be used for streaming handoff

This closes #1841


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d3b228fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d3b228fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d3b228fb

Branch: refs/heads/branch-1.3
Commit: d3b228fb8cde5bace2fc932124ee68b8b2e4ee8c
Parents: f9606e9
Author: Raghunandan S <ca...@gmail.com>
Authored: Mon Jan 22 11:47:28 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Feb 2 14:52:05 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 34 ++-------------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 31 ++------------
 .../carbondata/spark/util/CommonUtil.scala      | 44 +++++++++++++++++++-
 .../carbondata/streaming/StreamHandoffRDD.scala |  4 ++
 4 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 35a8ea7..76c99f2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,21 +18,19 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.CommonUtil
 
 class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     result: AlterPartitionResult[K, V],
@@ -65,33 +63,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
             carbonLoadModel.setTaskNo(String.valueOf(partitionId))
             carbonLoadModel.setSegmentId(segmentId)
             carbonLoadModel.setPartitionId("0")
-            val tempLocationKey = CarbonDataProcessorUtil
-              .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-                  carbonLoadModel.getTableName,
-                  segmentId,
-                  carbonLoadModel.getTaskNo,
-                  false,
-                  true)
-            // this property is used to determine whether temp location for carbon is inside
-            // container temp dir or is yarn application directory.
-            val carbonUseLocalDir = CarbonProperties.getInstance()
-              .getProperty("carbon.use.local.dir", "false")
-
-            if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-                val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-                if (null != storeLocations && storeLocations.nonEmpty) {
-                    storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-                }
-                if (storeLocation == null) {
-                    storeLocation = System.getProperty("java.io.tmpdir")
-                }
-            } else {
-                storeLocation = System.getProperty("java.io.tmpdir")
-            }
-            storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
-            CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-            LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+            CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
 
             val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
                 factTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f37b0c5..0859f2e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -55,7 +54,7 @@ import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.splits.TableSplit
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl, Util}
 
 class CarbonMergerRDD[K, V](
     sc: SparkContext,
@@ -93,24 +92,7 @@ class CarbonMergerRDD[K, V](
       } else {
         null
       }
-      // this property is used to determine whether temp location for carbon is inside
-      // container temp dir or is yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
 
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + "carbon" + System.nanoTime() + '_' + theSplit.index
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -156,15 +138,8 @@ class CarbonMergerRDD[K, V](
           )
         }
         carbonLoadModel.setSegmentId(mergeNumber)
-        val tempLocationKey = CarbonDataProcessorUtil
-          .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName,
-            carbonLoadModel.getSegmentId,
-            carbonLoadModel.getTaskNo,
-            true,
-            false)
-        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-        LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+        CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
+
         // get destination segment properties as sent from driver which is of last segment.
         val segmentProperties = new SegmentProperties(
           carbonMergerMapping.maxSegmentColumnSchemaList.asJava,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index b44a0fb..64e4bb1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -24,11 +24,12 @@ import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
+import scala.util.Random
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.sql.{Row, RowFactory}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
@@ -53,9 +54,10 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
+
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -890,4 +892,42 @@ object CommonUtil {
     (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
   }
 
+
+  /**
+   * Chooses a unique temp store directory for the current task and registers it in
+   * CarbonProperties under the key built by CarbonDataProcessorUtil.getTempStoreLocationKey
+   * from the load model's database, table, segment id and task number.
+   *
+   * @param index              split index, appended to the directory name after a timestamp
+   * @param carbonLoadModel    model supplying database, table, segment id and task number
+   * @param isCompactionFlow   forwarded to getTempStoreLocationKey
+   * @param isAltPartitionFlow forwarded to getTempStoreLocationKey
+   */
+  def setTempStoreLocation(
+      index: Int,
+      carbonLoadModel: CarbonLoadModel,
+      isCompactionFlow: Boolean,
+      isAltPartitionFlow: Boolean): Unit = {
+    // "carbon.use.local.dir" decides whether the temp location is inside the container
+    // temp dir (java.io.tmpdir, the default) or inside a yarn application local dir.
+    val useLocalDir = CarbonProperties.getInstance()
+      .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
+    val localDirs =
+      if (useLocalDir) Option(Util.getConfiguredLocalDirs(SparkEnv.get.conf)) else None
+    // pick one local dir at random; fall back to java.io.tmpdir when none is usable
+    val baseDir = localDirs
+      .filter(_.nonEmpty)
+      .flatMap(dirs => Option(dirs(Random.nextInt(dirs.length))))
+      .getOrElse(System.getProperty("java.io.tmpdir"))
+    val storeLocation = baseDir + CarbonCommonConstants.FILE_SEPARATOR + "carbon" +
+      System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
+    val tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(
+      carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId, carbonLoadModel.getTaskNo,
+      isCompactionFlow, isAltPartitionFlow)
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+    // log the chosen path so each task's temp directory can be traced
+    LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index a96ab32..41dfa50 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -46,6 +46,8 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CommonUtil
+
 
 /**
  * partition of the handoff segment
@@ -111,6 +113,8 @@ class StreamHandoffRDD[K, V](
     CarbonMetadata.getInstance().addCarbonTable(carbonTable)
     // the input iterator is using raw row
     val iteratorList = prepareInputIterator(split, carbonTable)
+
+    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
     // use CompactionResultSortProcessor to sort data dan write to columnar files
     val processor = prepareHandoffProcessor(carbonTable)
     val status = processor.execute(iteratorList)