You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:26 UTC

[09/20] incubator-carbondata git commit: [CARBONDATA-134] changing store location to inside that of the container. (#901)

[CARBONDATA-134] changing store location to inside that of the container. (#901)

* Changing the temp store location so that it is inside the container's temp directory.
* Deleting the empty data load folders left behind in the temp location.
* Adding a switch (carbon.use.local.dir) to choose between LOCAL_DIRS and the Java temp dir, defaulting to the Java temp dir.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d70a6e55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d70a6e55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d70a6e55

Branch: refs/heads/master
Commit: d70a6e55514651afc3b780851c1cb918d1f7f1fd
Parents: b327375
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Aug 4 12:54:21 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 12:54:21 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 28 +++-----------------
 .../spark/rdd/CarbonDataLoadRDD.scala           | 21 +++++++++++----
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 21 +++++++++++----
 .../processing/csvreaderstep/CsvInput.java      | 24 +++++------------
 .../csvbased/CarbonCSVBasedSeqGenStep.java      | 23 +++++-----------
 5 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index bce99a7..e0e109d 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -425,46 +425,26 @@ public final class CarbonLoaderUtil {
    * This method will delete the local data load folder location after data load is complete
    *
    * @param loadModel
-   * @param segmentName
    */
   public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
-      String segmentName, boolean isCompactionFlow) {
+      boolean isCompactionFlow) {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
-    CarbonTableIdentifier carbonTableIdentifier =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-    String segmentId = segmentName.substring(CarbonCommonConstants.LOAD_FOLDER.length());
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
         + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
     if (isCompactionFlow) {
       tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
     }
     // form local store location
-    String localStoreLocation = getStoreLocation(CarbonProperties.getInstance()
-            .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL),
-        carbonTableIdentifier, segmentId, loadModel.getPartitionId());
+    String localStoreLocation = CarbonProperties.getInstance()
+        .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
     try {
-      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation) });
+      CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile()});
       LOGGER.info("Deleted the local store location" + localStoreLocation);
     } catch (CarbonUtilException e) {
       LOGGER.error(e, "Failed to delete local data load folder location");
     }
 
-    // delete ktr file.
-    if (!isCompactionFlow) {
-      String graphPath = CarbonProperties.getInstance()
-          .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
-          + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
-          + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
-          + File.separator + tableName + ".ktr";
-      File path = new File(graphPath);
-      if (path.exists()) {
-        if (!path.delete()) {
-          LOGGER.error("failed to delete the ktr file in path " + path);
-        }
-      }
-    }
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 2a19da9..b8cab78 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -175,11 +175,22 @@ class CarbonDataLoadRDD[K, V](
         CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
         CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
         CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
-        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != storeLocations && storeLocations.length > 0) {
-          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+
+        // this property is used to determine whether temp location for carbon is inside
+        // container temp dir or is yarn application directory.
+        val carbonUseLocalDir = CarbonProperties.getInstance()
+          .getProperty("carbon.use.local.dir", "false")
+
+        if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+          val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+          if (null != storeLocations && storeLocations.length > 0) {
+            storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+          }
+          if (storeLocation == null) {
+            storeLocation = System.getProperty("java.io.tmpdir")
+          }
         }
-        if (storeLocation == null) {
+        else {
           storeLocation = System.getProperty("java.io.tmpdir")
         }
         storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -211,7 +222,7 @@ class CarbonDataLoadRDD[K, V](
             try {
               val isCompaction = false
               CarbonLoaderUtil
-                .deleteLocalDataLoadFolderLocation(model, newSlice, isCompaction)
+                .deleteLocalDataLoadFolderLocation(model, isCompaction)
             } catch {
               case e: Exception =>
                 LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7b15cbf..a3dd1c6 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -77,11 +77,22 @@ class CarbonMergerRDD[K, V](
         .getDatabaseName + '_' + carbonLoadModel
         .getTableName + '_' + carbonLoadModel.getTaskNo
 
-      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.length > 0) {
-        storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+      // this property is used to determine whether temp location for carbon is inside
+      // container temp dir or is yarn application directory.
+      val carbonUseLocalDir = CarbonProperties.getInstance()
+        .getProperty("carbon.use.local.dir", "false")
+
+      if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+        val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != storeLocations && storeLocations.length > 0) {
+          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
       }
-      if (storeLocation == null) {
+      else {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
       storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -177,7 +188,7 @@ class CarbonMergerRDD[K, V](
         try {
           val isCompactionFlow = true
           CarbonLoaderUtil
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
         } catch {
           case e: Exception =>
             LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 3b69b4a..b865393 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -397,27 +398,16 @@ public class CsvInput extends BaseStep implements StepInterface {
     }
 
     resultArray = results.toArray(new Future[results.size()]);
-    boolean completed = false;
     try {
-      while (!completed) {
-        completed = true;
-        for (int j = 0; j < resultArray.length; j++) {
-          if (!resultArray[j].isDone()) {
-            completed = false;
-          }
-
-        }
-        if (isTerminated) {
-          exec.shutdownNow();
-          throw new RuntimeException("Interrupted due to failing of other threads");
-        }
-        Thread.sleep(100);
-
+      for (int j = 0; j < resultArray.length; j++) {
+        resultArray[j].get();
       }
-    } catch (InterruptedException e) {
+    } catch (InterruptedException | ExecutionException e) {
       throw new RuntimeException("Thread InterruptedException", e);
     }
-    exec.shutdown();
+    finally {
+      exec.shutdownNow();
+    }
   }
 
   private void doProcessUnivocity() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index c2093ad..37c912e 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -750,27 +751,15 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
     }
 
     this.resultArray = results.toArray(new Future[results.size()]);
-    boolean completed = false;
     try {
-      while (!completed) {
-        completed = true;
-        for (int j = 0; j < this.resultArray.length; j++) {
-          if (!this.resultArray[j].isDone()) {
-            completed = false;
-          }
-
-        }
-        if (isTerminated) {
-          exec.shutdownNow();
-          throw new RuntimeException("Interrupted due to failing of other threads");
-        }
-        Thread.sleep(100);
-
+      for (int j = 0; j < this.resultArray.length; j++) {
+        this.resultArray[j].get();
       }
-    } catch (InterruptedException e) {
+    } catch (InterruptedException | ExecutionException e) {
       throw new RuntimeException("Thread InterruptedException", e);
+    } finally {
+      exec.shutdownNow();
     }
-    exec.shutdown();
   }
 
   private int[] getUpdatedLens(int[] lens, boolean[] presentDims) {