You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/24 02:59:02 UTC

carbondata git commit: [CARBONDATA-3008] Optimize default value for multiple temp dir

Repository: carbondata
Updated Branches:
  refs/heads/master 8af737204 -> b21a6d49f


[CARBONDATA-3008] Optimize default value for multiple temp dir

The feature of supporting multiple temp dirs for data loading is
introduced about 1.5 year ago. This feature is to solve the single disk
hot spot problem. After one year's verification in real production
environment, the feature turns out to be effective and correct. So in
this commit, we change the default behavior of this feature -- change it
from disable to enable by default.

Moreover, we remove the parameter 'carbon.use.multiple.temp.dir' and
only keep the parameter 'carbon.use.local.dir' and enable it by default.
If the cluster is not configured with yarn-local-dirs, the java temp dir
will be used.

This closes #2824


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b21a6d49
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b21a6d49
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b21a6d49

Branch: refs/heads/master
Commit: b21a6d49f8a7d99a6bbe804949d22cc6b3320de4
Parents: 8af7372
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Thu Oct 18 17:50:57 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 24 10:58:33 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  9 ++-
 .../carbondata/core/util/CarbonProperties.java  | 17 -----
 docs/configuration-parameters.md                |  3 +-
 docs/performance-tuning.md                      |  1 -
 docs/usecases.md                                |  2 -
 .../TestLoadDataWithYarnLocalDirs.scala         |  9 ++-
 .../load/DataLoadProcessorStepOnSpark.scala     | 30 +--------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 41 ++----------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 +--------
 .../carbondata/spark/util/CommonUtil.scala      | 66 +++++++++++---------
 .../datasources/SparkCarbonTableFormat.scala    | 29 +--------
 11 files changed, 60 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1b1046a..b5e1e5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1371,16 +1371,15 @@ public final class CarbonCommonConstants {
   public static final String CARBON_SECURE_DICTIONARY_SERVER_DEFAULT = "true";
 
   /**
-   * whether to use multi directories when loading data,
-   * the main purpose is to avoid single-disk-hot-spot
+   * for loading, whether to use yarn's local dir the main purpose is to avoid single disk hot spot
    */
   @CarbonProperty
-  public static final String CARBON_USE_MULTI_TEMP_DIR = "carbon.use.multiple.temp.dir";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR = "carbon.use.local.dir";
 
   /**
-   * default value for multi temp dir
+   * default value for whether to enable carbon use yarn local dir
    */
-  public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
+  public static final String CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT = "true";
 
   /**
    * name of compressor to compress sort temp files

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a32ad52..e6440b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1135,23 +1135,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * Returns whether to use multi temp dirs
-   * @return boolean
-   */
-  public boolean isUseMultiTempDir() {
-    String usingMultiDirStr = getProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-        CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
-    boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
-    if (!validateBoolean) {
-      LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid."
-          + "Configured value: \"" + usingMultiDirStr + "\"."
-          + "Data Load will not use multiple temp directories.");
-      usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
-    }
-    return usingMultiDirStr.equalsIgnoreCase("true");
-  }
-
-  /**
    * Return valid storage level
    * @return String
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index ac204b1..7a6dcab 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -83,8 +83,9 @@ This section provides the details of all the configurations required for the Car
 | carbon.enable.calculate.size | true | **For Load Operation**: Enabling this property will let carbondata calculate the size of the carbon data file (.carbondata) and the carbon index file (.carbonindex) for each load and update the table status file. **For Describe Formatted**: Enabling this property will let carbondata calculate the total size of the carbon data files and the carbon index files for the each table and display it in describe formatted command. **NOTE:** This is useful to determine the overall size of the carbondata table and also get an idea of how the table is growing in order to take up other backup strategy decisions. |
 | carbon.cutOffTimestamp | (none) | CarbonData has capability to generate the Dictionary values for the timestamp columns from the data itself without the need to store the computed dictionary values. This configuration sets the start date for calculating the timestamp. Java counts the number of milliseconds from start of "1970-01-01 00:00:00". This property is used to customize the start of position. For example "2000-01-01 00:00:00". **NOTE:** The date must be in the form ***carbon.timestamp.format***. CarbonData supports storing data for upto 68 years.For example, if the cut-off time is 1970-01-01 05:30:00, then data upto 2038-01-01 05:30:00 will be supported by CarbonData. |
 | carbon.timegranularity | SECOND | The configuration is used to specify the data granularity level such as DAY, HOUR, MINUTE, or SECOND. This helps to store more than 68 years of data into CarbonData. |
-| carbon.use.local.dir | false | CarbonData,during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to tmp directory of the container or to the YARN application directory. |
+| carbon.use.local.dir | true | CarbonData,during data loading, writes files to local temp directories before copying the files to HDFS. This configuration is used to specify whether CarbonData can write locally to tmp directory of the container or to the YARN application directory. |
 | carbon.use.multiple.temp.dir | false | When multiple disks are present in the system, YARN is generally configured with multiple disks to be used as temp directories for managing the containers. This configuration specifies whether to use multiple YARN local directories during data loading for disk IO load balancing.Enable ***carbon.use.local.dir*** for this configuration to take effect. **NOTE:** Data Loading is an IO intensive operation whose performance can be limited by the disk IO threshold, particularly during multi table concurrent data load.Configuring this parameter, balances the disk IO across multiple disks there by improving the over all load performance. |
+| carbon.sort.temp.compressor | (none) | CarbonData writes every ***carbon.sort.size*** number of records to intermediate temp files during data loading to ensure memory footprint is within limits. These temporary files can be compressed and written in order to save the storage space. This configuration specifies the name of compressor to be used to compress the intermediate sort temp files during sort procedure in data loading. The valid values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD' and empty. By default, empty means that Carbondata will not compress the sort temp files. **NOTE:** Compressor will be useful if you encounter disk bottleneck.Since the data needs to be compressed and decompressed,it involves additional CPU cycles,but is compensated by the high IO throughput due to less data to be written or read from the disks. |
 | carbon.load.skewedDataOptimization.enabled | false | During data loading,CarbonData would divide the number of blocks equally so as to ensure all executors process same number of blocks. This mechanism satisfies most of the scenarios and ensures maximum parallel processing for optimal data loading performance.In some business scenarios, there might be scenarios where the size of blocks vary significantly and hence some executors would have to do more work if they get blocks containing more data. This configuration enables size based block allocation strategy for data loading. When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data.**NOTE:** This configuration is useful if the size of your input data files varies widely, say 1MB to 1GB.For this configuration to work effectively,knowing the data pattern and size is important and necessary. |
 | carbon.load.min.size.enabled | false | During Data Loading, CarbonData would divide the number of files among the available executors to parallelize the loading operation. When the input data files are very small, this action causes to generate many small carbondata files. This configuration determines whether to enable node minumun input data size allocation strategy for data loading.It will make sure that the node load the minimum amount of data there by reducing number of carbondata files.**NOTE:** This configuration is useful if the size of the input data files are very small, like 1MB to 256MB. Refer to the load option ***load_min_size_inmb*** to configure the minimum size to be considered for splitting files among executors. |
 | enable.data.loading.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional data loading statistics information to more accurately locate the issues being debugged. **NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time.It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all data loading performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/performance-tuning.md
----------------------------------------------------------------------
diff --git a/docs/performance-tuning.md b/docs/performance-tuning.md
index 2b005af..64f80c4 100644
--- a/docs/performance-tuning.md
+++ b/docs/performance-tuning.md
@@ -170,7 +170,6 @@
 | spark.executor.instances/spark.executor.cores/spark.executor.memory | spark/conf/spark-defaults.conf | Querying | The number of executors, CPU cores, and memory used for CarbonData query. | In the bank scenario, we provide the 4 CPUs cores and 15 GB for each executor which can get good performance. This 2 value does not mean more the better. It needs to be configured properly in case of limited resources. For example, In the bank scenario, it has enough CPU 32 cores each node but less memory 64 GB each node. So we cannot give more CPU but less memory. For example, when 4 cores and 12GB for each executor. It sometimes happens GC during the query which impact the query performance very much from the 3 second to more than 15 seconds. In this scenario need to increase the memory or decrease the CPU cores. |
 | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Querying | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
 | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
-| carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data loading | Whether to use multiple YARN local directories during table data loading for disk load balance | After enabling 'carbon.use.local.dir', if this is set to true, CarbonData will use all YARN local directories during data load for disk load balance, that will improve the data load performance. Please enable this property when you encounter disk hotspot problem during data loading. |
 | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data loading | Specify the name of compressor to compress the intermediate sort temporary files during sort procedure in data loading. | The optional values are 'SNAPPY','GZIP','BZIP2','LZ4','ZSTD', and empty. By default, empty means that Carbondata will not compress the sort temp files. This parameter will be useful if you encounter disk bottleneck. |
 | carbon.load.skewedDataOptimization.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable size based block allocation strategy for data loading. | When loading, carbondata will use file size based block allocation strategy for task distribution. It will make sure that all the executors process the same size of data -- It's useful if the size of your input data files varies widely, say 1MB to 1GB. |
 | carbon.load.min.size.enabled | spark/carbonlib/carbon.properties | Data loading | Whether to enable node minumun input data size allocation strategy for data loading.| When loading, carbondata will use node minumun input data size allocation strategy for task distribution. It will make sure the nodes load the minimum amount of data -- It's useful if the size of your input data files very small, say 1MB to 256MB,Avoid generating a large number of small files. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/docs/usecases.md
----------------------------------------------------------------------
diff --git a/docs/usecases.md b/docs/usecases.md
index e8b98b5..c029bb3 100644
--- a/docs/usecases.md
+++ b/docs/usecases.md
@@ -72,7 +72,6 @@ Apart from these, the following CarbonData configuration was suggested to be con
 | Data Loading | table_blocksize                         | 256  | To efficiently schedule multiple tasks during query |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100    | Increased to 100 as number of cores are more.Can perform merging in backgorund.If less number of files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE   | yarn application directory will be usually on a single disk.YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directory will allow carbon to use multiple disks and improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE   | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
 | Compaction | carbon.compaction.level.threshold       | 6,6    | Since frequent small loads, compacting more segments will give better query results |
 | Compaction | carbon.enable.auto.load.merge           | true   | Since data loading is small,auto compacting keeps the number of segments less and also compaction can complete in  time |
 | Compaction | carbon.number.of.cores.while.compacting | 4      | Higher number of cores can improve the compaction speed |
@@ -127,7 +126,6 @@ Use all columns are no-dictionary as the cardinality is high.
 | Data Loading | table_blocksize                         | 512                     | To efficiently schedule multiple tasks during query. This size depends on data scenario.If data is such that the filters would select less number of blocklets to scan, keeping higher number works well.If the number blocklets to scan is more, better to reduce the size as more tasks can be scheduled in parallel. |
 | Data Loading | carbon.sort.intermediate.files.limit    | 100                     | Increased to 100 as number of cores are more.Can perform merging in backgorund.If less number of files to merge, sort threads would be idle |
 | Data Loading | carbon.use.local.dir                    | TRUE                    | yarn application directory will be usually on a single disk.YARN would be configured with multiple disks to be used as temp or to assign randomly to applications. Using the yarn temp directory will allow carbon to use multiple disks and improve IO performance |
-| Data Loading | carbon.use.multiple.temp.dir            | TRUE                    | multiple disks to write sort files will lead to better IO and reduce the IO bottleneck |
 | Data Loading | sort.inmemory.size.in.mb                | 92160 | Memory allocated to do inmemory sorting. When more memory is available in the node, configuring this will retain more sort blocks in memory so that the merge sort is faster due to no/very less IO |
 | Compaction | carbon.major.compaction.size            | 921600                  | Sum of several loads to combine into single segment |
 | Compaction | carbon.number.of.cores.while.compacting | 12                      | Higher number of cores can improve the compaction speed.Data size is huge.Compaction need to use more threads to speed up the process |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
index ff415ae..ef1bcbb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithYarnLocalDirs.scala
@@ -65,15 +65,14 @@ class TestLoadDataWithYarnLocalDirs extends QueryTest with BeforeAndAfterAll {
   }
 
   private def enableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "true")
     CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR, "true")
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR, "true")
   }
 
   private def disableMultipleDir = {
-    CarbonProperties.getInstance().addProperty("carbon.use.local.dir", "false")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR,
-      CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT)
   }
 
   test("test carbon table data loading for multiple temp dir") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f5c65b3..0a68fb0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.load
 
-import scala.util.Random
-
 import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
@@ -30,8 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -43,7 +40,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.CommonUtil
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -238,7 +235,7 @@ object DataLoadProcessorStepOnSpark {
     var dataWriter: DataWriterProcessorStepImpl = null
     try {
       model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
-      val storeLocation = Array(getTempStoreLocation(index))
+      val storeLocation = CommonUtil.getTempStoreLocations(index.toString)
       val conf = DataLoadProcessBuilder.createConfiguration(model, storeLocation)
 
       tableName = model.getTableName
@@ -291,27 +288,6 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
-  private def getTempStoreLocation(index: Int): String = {
-    var storeLocation = ""
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-    } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + '/' + System.nanoTime() + '_' + index
-    storeLocation
-  }
-
   private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
     e match {
       case e: CarbonDataLoadingException => throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 86a5043..a03447d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,16 +18,14 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
-import scala.util.Random
 
-import org.apache.spark.{Partition, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
@@ -42,7 +40,6 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     prev: RDD[Array[AnyRef]])
   extends CarbonRDD[(K, V)](alterPartitionModel.sqlContext.sparkSession, prev) {
 
-  var storeLocation: String = null
   val carbonLoadModel = alterPartitionModel.carbonLoadModel
   val segmentId = alterPartitionModel.segmentId
   val oldPartitionIds = alterPartitionModel.oldPartitionIds
@@ -62,44 +59,18 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
     val iter = new Iterator[(K, V)] {
-      val partitionId = partitionInfo.getPartitionId(split.index)
+      val partitionId: Int = partitionInfo.getPartitionId(split.index)
       carbonLoadModel.setTaskNo(String.valueOf(partitionId))
       carbonLoadModel.setSegmentId(segmentId)
       CarbonMetadata.getInstance().addCarbonTable(
         carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
-      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
-      val tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName,
-          segmentId,
-          carbonLoadModel.getTaskNo,
-          false,
-          true)
-      // this property is used to determine whether temp location for carbon is inside
-      // container temp dir or is yarn application directory.
-      val carbonUseLocalDir = CarbonProperties.getInstance()
-        .getProperty("carbon.use.local.dir", "false")
 
-      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.nonEmpty) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-      } else {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
-      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
-      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-
-      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel,
+        isCompactionFlow = false, isAltPartitionFlow = true)
+      val tempStoreLoc: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation(
         databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
 
-      val loadStatus = if (rows.isEmpty) {
+      val loadStatus: Boolean = if (rows.isEmpty) {
         LOGGER.info("After repartition this split, NO target rows to write back.")
         true
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index fe09034..041dc1c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -17,13 +17,11 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io._
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
@@ -89,35 +87,9 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true")
     CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
 
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val isCarbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-    val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-
-    if (isCarbonUseLocalDir) {
-      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-
-      if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-        // use single dir
-        storeLocation = storeLocation :+
-            (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-        if (storeLocation == null || storeLocation.isEmpty) {
-          storeLocation = storeLocation :+
-              (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
-      } else {
-        // use all the yarn dirs
-        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-      }
-    } else {
-      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-    }
+    storeLocation = CommonUtil.getTempStoreLocations(splitIndex.toString)
     LOGGER.info("Temp location for loading data: " + storeLocation.mkString(","))
   }
-
-  private def tmpLocationSuffix = File.separator + "carbon" + System.nanoTime() + "_" + splitIndex
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 82a2f9d..7071295 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.util
 
 
+import java.io.File
 import java.text.SimpleDateFormat
 import java.util
 import java.util.regex.{Matcher, Pattern}
@@ -705,38 +706,47 @@ object CommonUtil {
       carbonLoadModel: CarbonLoadModel,
       isCompactionFlow: Boolean,
       isAltPartitionFlow: Boolean) : Unit = {
-    var storeLocation: String = null
-
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = CarbonProperties.getInstance()
-      .getProperty("carbon.use.local.dir", "false")
-
-    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+    val storeLocation = getTempStoreLocations(index.toString).mkString(File.pathSeparator)
+
+    val tempLocationKey = CarbonDataProcessorUtil.getTempStoreLocationKey(
+      carbonLoadModel.getDatabaseName,
+      carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId,
+      carbonLoadModel.getTaskNo,
+      isCompactionFlow,
+      isAltPartitionFlow)
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+  }
 
-      val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
+  /**
+   * get the temp locations for each process thread
+   *
+   * @param index the id for each process thread
+   * @return an array of temp locations
+   */
+  def getTempStoreLocations(index: String) : Array[String] = {
+    var storeLocation: Array[String] = Array[String]()
+    val isCarbonUseYarnLocalDir = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR,
+      CarbonCommonConstants.CARBON_LOADING_USE_YARN_LOCAL_DIR_DEFAULT).equalsIgnoreCase("true")
+    val tmpLocationSuffix =
+      s"${File.separator}carbon${System.nanoTime()}${CarbonCommonConstants.UNDERSCORE}$index"
+    if (isCarbonUseYarnLocalDir) {
+      val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+
+      if (null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
+        storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
+      } else {
+        LOGGER.warn("It seems that the we didn't configure local dirs for yarn," +
+                    " so we are unable to use them for data loading." +
+                    " Here we will fall back using the java tmp dir.")
+        storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
       }
     } else {
-      storeLocation = System.getProperty("java.io.tmpdir")
-    }
-    storeLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR + "carbon" +
-      System.nanoTime() + CarbonCommonConstants.UNDERSCORE + index
-
-    val tempLocationKey = CarbonDataProcessorUtil
-      .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName,
-        carbonLoadModel.getSegmentId,
-        carbonLoadModel.getTaskNo,
-        isCompactionFlow,
-        isAltPartitionFlow)
-    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      storeLocation = storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
+    }
+    storeLocation
   }
-
   /**
    * This method will validate the cache level
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b21a6d49/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index b605a1d..6bbdcec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWrit
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 class SparkCarbonTableFormat
   extends FileFormat
@@ -172,33 +172,8 @@ with Serializable {
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
         val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
-        val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
-        var storeLocation: Array[String] = Array[String]()
-        val isCarbonUseLocalDir = CarbonProperties.getInstance()
-          .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
-
-
         val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
-        val tmpLocationSuffix =
-          File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
-        if (isCarbonUseLocalDir) {
-          val yarnStoreLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-          if (!isCarbonUseMultiDir && null != yarnStoreLocations && yarnStoreLocations.nonEmpty) {
-            // use single dir
-            storeLocation = storeLocation :+
-              (yarnStoreLocations(Random.nextInt(yarnStoreLocations.length)) + tmpLocationSuffix)
-            if (storeLocation == null || storeLocation.isEmpty) {
-              storeLocation = storeLocation :+
-                (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-            }
-          } else {
-            // use all the yarn dirs
-            storeLocation = yarnStoreLocations.map(_ + tmpLocationSuffix)
-          }
-        } else {
-          storeLocation =
-            storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
-        }
+        val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
         CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
         new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
       }