You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this text version).
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:34 UTC
[36/47] incubator-carbondata git commit: [CARBONDATA-118][Bug] Temp location clean up in compaction (#883)
[CARBONDATA-118][Bug] Temp location clean up in compaction (#883)
Passed a compaction-specific temporary location to the compaction and data load merge flows, and handled cleanup of that temp folder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5341c7dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5341c7dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5341c7dc
Branch: refs/heads/master
Commit: 5341c7dce8ce1a260d1ac48af67d58cd12331ba6
Parents: 61b6074
Author: ravikiran23 <ra...@gmail.com>
Authored: Fri Jul 29 06:05:39 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 06:05:39 2016 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 5 +++
.../carbondata/spark/load/CarbonLoaderUtil.java | 26 +++++++-----
.../spark/rdd/CarbonDataLoadRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 42 ++++++++++++++++----
.../processing/mdkeygen/MDKeyGenStep.java | 2 +-
.../sortandgroupby/sortdata/SortDataRows.java | 2 +-
.../csvbased/CarbonCSVBasedSeqGenStep.java | 2 +-
.../FileStoreSurrogateKeyGenForCSV.java | 3 +-
.../util/CarbonDataProcessorUtil.java | 7 +++-
9 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 5852241..86d87eb 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -849,6 +849,11 @@ public final class CarbonCommonConstants {
*/
public static String LEVEL2_COMPACTION_INDEX = ".2";
+ /**
+ * Indicates compaction
+ */
+ public static String COMPACTION_KEY_WORD = "COMPACTION";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index 3ecdfe4..bce99a7 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -428,7 +428,7 @@ public final class CarbonLoaderUtil {
* @param segmentName
*/
public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
- String segmentName) {
+ String segmentName, boolean isCompactionFlow) {
String databaseName = loadModel.getDatabaseName();
String tableName = loadModel.getTableName();
CarbonTableIdentifier carbonTableIdentifier =
@@ -436,26 +436,32 @@ public final class CarbonLoaderUtil {
String segmentId = segmentName.substring(CarbonCommonConstants.LOAD_FOLDER.length());
String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+ if (isCompactionFlow) {
+ tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
+ }
// form local store location
String localStoreLocation = getStoreLocation(CarbonProperties.getInstance()
.getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL),
carbonTableIdentifier, segmentId, loadModel.getPartitionId());
try {
CarbonUtil.deleteFoldersAndFiles(new File[] { new File(localStoreLocation) });
+ LOGGER.info("Deleted the local store location" + localStoreLocation);
} catch (CarbonUtilException e) {
LOGGER.error(e, "Failed to delete local data load folder location");
}
// delete ktr file.
- String graphPath = CarbonProperties.getInstance()
- .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
- + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
- + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
- + File.separator + tableName + ".ktr";
- File path = new File(graphPath);
- if (path.exists()) {
- if (!path.delete()) {
- LOGGER.error("failed to delete the ktr file in path " + path);
+ if (!isCompactionFlow) {
+ String graphPath = CarbonProperties.getInstance()
+ .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL)
+ + File.separator + "/etl" + File.separator + databaseName + File.separator + tableName
+ + File.separator + loadModel.getSegmentId() + File.separator + loadModel.getTaskNo()
+ + File.separator + tableName + ".ktr";
+ File path = new File(graphPath);
+ if (path.exists()) {
+ if (!path.delete()) {
+ LOGGER.error("failed to delete the ktr file in path " + path);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87f7885..36a1c0f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -208,7 +208,9 @@ class CarbonDataLoadRDD[K, V](
// delete temp location data
val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
try {
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, newSlice)
+ val isCompaction = false
+ CarbonLoaderUtil
+ .deleteLocalDataLoadFolderLocation(model, newSlice, isCompaction)
} catch {
case e: Exception =>
LOGGER.error(e)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
index 254d51b..7b15cbf 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -21,6 +21,7 @@ import java.util
import java.util.{Collections, List}
import scala.collection.JavaConverters._
+import scala.util.Random
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
@@ -59,7 +60,7 @@ class CarbonMergerRDD[K, V](
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
- val storeLocation = carbonMergerMapping.storeLocation
+ var storeLocation: String = null
val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
val metadataFilePath = carbonMergerMapping.metadataFilePath
val mergedLoadName = carbonMergerMapping.mergedLoadName
@@ -70,16 +71,28 @@ class CarbonMergerRDD[K, V](
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ val tempLocationKey: String = CarbonCommonConstants
+ .COMPACTION_KEY_WORD + '_' + carbonLoadModel
+ .getDatabaseName + '_' + carbonLoadModel
+ .getTableName + '_' + carbonLoadModel.getTaskNo
+
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.length > 0) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ LOGGER.info("Temp storeLocation taken is " + storeLocation)
var mergeStatus = false
+ var mergeNumber = ""
try {
var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
- val tempLocationKey: String = carbonLoadModel.getDatabaseName + '_' + carbonLoadModel
- .getTableName + carbonLoadModel.getTaskNo
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-
// sorting the table block info List.
var tableBlockInfoList = carbonSparkPartition.tableBlockInfos
@@ -126,7 +139,7 @@ class CarbonMergerRDD[K, V](
}
}
- val mergeNumber = mergedLoadName
+ mergeNumber = mergedLoadName
.substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
)
@@ -135,7 +148,8 @@ class CarbonMergerRDD[K, V](
factTableName,
carbonLoadModel.getTaskNo,
"0",
- mergeNumber
+ mergeNumber,
+ true
)
carbonLoadModel.setSegmentId(mergeNumber)
@@ -157,6 +171,18 @@ class CarbonMergerRDD[K, V](
LOGGER.error(e)
throw e
}
+ finally {
+ // delete temp location data
+ val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
+ try {
+ val isCompactionFlow = true
+ CarbonLoaderUtil
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, newSlice, isCompactionFlow)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ }
+ }
var finished = false
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 8ddb429..6b997d5 100644
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -259,7 +259,7 @@ public class MDKeyGenStep extends BaseStep {
this.tableName = meta.getTableName();
storeLocation = CarbonDataProcessorUtil
.getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(),
- String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"");
+ String.valueOf(meta.getTaskNo()), meta.getPartitionID(), meta.getSegmentId()+"", false);
isNoDictionaryDimension =
RemoveDictionaryUtil.convertStringToBooleanArr(meta.getNoDictionaryDimsMapping());
isUseInvertedIndex =
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 1b3de04..bb8ab2a 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -525,7 +525,7 @@ public class SortDataRows {
private void updateSortTempFileLocation() {
String carbonDataDirectoryPath = CarbonDataProcessorUtil
.getLocalDataFolderLocation(schemaName, tableName, taskNo, partitionID,
- segmentId);
+ segmentId, false);
this.tempFileLocation =
carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
LOGGER.info("temp file location" + this.tempFileLocation);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index d5ecb37..c2093ad 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -707,7 +707,7 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
private void updateStoreLocation() {
loadFolderLoc = CarbonDataProcessorUtil
.getLocalDataFolderLocation(meta.getSchemaName(), meta.getTableName(), meta.getTaskNo(),
- meta.getPartitionID(), meta.getSegmentId()+"");
+ meta.getPartitionID(), meta.getSegmentId()+"", false);
}
private String getBadLogStoreLocation(String storeLocation) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
index a920d63..b4822db 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
@@ -193,7 +193,8 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
private String checkAndCreateLoadFolderNumber(String databaseName,
String tableName) throws KettleException {
String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId+"");
+ .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId + "",
+ false);
boolean isDirCreated = new File(carbonDataDirectoryPath).mkdirs();
if (!isDirCreated) {
throw new KettleException("Unable to create data load directory" + carbonDataDirectoryPath);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5341c7dc/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
index 8f2c9e7..84274ed 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -262,9 +262,14 @@ public final class CarbonDataProcessorUtil {
* @return
*/
public static String getLocalDataFolderLocation(String databaseName, String tableName,
- String taskId, String partitionId, String segmentId) {
+ String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+ CarbonCommonConstants.UNDERSCORE + taskId;
+ if(isCompactionFlow){
+ tempLocationKey = CarbonCommonConstants
+ .COMPACTION_KEY_WORD + '_' + tempLocationKey;
+ }
+
String baseStorePath = CarbonProperties.getInstance()
.getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
CarbonTable carbonTable = CarbonMetadata.getInstance()