You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:43 UTC
[25/50] [abbrv] carbondata git commit: [CARBONDATA-1992] Remove partitionId in CarbonTablePath
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 8b87cfc..6cf1dcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
@@ -88,11 +89,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
child.initialize();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
- configuration.getSegmentId() + "", false, false);
+ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+ String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ tableIdentifier.getDatabaseName(),
+ tableIdentifier.getTableName(),
+ String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(),
+ false,
+ false);
CarbonDataProcessorUtil.createLocations(storeLocation);
return storeLocation;
}
@@ -115,11 +118,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
measureCount = configuration.getMeasureCount();
outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1;
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
if (iterators.length == 1) {
- doExecute(iterators[0], 0, 0);
+ doExecute(iterators[0], 0);
} else {
executorService = Executors.newFixedThreadPool(iterators.length,
new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
@@ -150,11 +153,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
return null;
}
- private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) {
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
- CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId,
- iteratorIndex);
+ private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+ String[] storeLocation = getStoreLocation(tableIdentifier);
+ CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
+ configuration, storeLocation, 0, iteratorIndex);
CarbonFactHandler dataHandler = null;
boolean rowsNotExist = true;
while (iterator.hasNext()) {
@@ -189,10 +191,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
processingComplete(dataHandler);
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+ System.currentTimeMillis());
}
private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
@@ -298,7 +301,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
}
@Override public void run() {
- doExecute(this.iterator, 0, iteratorIndex);
+ doExecute(this.iterator, iteratorIndex);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index f030d52..369c1f2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
@@ -59,13 +60,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
child.initialize();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
- configuration.getSegmentId() + "", false, false);
- CarbonDataProcessorUtil.createLocations(storeLocation);
- return storeLocation;
+ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+ return CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
+ String.valueOf(configuration.getTaskNo()),
+ configuration.getSegmentId(), false, false);
}
@Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -75,18 +74,19 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
String tableName = tableIdentifier.getTableName();
try {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
int i = 0;
+ String[] storeLocation = getStoreLocation(tableIdentifier);
+ CarbonDataProcessorUtil.createLocations(storeLocation);
for (Iterator<CarbonRowBatch> iterator : iterators) {
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
int k = 0;
while (iterator.hasNext()) {
CarbonRowBatch next = iterator.next();
// If no rows from merge sorter, then don't create a file in fact column handler
if (next.hasNext()) {
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
- .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+ .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++);
CarbonFactHandler dataHandler = CarbonFactHandlerFactory
.createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
dataHandler.initialise();
@@ -119,10 +119,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
processingComplete(dataHandler);
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+ System.currentTimeMillis());
}
private void processingComplete(CarbonFactHandler dataHandler) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index d321405..58009af 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
@@ -65,21 +66,21 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
child.initialize();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+ private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
String[] storeLocation = CarbonDataProcessorUtil
.getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
- configuration.getSegmentId() + "", false, false);
+ tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
+ configuration.getSegmentId(), false, false);
CarbonDataProcessorUtil.createLocations(storeLocation);
return storeLocation;
}
- public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
+ public CarbonFactDataHandlerModel getDataHandlerModel() {
CarbonTableIdentifier tableIdentifier =
configuration.getTableIdentifier().getCarbonTableIdentifier();
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
+ String[] storeLocation = getStoreLocation(tableIdentifier);
return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration,
- storeLocation, partitionId, 0);
+ storeLocation, 0, 0);
}
@Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -89,11 +90,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
String tableName = tableIdentifier.getTableName();
try {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
int i = 0;
for (Iterator<CarbonRowBatch> iterator : iterators) {
- String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+ String[] storeLocation = getStoreLocation(tableIdentifier);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
.createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
@@ -147,10 +148,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
processingComplete(dataHandler);
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+ .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+ .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+ System.currentTimeMillis());
}
private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index be27866..0eadc7f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -319,7 +319,7 @@ public final class CarbonDataMergerUtil {
// create entry for merged one.
LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
- loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+ loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID);
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
long loadEnddate = CarbonUpdateUtil.readCurrentTime();
loadMetadataDetails.setLoadEndTime(loadEnddate);
@@ -676,7 +676,7 @@ public final class CarbonDataMergerUtil {
CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
- return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
}
@@ -1036,7 +1036,7 @@ public final class CarbonDataMergerUtil {
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2480a39..ff65db2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -372,7 +372,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
return SortParameters
.createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
- noDictionaryCount, carbonLoadModel.getPartitionId(), segmentId,
+ noDictionaryCount, segmentId,
carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
}
@@ -422,7 +422,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
private void initTempStoreLocation() {
tempStoreLocation = CarbonDataProcessorUtil
.getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName,
- carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId,
+ carbonLoadModel.getTaskNo(), segmentId,
true, false);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index a2248ee..0c0b2b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -356,7 +357,7 @@ public class SortParameters implements Serializable {
CarbonProperties carbonProperties = CarbonProperties.getInstance();
parameters.setDatabaseName(tableIdentifier.getDatabaseName());
parameters.setTableName(tableIdentifier.getTableName());
- parameters.setPartitionID(configuration.getPartitionId());
+ parameters.setPartitionID("0");
parameters.setSegmentId(configuration.getSegmentId());
parameters.setTaskNo(configuration.getTaskNo());
parameters.setMeasureColCount(configuration.getMeasureCount());
@@ -392,10 +393,9 @@ public class SortParameters implements Serializable {
LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), configuration.getTaskNo(),
- configuration.getPartitionId(), configuration.getSegmentId(), false, false);
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
+ configuration.getTaskNo(), configuration.getSegmentId(), false, false);
String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -444,13 +444,13 @@ public class SortParameters implements Serializable {
public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
String tableName, int dimColCount, int complexDimColCount, int measureColCount,
- int noDictionaryCount, String partitionID, String segmentId, String taskNo,
+ int noDictionaryCount, String segmentId, String taskNo,
boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
SortParameters parameters = new SortParameters();
CarbonProperties carbonProperties = CarbonProperties.getInstance();
parameters.setDatabaseName(databaseName);
parameters.setTableName(tableName);
- parameters.setPartitionID(partitionID);
+ parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID);
parameters.setSegmentId(segmentId);
parameters.setTaskNo(taskNo);
parameters.setMeasureColCount(measureColCount);
@@ -482,7 +482,7 @@ public class SortParameters implements Serializable {
LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize());
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId,
+ .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId,
isCompactionFlow, false);
String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d15152c..9f3c86f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -309,7 +309,7 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
String carbonDataDirectoryPath = CarbonDataProcessorUtil
.checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+ tableName, loadModel.getSegmentId());
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -336,9 +336,8 @@ public class CarbonFactDataHandlerModel {
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String carbonDataDirectoryPath = carbonTablePath
- .getCarbonDataDirectoryPath(configuration.getPartitionId(),
- configuration.getSegmentId() + "");
+ String carbonDataDirectoryPath =
+ carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId());
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2a4cc00..cfe6e31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -123,13 +123,11 @@ public final class CarbonDataProcessorUtil {
* @param databaseName
* @param tableName
* @param taskId
- * @param partitionId
* @param segmentId
* @return
*/
public static String[] getLocalDataFolderLocation(String databaseName, String tableName,
- String taskId, String partitionId, String segmentId, boolean isCompactionFlow,
- boolean isAltPartitionFlow) {
+ String taskId, String segmentId, boolean isCompactionFlow, boolean isAltPartitionFlow) {
String tempLocationKey =
getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow,
isAltPartitionFlow);
@@ -150,8 +148,7 @@ public final class CarbonDataProcessorUtil {
String tmpStore = baseTmpStorePathArray[i];
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId + "");
+ String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
}
@@ -378,13 +375,12 @@ public final class CarbonDataProcessorUtil {
* @return data directory path
*/
public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
- String databaseName, String tableName, String partitionId, String segmentId) {
+ String databaseName, String tableName, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+ String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 12fc5c1..7be61d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -77,10 +77,8 @@ public final class CarbonLoaderUtil {
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
- for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
- deleteStorePath(segmentPath);
- }
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+ deleteStorePath(segmentPath);
}
/**
@@ -100,7 +98,7 @@ public final class CarbonLoaderUtil {
int fileCount = 0;
int partitionCount = carbonTable.getPartitionCount();
for (int i = 0; i < partitionCount; i++) {
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
currentLoad + "");
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
FileFactory.getFileType(segmentPath));
@@ -274,7 +272,7 @@ public final class CarbonLoaderUtil {
private static void addToStaleFolders(CarbonTablePath carbonTablePath,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
- String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+ String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
// add to the deletion list only if file exist else HDFS file system will throw
// exception while deleting the file if file path does not exist
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -843,7 +841,7 @@ public final class CarbonLoaderUtil {
CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
- String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 02ab1d8..f9f3e20 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -48,15 +48,14 @@ public final class DeleteLoadFolders {
* returns segment path
*
* @param absoluteTableIdentifier
- * @param partitionId
* @param oneLoad
* @return
*/
private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
- int partitionId, LoadMetadataDetails oneLoad) {
+ LoadMetadataDetails oneLoad) {
CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String segmentId = oneLoad.getLoadName();
- return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
+ return carbon.getCarbonDataDirectoryPath(segmentId);
}
public static void physicalFactAndMeasureMetadataDeletion(
@@ -64,7 +63,7 @@ public final class DeleteLoadFolders {
LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
for (LoadMetadataDetails oneLoad : currentDetails) {
if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
- String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
+ String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
boolean status = false;
try {
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index e662757..7f0aef6 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -163,7 +163,6 @@ public class StoreCreator {
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
loadModel.setTaskNo("0");
loadModel.setSegmentId("0");
- loadModel.setPartitionId("0");
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index a96ab32..36a5a15 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -105,7 +105,6 @@ class StreamHandoffRDD[K, V](
split: Partition,
context: TaskContext
): Iterator[(K, V)] = {
- carbonLoadModel.setPartitionId("0")
carbonLoadModel.setTaskNo("" + split.index)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
CarbonMetadata.getInstance().addCarbonTable(carbonTable)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 1c7be6a..f2274be 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -135,7 +135,7 @@ object StreamSinkFactory {
FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
}
val segmentId = StreamSegment.open(carbonTable)
- val segmentDir = carbonTablePath.getSegmentDir("0", segmentId)
+ val segmentDir = carbonTablePath.getSegmentDir(segmentId)
if (FileFactory.isFileExist(segmentDir, fileType)) {
// recover fault
StreamSegment.recoverSegmentIfRequired(segmentDir)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/952665a8/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 849bf99..45bc19a 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -149,12 +149,12 @@ class CarbonAppendableStreamSink(
* if the directory size of current segment beyond the threshold, hand off new segment
*/
private def checkOrHandOffSegment(): Unit = {
- val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+ val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
val fileType = FileFactory.getFileType(segmentDir)
if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
currentSegmentId = newSegmentId
- val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+ val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
FileFactory.mkdirs(newSegmentDir, fileType)
// TODO trigger hand off operation
@@ -251,14 +251,14 @@ object CarbonAppendableStreamSink {
// update data file info in index file
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- StreamSegment.updateIndexFile(tablePath.getSegmentDir("0", segmentId))
+ StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
} catch {
// catch fault of executor side
case t: Throwable =>
val tablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = tablePath.getSegmentDir("0", segmentId)
+ val segmentDir = tablePath.getSegmentDir(segmentId)
StreamSegment.recoverSegmentIfRequired(segmentDir)
LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
committer.abortJob(job)