You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:34 UTC

[36/47] incubator-carbondata git commit: [CARBONDATA-118][Bug] Temp location clean up in compaction (#883)

[CARBONDATA-118][Bug] Temp location clean up in compaction (#883)

Passed the compaction temporary location to the compaction and data load merge flows, and handled cleanup of that temp folder.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5341c7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5341c7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5341c7dc

Branch: refs/heads/master
Commit: 5341c7dce8ce1a260d1ac48af67d58cd12331ba6
Parents: 61b6074
Author: ravikiran23 <ra...@gmail.com>
Authored: Fri Jul 29 06:05:39 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 06:05:39 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  5 +++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 26 +++++++-----
 .../spark/rdd/CarbonDataLoadRDD.scala           |  4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 42 ++++++++++++++++----
 .../processing/mdkeygen/MDKeyGenStep.java       |  2 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  2 +-
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  2 +-
 .../FileStoreSurrogateKeyGenForCSV.java         |  3 +-
 .../util/CarbonDataProcessorUtil.java           |  7 +++-
 9 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 5852241..86d87eb 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -849,6 +849,11 @@ public final class CarbonCommonConstants {
    */
   public static String LEVEL2_COMPACTION_INDEX = ".2";
 
+  /**
+   * Indicates compaction
+   */
+  public static String COMPACTION_KEY_WORD = "COMPACTION";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index 3ecdfe4..bce99a7 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -428,7 +428,7 @@ public final class CarbonLoaderUtil {
    * @param segmentName
    */
   public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      String segmentName) {
+      String segmentName, boolean isCompactionFlow) {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     CarbonTableIdentifier carbonTableIdentifier =
@@ -436,26 +436,32 @@ public final class CarbonLoaderUtil {
     String segmentId = segmentName.substring(CarbonCommonConstants.LOAD_FOLDER.length());
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
         + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    if (isCompactionFlow) {
+      tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
+    }
     // form local store location
     String localStoreLocation = getStoreLocation(CarbonProperties.getInstance()
             .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL),
         carbonTableIdentifier, segmentId, loadModel.getPartitionId());
     try {
       CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation) });
+      LOGGER.info("Deleted the local store location" + localStoreLocation);
     } catch (CarbonUtilException e) {
       LOGGER.error(e, "Failed to delete local data load folder location");
     }
 
     // delete ktr file.
-    String graphPath = CarbonProperties.getInstance()
-        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
-        + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
-        + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
-        + File.separator + tableName + ".ktr";
-    File path = new File(graphPath);
-    if (path.exists()) {
-      if (!path.delete()) {
-        LOGGER.error("failed to delete the ktr file in path " + path);
+    if (!isCompactionFlow) {
+      String graphPath = CarbonProperties.getInstance()
+          .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
+          + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
+          + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
+          + File.separator + tableName + ".ktr";
+      File path = new File(graphPath);
+      if (path.exists()) {
+        if (!path.delete()) {
+          LOGGER.error("failed to delete the ktr file in path " + path);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87f7885..36a1c0f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -208,7 +208,9 @@ class CarbonDataLoadRDD[K, V](
             // delete temp location data
             val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
             try {
-              CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, newSlice)
+              val isCompaction = false
+              CarbonLoaderUtil
+                .deleteLocalDataLoadFolderLocation(model, newSlice, isCompaction)
             } catch {
               case e: Exception =>
                 LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 254d51b..7b15cbf 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.{Collections, List}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
@@ -59,7 +60,7 @@ class CarbonMergerRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
 
-  val storeLocation = carbonMergerMapping.storeLocation
+  var storeLocation: String = null
   val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
   val metadataFilePath = carbonMergerMapping.metadataFilePath
   val mergedLoadName = carbonMergerMapping.mergedLoadName
@@ -70,16 +71,28 @@ class CarbonMergerRDD[K, V](
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
 
+      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+      val tempLocationKey: String = CarbonCommonConstants
+        .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+        .getDatabaseName + '_' + carbonLoadModel
+        .getTableName + '_' + carbonLoadModel.getTaskNo
+
+      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != storeLocations && storeLocations.length > 0) {
+        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      }
+      if (storeLocation == null) {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
+      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      LOGGER.info("Temp storeLocation taken is " + storeLocation)
       var mergeStatus = false
+      var mergeNumber = ""
       try {
         var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
 
-        val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
-          .getTableName + carbonLoadModel.getTaskNo
-        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-
         // sorting the table block info List.
         var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
 
@@ -126,7 +139,7 @@ class CarbonMergerRDD[K, V](
             }
         }
 
-        val mergeNumber = mergedLoadName
+        mergeNumber = mergedLoadName
           .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
             CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
           )
@@ -135,7 +148,8 @@ class CarbonMergerRDD[K, V](
           factTableName,
           carbonLoadModel.getTaskNo,
           "0",
-          mergeNumber
+          mergeNumber,
+          true
         )
 
         carbonLoadModel.setSegmentId(mergeNumber)
@@ -157,6 +171,18 @@ class CarbonMergerRDD[K, V](
           LOGGER.error(e)
           throw e
       }
+      finally {
+        // delete temp location data
+        val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+        try {
+          val isCompactionFlow = true
+          CarbonLoaderUtil
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+        } catch {
+          case e: Exception =>
+            LOGGER.error(e)
+        }
+      }
 
       var finished = false
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 8ddb429..6b997d5 100644
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -259,7 +259,7 @@ public class MDKeyGenStep extends BaseStep {
     this.tableName = meta.getTableName();
     storeLocation = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(),
-            String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"");
+            String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"", false);
     isNoDictionaryDimension =
         RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
     isUseInvertedIndex =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 1b3de04..bb8ab2a 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -525,7 +525,7 @@ public class SortDataRows {
   private void updateSortTempFileLocation() {
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(schemaName, tableName, taskNo, partitionID,
-            segmentId);
+            segmentId, false);
     this.tempFileLocation =
         carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
     LOGGER.info("temp file location" + this.tempFileLocation);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index d5ecb37..c2093ad 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -707,7 +707,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
   private void updateStoreLocation() {
     loadFolderLoc = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(), meta.getTaskNo(),
-            meta.getPartitionID(), meta.getSegmentId()+"");
+            meta.getPartitionID(), meta.getSegmentId()+"", false);
   }
 
   private String getBadLogStoreLocation(String storeLocation) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
index a920d63..b4822db 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
@@ -193,7 +193,8 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
   private String checkAndCreateLoadFolderNumber(String databaseName,
       String tableName) throws KettleException {
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId+"");
+        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId + "",
+            false);
     boolean isDirCreated = new File(carbonDataDirectoryPath).mkdirs();
     if (!isDirCreated) {
       throw new KettleException("Unable to create data load directory" + carbonDataDirectoryPath);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index 8f2c9e7..84274ed 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -262,9 +262,14 @@ public final class CarbonDataProcessorUtil {
    * @return
    */
   public static String getLocalDataFolderLocation(String databaseName, String tableName,
-      String taskId, String partitionId, String segmentId) {
+      String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
         + CarbonCommonConstants.UNDERSCORE + taskId;
+    if(isCompactionFlow){
+      tempLocationKey = CarbonCommonConstants
+          .COMPACTION_KEY_WORD + '_' + tempLocationKey;
+    }
+
     String baseStorePath = CarbonProperties.getInstance()
         .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
     CarbonTable carbonTable = CarbonMetadata.getInstance()