You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:26 UTC
[09/20] incubator-carbondata git commit: [CARBONDATA-134] changing store location to inside that of the container. (#901)
[CARBONDATA-134] changing store location to inside that of the container. (#901)
* Changing the store location so that it lies inside the container's temp directory.
* Deleting the empty data-load folders left behind in the temp location.
* Adding a switch ("carbon.use.local.dir") to choose between LOCAL_DIRS and the Java temp dir ("java.io.tmpdir"), defaulting to the Java temp dir.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d70a6e55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d70a6e55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d70a6e55
Branch: refs/heads/master
Commit: d70a6e55514651afc3b780851c1cb918d1f7f1fd
Parents: b327375
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Aug 4 12:54:21 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 12:54:21 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 28 +++-----------------
.../spark/rdd/CarbonDataLoadRDD.scala | 21 +++++++++++----
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 21 +++++++++++----
.../processing/csvreaderstep/CsvInput.java | 24 +++++------------
.../csvbased/CarbonCSVBasedSeqGenStep.java | 23 +++++-----------
5 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index bce99a7..e0e109d 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -425,46 +425,26 @@ public final class CarbonLoaderUtil {
* This method will delete the local data load folder location after data load is complete
*
* @param loadModel
- * @param segmentName
*/
public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
- String segmentName, boolean isCompactionFlow) {
+ boolean isCompactionFlow) {
String databaseName = loadModel.getDatabaseName();
String tableName = loadModel.getTableName();
- CarbonTableIdentifier carbonTableIdentifier =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
- String segmentId = segmentName.substring(CarbonCommonConstants.LOAD_FOLDER.length());
String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
if (isCompactionFlow) {
tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
}
// form local store location
- String localStoreLocation = getStoreLocation(CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL),
- carbonTableIdentifier, segmentId, loadModel.getPartitionId());
+ String localStoreLocation = CarbonProperties.getInstance()
+ .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
try {
- CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation) });
+ CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation).getParentFile()});
LOGGER.info("Deleted the local store location" + localStoreLocation);
} catch (CarbonUtilException e) {
LOGGER.error(e, "Failed to delete local data load folder location");
}
- // delete ktr file.
- if (!isCompactionFlow) {
- String graphPath = CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
- + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
- + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
- + File.separator + tableName + ".ktr";
- File path = new File(graphPath);
- if (path.exists()) {
- if (!path.delete()) {
- LOGGER.error("failed to delete the ktr file in path " + path);
- }
- }
- }
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 2a19da9..b8cab78 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -175,11 +175,22 @@ class CarbonDataLoadRDD[K, V](
CarbonProperties.getInstance().addProperty("high.cardinality.value", "100000")
CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false")
CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000")
- val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.length > 0) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+
+ if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.length > 0) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
}
- if (storeLocation == null) {
+ else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -211,7 +222,7 @@ class CarbonDataLoadRDD[K, V](
try {
val isCompaction = false
CarbonLoaderUtil
- .deleteLocalDataLoadFolderLocation(model, newSlice, isCompaction)
+ .deleteLocalDataLoadFolderLocation(model, isCompaction)
} catch {
case e: Exception =>
LOGGER.error(e)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7b15cbf..a3dd1c6 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -77,11 +77,22 @@ class CarbonMergerRDD[K, V](
.getDatabaseName + '_' + carbonLoadModel
.getTableName + '_' + carbonLoadModel.getTaskNo
- val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.length > 0) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+
+ if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.length > 0) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
}
- if (storeLocation == null) {
+ else {
storeLocation = System.getProperty("java.io.tmpdir")
}
storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
@@ -177,7 +188,7 @@ class CarbonMergerRDD[K, V](
try {
val isCompactionFlow = true
CarbonLoaderUtil
- .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
} catch {
case e: Exception =>
LOGGER.error(e)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 3b69b4a..b865393 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -397,27 +398,16 @@ public class CsvInput extends BaseStep implements StepInterface {
}
resultArray = results.toArray(new Future[results.size()]);
- boolean completed = false;
try {
- while (!completed) {
- completed = true;
- for (int j = 0; j < resultArray.length; j++) {
- if (!resultArray[j].isDone()) {
- completed = false;
- }
-
- }
- if (isTerminated) {
- exec.shutdownNow();
- throw new RuntimeException("Interrupted due to failing of other threads");
- }
- Thread.sleep(100);
-
+ for (int j = 0; j < resultArray.length; j++) {
+ resultArray[j].get();
}
- } catch (InterruptedException e) {
+ } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Thread InterruptedException", e);
}
- exec.shutdown();
+ finally {
+ exec.shutdownNow();
+ }
}
private void doProcessUnivocity() {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d70a6e55/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index c2093ad..37c912e 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -750,27 +751,15 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
}
this.resultArray = results.toArray(new Future[results.size()]);
- boolean completed = false;
try {
- while (!completed) {
- completed = true;
- for (int j = 0; j < this.resultArray.length; j++) {
- if (!this.resultArray[j].isDone()) {
- completed = false;
- }
-
- }
- if (isTerminated) {
- exec.shutdownNow();
- throw new RuntimeException("Interrupted due to failing of other threads");
- }
- Thread.sleep(100);
-
+ for (int j = 0; j < this.resultArray.length; j++) {
+ this.resultArray[j].get();
}
- } catch (InterruptedException e) {
+ } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Thread InterruptedException", e);
+ } finally {
+ exec.shutdownNow();
}
- exec.shutdown();
}
private int[] getUpdatedLens(int[] lens, boolean[] presentDims) {