Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/12 09:17:19 UTC
carbondata git commit: [CARBONDATA-1851] Refactor to use only SegmentsToAccess for Aggregatetable, move tableFolderDeletion to TableProcessingOperations
Repository: carbondata
Updated Branches:
refs/heads/master 4eb37240f -> 34cb55194
[CARBONDATA-1851] Refactor to use only SegmentsToAccess for Aggregatetable, move tableFolderDeletion to TableProcessingOperations
This closes #1616
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34cb5519
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34cb5519
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34cb5519
Branch: refs/heads/master
Commit: 34cb55194c2061efe54acaf4cea67d5b6179034c
Parents: 4eb3724
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Dec 5 19:48:38 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Dec 12 14:45:41 2017 +0530
----------------------------------------------------------------------
.../filesystem/AbstractDFSCarbonFile.java | 4 +-
.../core/metadata/schema/table/CarbonTable.java | 10 ++
.../carbondata/core/scan/model/QueryModel.java | 46 +++---
.../carbondata/core/util/CarbonProperties.java | 8 +
.../carbondata/core/util/SessionParams.java | 7 +-
.../hadoop/api/CarbonTableInputFormat.java | 17 +-
.../hadoop/util/CarbonInputFormatUtil.java | 8 +-
.../org/apache/carbondata/events/Events.scala | 2 +-
.../load/DataLoadProcessorStepOnSpark.scala | 4 +-
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 3 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +--
.../spark/rdd/NewCarbonDataLoadRDD.scala | 10 +-
.../carbondata/spark/rdd/UpdateDataLoad.scala | 6 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 5 +-
.../management/CarbonCleanFilesCommand.scala | 9 +-
.../CarbonAlterTableDropPartitionCommand.scala | 7 +-
.../CarbonAlterTableSplitPartitionCommand.scala | 7 +-
.../loading/TableProcessingOperations.java | 164 +++++++++++++++++++
.../store/CarbonFactDataHandlerModel.java | 2 +-
.../processing/util/CarbonLoaderUtil.java | 100 -----------
21 files changed, 249 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
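At a glance, the refactor moves the folder-deletion helpers from CarbonLoaderUtil into the new TableProcessingOperations class, and deletePartialLoadDataIfExist now takes a CarbonTable instead of a CarbonLoadModel. A minimal caller-side sketch of the migration, based only on the signatures in the diff below (the wrapper class and method here are illustrative, not part of the commit):

import java.io.IOException;

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.processing.loading.TableProcessingOperations;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

public class LoadCleanupSketch {
  public static void cleanUpAfterLoad(CarbonLoadModel loadModel) throws IOException {
    // Local temp-folder cleanup: same signature, new home.
    // Before: CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(loadModel, false, false);
    TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);

    // Stale-segment cleanup: now takes the CarbonTable directly.
    // Before: CarbonLoaderUtil.deletePartialLoadDataIfExist(loadModel, false);
    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
    TableProcessingOperations.deletePartialLoadDataIfExist(carbonTable, false);
  }
}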
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 3eb97bc..fcd230a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -63,7 +63,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
fs = path.getFileSystem(this.hadoopConf);
fileStatus = fs.getFileStatus(path);
} catch (IOException e) {
- LOGGER.error("Exception occurred:" + e.getMessage());
+ LOGGER.debug("Exception occurred:" + e.getMessage());
}
}
@@ -78,7 +78,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
fs = path.getFileSystem(this.hadoopConf);
fileStatus = fs.getFileStatus(path);
} catch (IOException e) {
- LOGGER.error("Exception occurred:" + e.getMessage());
+ LOGGER.debug("Exception occurred:" + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index e5d8839..4ebc02d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -273,6 +273,16 @@ public class CarbonTable implements Serializable {
}
/**
+ * to get the all dimension of a table
+ *
+ * @param tableName
+ * @return
+ */
+ public List<CarbonDimension> getImplicitDimensionByTableName(String tableName) {
+ return tableImplicitDimensionsMap.get(tableName);
+ }
+
+ /**
* Read all primitive/complex children and set it as list of child carbon dimension to parent
* dimension
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 67b8681..5e4872b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -122,10 +122,9 @@ public class QueryModel implements Serializable {
public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier,
CarbonQueryPlan queryPlan, CarbonTable carbonTable, DataTypeConverter converter) {
QueryModel queryModel = new QueryModel();
- String factTableName = carbonTable.getTableName();
queryModel.setAbsoluteTableIdentifier(absoluteTableIdentifier);
- fillQueryModel(queryPlan, carbonTable, queryModel, factTableName);
+ fillQueryModel(queryPlan, carbonTable, queryModel);
queryModel.setForcedDetailRawQuery(queryPlan.isRawDetailQuery());
queryModel.setQueryId(queryPlan.getQueryId());
@@ -134,7 +133,7 @@ public class QueryModel implements Serializable {
}
private static void fillQueryModel(CarbonQueryPlan queryPlan, CarbonTable carbonTable,
- QueryModel queryModel, String factTableName) {
+ QueryModel queryModel) {
queryModel.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
queryModel.setQueryDimension(queryPlan.getDimensions());
queryModel.setQueryMeasures(queryPlan.getMeasures());
@@ -142,9 +141,8 @@ public class QueryModel implements Serializable {
boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
boolean[] isFilterMeasures =
new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
- processFilterExpression(queryPlan.getFilterExpression(),
- carbonTable.getDimensionByTableName(factTableName),
- carbonTable.getMeasureByTableName(factTableName), isFilterDimensions, isFilterMeasures);
+ processFilterExpression(carbonTable, queryPlan.getFilterExpression(), isFilterDimensions,
+ isFilterMeasures);
queryModel.setIsFilterDimensions(isFilterDimensions);
queryModel.setIsFilterMeasures(isFilterMeasures);
}
@@ -153,8 +151,7 @@ public class QueryModel implements Serializable {
queryModel.setTable(carbonTable);
}
- public static void processFilterExpression(Expression filterExpression,
- List<CarbonDimension> dimensions, List<CarbonMeasure> measures,
+ public static void processFilterExpression(CarbonTable carbonTable, Expression filterExpression,
final boolean[] isFilterDimensions, final boolean[] isFilterMeasures) {
if (null != filterExpression) {
if (null != filterExpression.getChildren() && filterExpression.getChildren().size() == 0) {
@@ -162,24 +159,22 @@ public class QueryModel implements Serializable {
List<ColumnExpression> listOfCol =
((ConditionalExpression) filterExpression).getColumnList();
for (ColumnExpression expression : listOfCol) {
- setDimAndMsrColumnNode(dimensions, measures, expression, isFilterDimensions,
- isFilterMeasures);
+ setDimAndMsrColumnNode(carbonTable, expression, isFilterDimensions, isFilterMeasures);
}
}
}
for (Expression expression : filterExpression.getChildren()) {
if (expression instanceof ColumnExpression) {
- setDimAndMsrColumnNode(dimensions, measures, (ColumnExpression) expression,
- isFilterDimensions, isFilterMeasures);
+ setDimAndMsrColumnNode(carbonTable, (ColumnExpression) expression, isFilterDimensions,
+ isFilterMeasures);
} else if (expression instanceof UnknownExpression) {
UnknownExpression exp = ((UnknownExpression) expression);
List<ColumnExpression> listOfColExpression = exp.getAllColumnList();
for (ColumnExpression col : listOfColExpression) {
- setDimAndMsrColumnNode(dimensions, measures, col, isFilterDimensions, isFilterMeasures);
+ setDimAndMsrColumnNode(carbonTable, col, isFilterDimensions, isFilterMeasures);
}
} else {
- processFilterExpression(expression, dimensions, measures, isFilterDimensions,
- isFilterMeasures);
+ processFilterExpression(carbonTable, expression, isFilterDimensions, isFilterMeasures);
}
}
}
@@ -195,15 +190,16 @@ public class QueryModel implements Serializable {
return null;
}
- private static void setDimAndMsrColumnNode(List<CarbonDimension> dimensions,
- List<CarbonMeasure> measures, ColumnExpression col, boolean[] isFilterDimensions,
- boolean[] isFilterMeasures) {
+ private static void setDimAndMsrColumnNode(CarbonTable carbonTable, ColumnExpression col,
+ boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
CarbonDimension dim;
CarbonMeasure msr;
String columnName;
columnName = col.getColumnName();
- dim = CarbonUtil.findDimension(dimensions, columnName);
- msr = getCarbonMetadataMeasure(columnName, measures);
+ dim = CarbonUtil
+ .findDimension(carbonTable.getDimensionByTableName(carbonTable.getTableName()), columnName);
+ msr = getCarbonMetadataMeasure(columnName,
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
col.setDimension(false);
col.setMeasure(false);
@@ -215,13 +211,21 @@ public class QueryModel implements Serializable {
if (null != isFilterDimensions) {
isFilterDimensions[dim.getOrdinal()] = true;
}
- } else {
+ } else if (msr != null) {
col.setCarbonColumn(msr);
col.setMeasure(msr);
col.setMeasure(true);
if (null != isFilterMeasures) {
isFilterMeasures[msr.getOrdinal()] = true;
}
+ } else {
+ // check if this is an implicit dimension
+ dim = CarbonUtil
+ .findDimension(carbonTable.getImplicitDimensionByTableName(carbonTable.getTableName()),
+ columnName);
+ col.setCarbonColumn(dim);
+ col.setDimension(dim);
+ col.setDimension(true);
}
}
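With this change, processFilterExpression resolves dimensions and measures from the CarbonTable itself, so callers no longer pre-fetch the dimension and measure lists; implicit dimensions are looked up inside QueryModel as a fallback. A rough usage sketch assuming the new signature (the helper class is illustrative; the Expression import path is assumed from CarbonData's scan.expression package):

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.QueryModel;

public class FilterMarkingSketch {
  // Mark which dimension and measure ordinals participate in the filter.
  public static void markFilterColumns(CarbonTable carbonTable, Expression filterExpression) {
    boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
    boolean[] isFilterMeasures =
        new boolean[carbonTable.getNumberOfMeasures(carbonTable.getTableName())];
    QueryModel.processFilterExpression(
        carbonTable, filterExpression, isFilterDimensions, isFilterMeasures);
  }
}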
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 281ee15..fe396cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -990,4 +990,12 @@ public final class CarbonProperties {
return addedProperty;
}
+ /**
+ * to add external property
+ *
+ * @param externalPropertySet
+ */
+ public void addPropertyToPropertySet(Set<String> externalPropertySet) {
+ propertySet.addAll(externalPropertySet);
+ }
}
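A brief sketch of how a caller might register additional known keys through this new hook (the wrapper class and the key shown are hypothetical):

import java.util.HashSet;
import java.util.Set;

import org.apache.carbondata.core.util.CarbonProperties;

public class PropertySetSketch {
  public static void registerExternalKeys() {
    // Hypothetical key; addPropertyToPropertySet simply merges the given keys
    // into the set of properties CarbonProperties already tracks.
    Set<String> externalKeys = new HashSet<>();
    externalKeys.add("carbon.input.segments.default.sales");
    CarbonProperties.getInstance().addPropertyToPropertySet(externalKeys);
  }
}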
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 5dda9e4..0540ed6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -80,12 +80,7 @@ public class SessionParams implements Serializable {
* @return properties value
*/
public SessionParams addProperty(String key, String value) throws InvalidConfigurationException {
- boolean isValidConf = validateKeyValue(key, value);
- if (isValidConf) {
- LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
- sProps.put(key, value);
- }
- return this;
+ return addProperty(key, value, true);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 15d1304..c16b0aa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -280,14 +280,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
}
- public static void setAggeragateTableSegments(Configuration configuration, String segments) {
- configuration.set(CarbonCommonConstants.CARBON_INPUT_SEGMENTS, segments);
- }
-
- private static String getAggeragateTableSegments(Configuration configuration) {
- return configuration.get(CarbonCommonConstants.CARBON_INPUT_SEGMENTS);
- }
-
/**
* get list of segment to access
*/
@@ -330,14 +322,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
- String aggregateTableSegments = getAggeragateTableSegments(job.getConfiguration());
TableDataMap blockletMap =
DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
BlockletDataMapFactory.class.getName());
List<String> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<String> streamSegments = null;
- List<String> filteredSegmentToAccess = null;
+
if (getValidateSegmentsToAccess(job.getConfiguration())) {
// get all valid segments and set them into the configuration
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
@@ -349,7 +340,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return getSplitsOfStreaming(job, identifier, streamSegments);
}
- filteredSegmentToAccess = getFilteredSegment(job, validSegments);
+ List<String> filteredSegmentToAccess = getFilteredSegment(job, validSegments);
if (filteredSegmentToAccess.size() == 0) {
return new ArrayList<>(0);
} else {
@@ -363,10 +354,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
if (invalidSegments.size() > 0) {
blockletMap.clear(invalidSegments);
}
- } else {
- filteredSegmentToAccess = Arrays.asList(aggregateTableSegments.split(","));
}
+ // get updated filtered list
+ List<String> filteredSegmentToAccess = Arrays.asList(getSegmentsToAccess(job));
// Clean the updated segments from memory if the update happens on segments
List<String> toBeCleanedSegments = new ArrayList<>();
for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index bf1b188..514428b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -121,12 +121,8 @@ public class CarbonInputFormatUtil {
public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getTableName());
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getTableName());
- QueryModel.processFilterExpression(filterExpression, dimensions, measures,
- isFilterDimensions, isFilterMeasures);
+ QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions,
+ isFilterMeasures);
if (null != filterExpression) {
// Optimize Filter Expression and fit RANGE filters is conditions apply.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 8e69855..799d8c4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index c28426d..6759b20 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.loading.DataLoadProcessBuilder
+import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
dataWriter.close()
}
// clean up the folders and files created locally for data load operation
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 37ab8c3..1ecab9f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.AlterPartitionResult
@@ -119,7 +120,7 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
case e: Exception =>
sys.error(s"Exception when executing Row result processor ${e.getMessage}")
} finally {
- CarbonLoaderUtil
+ TableProcessingOperations
.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 82b2a57..fb4634e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUt
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger._
import org.apache.carbondata.processing.splits.TableSplit
@@ -233,7 +234,7 @@ class CarbonMergerRDD[K, V](
// delete temp location data
try {
val isCompactionFlow = true
- CarbonLoaderUtil
+ TableProcessingOperations
.deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index d599c22..cc7f757 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -357,22 +357,14 @@ class CarbonScanRDD(
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
}
+
+ // when validate segments is disabled in thread local update it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
- val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty(
- CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- identifier.getCarbonTableIdentifier.getDatabaseName + "." +
- identifier.getCarbonTableIdentifier.getTableName)
- if (segmentsToScan != null) {
- CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan)
- }
- val validateSegments = carbonSessionInfo.getSessionParams.getProperty(
- CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- identifier.getCarbonTableIdentifier.getDatabaseName + "." +
- identifier.getCarbonTableIdentifier.getTableName)
- if (validateSegments != null) {
- CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean)
- }
+ CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
}
format
}
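After this change the per-table segment list is no longer pushed through setAggeragateTableSegments; only segment validation is toggled on the Hadoop configuration, defaulting to true. A minimal sketch of that call, assuming a plain Hadoop Configuration (the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

public class SegmentValidationSketch {
  // Turn off segment validation on the job configuration, mirroring what the
  // refactored CarbonScanRDD does when the session parameter is set to false.
  public static void disableSegmentValidation(Configuration conf) {
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, false);
  }
}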
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index f948ac8..b27521a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -45,12 +45,12 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
-import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, CarbonQueryUtil}
+import org.apache.carbondata.processing.util.CarbonQueryUtil
import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -261,7 +261,7 @@ class NewCarbonDataLoadRDD[K, V](
throw e
} finally {
// clean up the folders and files created locally for data load operation
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -403,7 +403,7 @@ class NewDataFrameLoaderRDD[K, V](
throw e
} finally {
// clean up the folders and files created locally for data load operation
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
@@ -587,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
throw e
} finally {
// clean up the folders and files created locally for data load operation
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(model, false, false)
// in case of failure the same operation will be re-tried several times.
// So print the data load statistics only in case of non failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index b1dfc01..4934cbc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -24,11 +24,9 @@ import org.apache.spark.sql.Row
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.loading.DataLoadExecutor
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
/**
* Data load in case of update command .
@@ -65,7 +63,7 @@ object UpdateDataLoad {
LOGGER.error(e)
throw e
} finally {
- CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
+ TableProcessingOperations.deleteLocalDataLoadFolderLocation(carbonLoadModel, false, false)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 6fafc95..6da8bd6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.execution.command.CompactionModel
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
abstract class Compactor(carbonLoadModel: CarbonLoadModel,
compactionModel: CompactionModel,
@@ -52,7 +52,8 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
// status.
// so deleting those folders.
try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+ TableProcessingOperations
+ .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, true)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index eacfded..e0530f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -57,7 +57,7 @@ case class CarbonCleanFilesCommand(
}
val cleanFilesPostEvent: CleanFilesPostEvent =
CleanFilesPostEvent(carbonTable, sparkSession)
- OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
Seq.empty
}
@@ -76,10 +76,6 @@ case class CarbonCleanFilesCommand(
private def cleanGarbageData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
- val cleanFilesPreEvent: CleanFilesPreEvent =
- CleanFilesPreEvent(carbonTable,
- sparkSession)
- OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
CarbonStore.cleanFiles(
CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
@@ -87,9 +83,6 @@ case class CarbonCleanFilesCommand(
CarbonProperties.getStorePath,
carbonTable,
forceTableClean)
-
- val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
- OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
}
private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 69aa91a..fb515fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -35,13 +35,12 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.partition.DropPartitionCallable
case class CarbonAlterTableDropPartitionCommand(
@@ -224,7 +223,9 @@ case class CarbonAlterTableDropPartitionCommand(
} finally {
executor.shutdown()
try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+ TableProcessingOperations
+ .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ false)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 338ec5a..1a535fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -38,13 +38,12 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.partition.SplitPartitionCallable
/**
@@ -233,7 +232,9 @@ case class CarbonAlterTableSplitPartitionCommand(
} finally {
executor.shutdown()
try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+ TableProcessingOperations
+ .deletePartialLoadDataIfExist(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+ false)
} catch {
case e: Exception =>
LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
new file mode 100644
index 0000000..cb53d6e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonThreadFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.annotation.DeveloperApi;
+
+@DeveloperApi
+public class TableProcessingOperations {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+
+ /**
+ *
+ * @param carbonTable
+ * @param isCompactionFlow
+ * @throws IOException
+ */
+ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
+ final boolean isCompactionFlow) throws IOException {
+ String metaDataLocation = carbonTable.getMetaDataFilepath();
+ final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+
+ //delete folder which metadata no exist in tablestatus
+ for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+ final String partitionCount = i + "";
+ String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+ FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+ if (FileFactory.isFileExist(partitionPath, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+ boolean found = false;
+ for (int j = 0; j < details.length; j++) {
+ if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+ .equals(partitionCount)) {
+ found = true;
+ break;
+ }
+ }
+ return !found;
+ }
+ });
+ for (int k = 0; k < listFiles.length; k++) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+ if (isCompactionFlow) {
+ if (segmentId.contains(".")) {
+ CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ } else {
+ if (!segmentId.contains(".")) {
+ CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * This method will delete the local data load folder location after data load is complete
+ *
+ * @param loadModel
+ * @param isCompactionFlow COMPACTION keyword will be added to path to make path unique if true
+ * @param isAltPartitionFlow Alter_Partition keyword will be added to path to make path unique if
+ * true
+ */
+ public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+ boolean isCompactionFlow, boolean isAltPartitionFlow) {
+ String tableName = loadModel.getTableName();
+ String databaseName = loadModel.getDatabaseName();
+ String tempLocationKey = CarbonDataProcessorUtil
+ .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+ loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
+ deleteLocalDataLoadFolderLocation(tempLocationKey, tableName);
+ }
+
+ /**
+ *
+ * This method will delete the local data load folder location after data load is complete
+ *
+ * @param tempLocationKey temporary location set in carbon properties
+ * @param tableName
+ */
+ public static void deleteLocalDataLoadFolderLocation(String tempLocationKey, String tableName) {
+
+ // form local store location
+ final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
+ if (localStoreLocations == null) {
+ throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+ }
+ // submit local folder clean up in another thread so that main thread execution is not blocked
+ ExecutorService localFolderDeletionService = Executors
+ .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
+ try {
+ localFolderDeletionService.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ long startTime = System.currentTimeMillis();
+ String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
+ for (String loc : locArray) {
+ try {
+ CarbonUtil.deleteFoldersAndFiles(new File(loc));
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error(e, "Failed to delete local data load folder location: " + loc);
+ }
+ }
+ LOGGER.info(
+ "Deleted the local store location: " + localStoreLocations + " : Time taken: " + (
+ System.currentTimeMillis() - startTime));
+ return null;
+ }
+ });
+ } finally {
+ if (null != localFolderDeletionService) {
+ localFolderDeletionService.shutdown();
+ }
+ }
+
+ }
+}
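Besides the CarbonLoadModel overload, the new class exposes a key-based overload of deleteLocalDataLoadFolderLocation. A sketch of invoking it directly, assuming the temp-location key is built the same way the load-model overload builds it (the wrapper class is illustrative):

import org.apache.carbondata.processing.loading.TableProcessingOperations;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

public class TempLocationCleanupSketch {
  public static void cleanTempLocation(String databaseName, String tableName,
      String segmentId, String taskNo) {
    // Build the temp-location key the same way the CarbonLoadModel overload does,
    // then call the key-based overload directly. A RuntimeException is thrown if
    // no store location was registered under the key.
    String tempLocationKey = CarbonDataProcessorUtil
        .getTempStoreLocationKey(databaseName, tableName, segmentId, taskNo, false, false);
    TableProcessingOperations.deleteLocalDataLoadFolderLocation(tempLocationKey, tableName);
  }
}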
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 37b585d..a8ae513 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -305,7 +305,7 @@ public class CarbonFactDataHandlerModel {
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .checkAndCreateCarbonStoreLocation(loadModel.getTablePath(), loadModel.getDatabaseName(),
+ .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/34cb5519/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 4275603..9e6a73e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.util;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
@@ -26,9 +25,6 @@ import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -56,8 +52,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -66,7 +60,6 @@ import org.apache.carbondata.processing.merger.NodeBlockRelation;
import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
import com.google.gson.Gson;
-import org.apache.commons.lang3.StringUtils;
public final class CarbonLoaderUtil {
@@ -129,52 +122,6 @@ public final class CarbonLoaderUtil {
}
return true;
}
- public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
- final boolean isCompactionFlow) throws IOException {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- String metaDataLocation = carbonTable.getMetaDataFilepath();
- final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
- //delete folder which metadata no exist in tablestatus
- for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- final String partitionCount = i + "";
- String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
- FileType fileType = FileFactory.getFileType(partitionPath);
- if (FileFactory.isFileExist(partitionPath, fileType)) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
- CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile path) {
- String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
- boolean found = false;
- for (int j = 0; j < details.length; j++) {
- if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
- .equals(partitionCount)) {
- found = true;
- break;
- }
- }
- return !found;
- }
- });
- for (int k = 0; k < listFiles.length; k++) {
- String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
- if (isCompactionFlow) {
- if (segmentId.contains(".")) {
- deleteStorePath(listFiles[k].getAbsolutePath());
- }
- } else {
- if (!segmentId.contains(".")) {
- deleteStorePath(listFiles[k].getAbsolutePath());
- }
- }
- }
- }
- }
- }
public static void deleteStorePath(String path) {
try {
@@ -188,53 +135,6 @@ public final class CarbonLoaderUtil {
}
}
-
- /**
- * This method will delete the local data load folder location after data load is complete
- *
- * @param loadModel
- */
- public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
- boolean isCompactionFlow, boolean isAltPartitionFlow) {
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = CarbonDataProcessorUtil
- .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
- loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
- // form local store location
- final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
- if (localStoreLocations == null) {
- throw new RuntimeException("Store location not set for the key " + tempLocationKey);
- }
- // submit local folder clean up in another thread so that main thread execution is not blocked
- ExecutorService localFolderDeletionService = Executors
- .newFixedThreadPool(1, new CarbonThreadFactory("LocalFolderDeletionPool:" + tableName));
- try {
- localFolderDeletionService.submit(new Callable<Void>() {
- @Override public Void call() throws Exception {
- long startTime = System.currentTimeMillis();
- String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
- for (String loc : locArray) {
- try {
- CarbonUtil.deleteFoldersAndFiles(new File(loc));
- } catch (IOException | InterruptedException e) {
- LOGGER.error(e,
- "Failed to delete local data load folder location: " + loc);
- }
- }
- LOGGER.info("Deleted the local store location: " + localStoreLocations
- + " : Time taken: " + (System.currentTimeMillis() - startTime));
- return null;
- }
- });
- } finally {
- if (null != localFolderDeletionService) {
- localFolderDeletionService.shutdown();
- }
- }
-
- }
-
/**
* This API will write the load level metadata for the loadmanagement module inorder to
* manage the load and query execution management smoothly.