You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:30 UTC
[23/50] [abbrv] carbondata git commit: [CARBONDATA-2062] Configure
the temp directory to be used for streaming handoff
[CARBONDATA-2062] Configure the temp directory to be used for streaming handoff
This closes #1841
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d3b228fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d3b228fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d3b228fb
Branch: refs/heads/branch-1.3
Commit: d3b228fb8cde5bace2fc932124ee68b8b2e4ee8c
Parents: f9606e9
Author: Raghunandan S <ca...@gmail.com>
Authored: Mon Jan 22 11:47:28 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Feb 2 14:52:05 2018 +0800
----------------------------------------------------------------------
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 34 ++-------------
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 31 ++------------
.../carbondata/spark/util/CommonUtil.scala | 44 +++++++++++++++++++-
.../carbondata/streaming/StreamHandoffRDD.scala | 4 ++
4 files changed, 52 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 35a8ea7..76c99f2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,21 +18,19 @@
package org.apache.carbondata.spark.rdd
import scala.collection.JavaConverters._
-import scala.util.Random
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.CommonUtil
class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
result: AlterPartitionResult[K, V],
@@ -65,33 +63,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
carbonLoadModel.setTaskNo(String.valueOf(partitionId))
carbonLoadModel.setSegmentId(segmentId)
carbonLoadModel.setPartitionId("0")
- val tempLocationKey = CarbonDataProcessorUtil
- .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName,
- segmentId,
- carbonLoadModel.getTaskNo,
- false,
- true)
- // this property is used to determine whether temp location for carbon is inside
- // container temp dir or is yarn application directory.
- val carbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false")
-
- if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
- val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.nonEmpty) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- } else {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
- LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+ CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
factTableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f37b0c5..0859f2e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.util.Random
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -55,7 +54,7 @@ import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.splits.TableSplit
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl, Util}
class CarbonMergerRDD[K, V](
sc: SparkContext,
@@ -93,24 +92,7 @@ class CarbonMergerRDD[K, V](
} else {
null
}
- // this property is used to determine whether temp location for carbon is inside
- // container temp dir or is yarn application directory.
- val carbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false")
- if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
- val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.nonEmpty) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- } else {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + '/' + "carbon" + System.nanoTime() + '_' + theSplit.index
var mergeStatus = false
var mergeNumber = ""
var exec: CarbonCompactionExecutor = null
@@ -156,15 +138,8 @@ class CarbonMergerRDD[K, V](
)
}
carbonLoadModel.setSegmentId(mergeNumber)
- val tempLocationKey = CarbonDataProcessorUtil
- .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName,
- carbonLoadModel.getSegmentId,
- carbonLoadModel.getTaskNo,
- true,
- false)
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
- LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+ CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
+
// get destination segment properties as sent from driver which is of last segment.
val segmentProperties = new SegmentProperties(
carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index b44a0fb..64e4bb1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -24,11 +24,12 @@ import java.util.regex.{Matcher, Pattern}
import scala.collection.JavaConverters._
import scala.collection.mutable.Map
+import scala.util.Random
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.sql.{Row, RowFactory}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
@@ -53,9 +54,10 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
object CommonUtil {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -890,4 +892,42 @@ object CommonUtil {
(Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
}
+
+ def setTempStoreLocation(
+ index: Int,
+ carbonLoadModel: CarbonLoadModel,
+ isCompactionFlow: Boolean,
+ isAltPartitionFlow: Boolean) : Unit = {
+ var storeLocation: String = null
+
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+ val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.nonEmpty) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ } else {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + "carbon" +
+ System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
+
+ val tempLocationKey = CarbonDataProcessorUtil
+ .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName,
+ carbonLoadModel.getSegmentId,
+ carbonLoadModel.getTaskNo,
+ isCompactionFlow,
+ isAltPartitionFlow)
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3b228fb/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index a96ab32..41dfa50 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -46,6 +46,8 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CommonUtil
+
/**
* partition of the handoff segment
@@ -111,6 +113,8 @@ class StreamHandoffRDD[K, V](
CarbonMetadata.getInstance().addCarbonTable(carbonTable)
// the input iterator is using raw row
val iteratorList = prepareInputIterator(split, carbonTable)
+
+ CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
// use CompactionResultSortProcessor to sort data and write to columnar files
val processor = prepareHandoffProcessor(carbonTable)
val status = processor.execute(iteratorList)