You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/08 18:05:26 UTC
[06/30] carbondata git commit: [CARBONDATA-2025] Unify all path
construction through CarbonTablePath static method
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
*/
public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
final boolean isCompactionFlow) throws IOException {
- String metaDataLocation = carbonTable.getMetaDataFilepath();
+ String metaDataLocation = carbonTable.getMetadataPath();
final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
//delete folder which metadata no exist in tablestatus
- for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- String partitionPath = carbonTablePath.getPartitionDir();
- FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
- if (FileFactory.isFileExist(partitionPath, fileType)) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
- CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
- @Override public boolean accept(CarbonFile path) {
- String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
- boolean found = false;
- for (int j = 0; j < details.length; j++) {
- if (details[j].getLoadName().equals(segmentId)) {
- found = true;
- break;
- }
- }
- return !found;
- }
- });
- for (int k = 0; k < listFiles.length; k++) {
+ String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+ FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+ if (FileFactory.isFileExist(partitionPath, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
String segmentId =
- CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
- if (isCompactionFlow) {
- if (segmentId.contains(".")) {
- CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
- }
- } else {
- if (!segmentId.contains(".")) {
- CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+ CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+ boolean found = false;
+ for (int j = 0; j < details.length; j++) {
+ if (details[j].getLoadName().equals(segmentId)) {
+ found = true;
+ break;
}
}
+ return !found;
+ }
+ });
+ for (int k = 0; k < listFiles.length; k++) {
+ String segmentId =
+ CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+ if (isCompactionFlow) {
+ if (segmentId.contains(".")) {
+ CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ } else {
+ if (!segmentId.contains(".")) {
+ CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
ColumnIdentifier parentColumnIdentifier =
new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
dataField.getColumn().getDataType());
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
AbsoluteTableIdentifier.from(
- CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
- parentTableIdentifier);
+ CarbonTablePath.getNewTablePath(
+ absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
+ parentTableIdentifier);
identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
parentColumnIdentifier, dataField.getColumn().getDataType());
return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;
import java.util.List;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
public abstract boolean execute(List<RawResultIterator> resultIteratorList);
protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
- CompactionType compactionType, CarbonTable carbonTable,
- CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+ CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
CarbonDataFileAttributes carbonDataFileAttributes;
if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
- CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
- carbonTable.getCarbonTableIdentifier()));
+ loadModel.getTablePath());
// Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
// be written in same segment. So the TaskNo should be incremented by 1 from max val.
long index = taskNo + 1;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 2a69f0d..a4d3d2b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -272,7 +272,7 @@ public class CarbonCompactionUtil {
public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
List<CarbonTableIdentifier> skipList) {
for (CarbonTable ctable : carbonTables) {
- String metadataPath = ctable.getMetaDataFilepath();
+ String metadataPath = ctable.getMetadataPath();
// check for the compaction required file and at the same time exclude the tables which are
// present in the skip list.
if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 0eadc7f..c43dbf9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -31,7 +31,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
@@ -42,7 +41,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -167,15 +165,13 @@ public final class CarbonDataMergerUtil {
// End Timestamp.
// Table Update Status Metadata Update.
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ new SegmentUpdateStatusManager(identifier);
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -222,7 +218,7 @@ public final class CarbonDataMergerUtil {
}
LoadMetadataDetails[] loadDetails =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadDetail : loadDetails) {
if (loadsToMerge.contains(loadDetail)) {
@@ -235,18 +231,18 @@ public final class CarbonDataMergerUtil {
}
}
- segmentUpdateStatusManager
- .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
- segmentStatusManager
- .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+ segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+ Arrays.asList(updateLists), timestamp);
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
status = true;
} else {
LOGGER.error("Not able to acquire the lock.");
status = false;
}
} catch (IOException e) {
- LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
- .getMetadataDirectoryPath());
+ LOGGER.error("Error while updating metadata. The metadata file path is " +
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
status = false;
} finally {
@@ -282,9 +278,9 @@ public final class CarbonDataMergerUtil {
String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
CompactionType compactionType) throws IOException {
boolean tableStatusUpdationStatus = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -293,10 +289,7 @@ public final class CarbonDataMergerUtil {
LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+ carbonLoadModel.getTableName() + " for table status updation ");
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier);
-
- String statusFilePath = carbonTablePath.getTableStatusFilePath();
+ String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
@@ -595,10 +588,6 @@ public final class CarbonDataMergerUtil {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- CarbonTableIdentifier tableIdentifier =
- carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
// total length
long totalLength = 0;
@@ -613,7 +602,7 @@ public final class CarbonDataMergerUtil {
String segId = segment.getLoadName();
// variable to store one segment size across partition.
long sizeOfOneSegmentAcrossPartition =
- getSizeOfSegment(tablePath, tableIdentifier, segId);
+ getSizeOfSegment(tablePath, segId);
// if size of a segment is greater than the Major compaction size. then ignore it.
if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -652,35 +641,17 @@ public final class CarbonDataMergerUtil {
/**
* For calculating the size of the specified segment
* @param tablePath the store path of the segment
- * @param tableIdentifier identifier of table that the segment belong to
* @param segId segment id
* @return the data size of the segment
*/
- private static long getSizeOfSegment(String tablePath,
- CarbonTableIdentifier tableIdentifier, String segId) {
- String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+ private static long getSizeOfSegment(String tablePath, String segId) {
+ String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
CarbonFile segmentFolder =
FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
return getSizeOfFactFileInLoad(segmentFolder);
}
/**
- * This method will get the store location for the given path, segemnt id and partition id
- *
- * @param tablePath
- * @param carbonTableIdentifier identifier of catbon table that the segment belong to
- * @param segmentId segment id
- * @return the store location of the segment
- */
- private static String getStoreLocation(String tablePath,
- CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
- return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
- }
-
-
- /**
* Identify the segments to be merged based on the segment count
*
* @param listOfSegmentsAfterPreserve the list of segments after
@@ -1022,21 +993,19 @@ public final class CarbonDataMergerUtil {
* if UpdateDelta Files are more than IUD Compaction threshold.
*
* @param seg
- * @param absoluteTableIdentifier
+ * @param identifier
* @param segmentUpdateStatusManager
* @param numberDeltaFilesThreshold
* @return
*/
public static Boolean checkUpdateDeltaFilesInSeg(String seg,
- AbsoluteTableIdentifier absoluteTableIdentifier,
+ AbsoluteTableIdentifier identifier,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
CarbonFile[] updateDeltaFiles = null;
Set<String> uniqueBlocks = new HashSet<String>();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg);
+ String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1282,15 +1251,12 @@ public final class CarbonDataMergerUtil {
CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
// Update the Table Status.
- String metaDataFilepath = table.getMetaDataFilepath();
- AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier);
+ String metaDataFilepath = table.getMetadataPath();
+ AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
- String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -1304,7 +1270,7 @@ public final class CarbonDataMergerUtil {
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1313,7 +1279,7 @@ public final class CarbonDataMergerUtil {
}
}
try {
- segmentStatusManager
+ SegmentStatusManager
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
} catch (IOException e) {
return false;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index ff65db2..8fc6e66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -404,8 +404,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
tempStoreLocation);
- setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
- carbonFactDataHandlerModel);
+ setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 3d0700b..6f506b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -72,8 +72,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
tempStoreLocation);
- setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
- carbonFactDataHandlerModel);
+ setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setCompactionFlow(true);
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 9f3c86f..bc87823 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -308,8 +307,7 @@ public class CarbonFactDataHandlerModel {
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getSegmentId());
+ .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getSegmentId());
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -334,10 +332,9 @@ public class CarbonFactDataHandlerModel {
* @return data directory path
*/
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
- AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+ AbsoluteTableIdentifier identifier = configuration.getTableIdentifier();
String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId());
+ CarbonTablePath.getSegmentPath(identifier.getTablePath(), configuration.getSegmentId());
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index cfe6e31..ccde9e1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.ColumnType;
import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -44,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -143,12 +141,9 @@ public final class CarbonDataProcessorUtil {
String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
String tmpStore = baseTmpStorePathArray[i];
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+ String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId);
localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
}
@@ -375,12 +370,9 @@ public final class CarbonDataProcessorUtil {
* @return data directory path
*/
public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
- String databaseName, String tableName, String segmentId) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+ String segmentId) {
+ String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
+ factStoreLocation, segmentId);
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 083128d..e6a3323 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -54,7 +53,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeBlockRelation;
@@ -73,11 +71,8 @@ public final class CarbonLoaderUtil {
}
public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ loadModel.getTablePath(), currentLoad + "");
deleteStorePath(segmentPath);
}
@@ -90,33 +85,26 @@ public final class CarbonLoaderUtil {
*/
public static boolean isValidSegment(CarbonLoadModel loadModel,
int currentLoad) {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
- .getCarbonTable();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
- loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
int fileCount = 0;
- int partitionCount = carbonTable.getPartitionCount();
- for (int i = 0; i < partitionCount; i++) {
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
- currentLoad + "");
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
- FileFactory.getFileType(segmentPath));
- CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
- @Override
- public boolean accept(CarbonFile file) {
- return file.getName().endsWith(
- CarbonTablePath.getCarbonIndexExtension())
- || file.getName().endsWith(
- CarbonTablePath.getCarbonDataExtension());
- }
-
- });
- fileCount += files.length;
- if (files.length > 0) {
- return true;
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ loadModel.getTablePath(), currentLoad + "");
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+ FileFactory.getFileType(segmentPath));
+ CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(
+ CarbonTablePath.getCarbonIndexExtension())
+ || file.getName().endsWith(
+ CarbonTablePath.getCarbonDataExtension());
}
+
+ });
+ fileCount += files.length;
+ if (files.length > 0) {
+ return true;
}
if (fileCount == 0) {
return false;
@@ -165,21 +153,20 @@ public final class CarbonLoaderUtil {
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
throws IOException {
boolean status = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+ String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
FileType fileType = FileFactory.getFileType(metadataPath);
if (!FileFactory.isFileExist(metadataPath, fileType)) {
FileFactory.mkdirs(metadataPath, fileType);
}
String tableStatusPath;
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
- tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+ tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid);
} else {
- tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ tableStatusPath = CarbonTablePath.getTableStatusFilePath();
}
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
int retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -193,7 +180,8 @@ public final class CarbonLoaderUtil {
"Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonFile> staleFolders = new ArrayList<>();
@@ -220,12 +208,12 @@ public final class CarbonLoaderUtil {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
- absoluteTableIdentifier, entry.getLoadName())) {
+ identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert overwrite is in progress");
} else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
- absoluteTableIdentifier, entry.getLoadName())) {
+ identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert into or load is in progress");
}
}
@@ -248,7 +236,7 @@ public final class CarbonLoaderUtil {
entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
// For insert overwrite, we will delete the old segment folder immediately
// So collect the old segments here
- addToStaleFolders(carbonTablePath, staleFolders, entry);
+ addToStaleFolders(identifier, staleFolders, entry);
}
}
}
@@ -257,7 +245,7 @@ public final class CarbonLoaderUtil {
// when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
// so empty segment folder should be deleted
if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
- addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+ addToStaleFolders(identifier, staleFolders, newMetaEntry);
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
@@ -291,9 +279,10 @@ public final class CarbonLoaderUtil {
return status;
}
- private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+ private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
- String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+ String path = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), entry.getLoadName());
// add to the deletion list only if file exist else HDFS file system will throw
// exception while deleting the file if file path does not exist
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -319,11 +308,9 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
- public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
+ public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+ String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
DataOutputStream dataOutputStream;
Gson gsonObjectToWrite = new Gson();
@@ -871,10 +858,8 @@ public final class CarbonLoaderUtil {
* This method will get the store location for the given path, segment id and partition id
*/
public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
- String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+ String segmentFolder = CarbonTablePath.getSegmentPath(
+ carbonTable.getTablePath(), segmentId);
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
@@ -903,10 +888,8 @@ public final class CarbonLoaderUtil {
*/
public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
String segmentId, CarbonTable carbonTable) throws IOException {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
Map<String, Long> dataIndexSize =
- CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+ CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(), segmentId);
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
loadMetadataDetails.setDataSize(String.valueOf(dataSize));
Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index f9f3e20..1fdce32 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
public final class DeleteLoadFolders {
@@ -47,15 +46,14 @@ public final class DeleteLoadFolders {
/**
* returns segment path
*
- * @param absoluteTableIdentifier
+ * @param identifier
* @param oneLoad
* @return
*/
- private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
+ private static String getSegmentPath(AbsoluteTableIdentifier identifier,
LoadMetadataDetails oneLoad) {
- CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String segmentId = oneLoad.getLoadName();
- return carbon.getCarbonDataDirectoryPath(segmentId);
+ return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
}
public static void physicalFactAndMeasureMetadataDeletion(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 63320ef..53f1296 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -71,12 +71,12 @@ public class BlockIndexStoreTest extends TestCase {
// file.length(), ColumnarFormatVersion.V1, null);
// CarbonTableIdentifier carbonTableIdentifier =
// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
+// AbsoluteTableIdentifier identifier =
// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
// try {
//
// List<TableBlockUniqueIdentifier> tableBlockInfoList =
-// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
+// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), identifier);
// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList);
// assertTrue(loadAndGetBlocks.size() == 1);
// } catch (Exception e) {
@@ -84,7 +84,7 @@ public class BlockIndexStoreTest extends TestCase {
// }
// List<String> segmentIds = new ArrayList<>();
// segmentIds.add(info.getSegmentId());
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+// cache.removeTableBlocks(segmentIds, identifier);
// }
//
private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
@@ -120,19 +120,19 @@ public class BlockIndexStoreTest extends TestCase {
//
// CarbonTableIdentifier carbonTableIdentifier =
// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
+// AbsoluteTableIdentifier identifier =
// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
// ExecutorService executor = Executors.newFixedThreadPool(3);
// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(
// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(
// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.shutdown();
// try {
// executor.awaitTermination(1, TimeUnit.DAYS);
@@ -143,7 +143,7 @@ public class BlockIndexStoreTest extends TestCase {
// Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 });
// try {
// List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+// getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers);
// assertTrue(loadAndGetBlocks.size() == 5);
// } catch (Exception e) {
@@ -153,7 +153,7 @@ public class BlockIndexStoreTest extends TestCase {
// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
// segmentIds.add(tableBlockInfo.getSegmentId());
// }
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+// cache.removeTableBlocks(segmentIds, identifier);
// }
//
// public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
@@ -191,18 +191,18 @@ public class BlockIndexStoreTest extends TestCase {
//
// CarbonTableIdentifier carbonTableIdentifier =
// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
+// AbsoluteTableIdentifier identifier =
// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
// ExecutorService executor = Executors.newFixedThreadPool(3);
// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(
// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
-// absoluteTableIdentifier));
+// identifier));
// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
-// absoluteTableIdentifier));
+// identifier));
//
// executor.shutdown();
// try {
@@ -215,7 +215,7 @@ public class BlockIndexStoreTest extends TestCase {
// .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 });
// try {
// List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
+// getTableBlockUniqueIdentifierList(tableBlockInfos, identifier);
// List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList);
// assertTrue(loadAndGetBlocks.size() == 8);
// } catch (Exception e) {
@@ -225,7 +225,7 @@ public class BlockIndexStoreTest extends TestCase {
// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
// segmentIds.add(tableBlockInfo.getSegmentId());
// }
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
+// cache.removeTableBlocks(segmentIds, identifier);
// }
private class BlockLoaderThread implements Callable<Void> {
@@ -248,7 +248,7 @@ public class BlockIndexStoreTest extends TestCase {
}
private static File getPartFile() {
- String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+ String path = StoreCreator.getIdentifier().getTablePath()
+ "/Fact/Part0/Segment_0";
File file = new File(path);
File[] files = file.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 7f0aef6..d42dcde 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
*/
public class StoreCreator {
- private static AbsoluteTableIdentifier absoluteTableIdentifier;
+ private static AbsoluteTableIdentifier identifier;
private static String storePath = "";
static {
try {
storePath = new File("target/store").getCanonicalPath();
String dbName = "testdb";
String tableName = "testtable";
- absoluteTableIdentifier =
+ identifier =
AbsoluteTableIdentifier.from(
storePath + "/testdb/testtable",
new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
}
}
- public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
- return absoluteTableIdentifier;
+ public static AbsoluteTableIdentifier getIdentifier() {
+ return identifier;
}
/**
@@ -134,12 +133,12 @@ public class StoreCreator {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
loadModel.setCarbonDataLoadSchema(schema);
- loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+ loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+ loadModel.setTablePath(identifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
private static CarbonTable createTable() throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+ tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
- tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
ArrayList<Encoding> encodings = new ArrayList<>();
encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
tableSchema.setSchemaEvalution(schemaEvol);
tableSchema.setTableId(UUID.randomUUID().toString());
tableInfo.setTableUniqueName(
- absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+ identifier.getCarbonTableIdentifier().getTableUniqueName()
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
- tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+ tableInfo.setTablePath(identifier.getTablePath());
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
- String schemaFilePath = carbonTablePath.getSchemaFilePath();
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
@@ -329,7 +325,7 @@ public class StoreCreator {
writer.close();
writer.commit();
Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+ new DictionaryColumnUniqueIdentifier(identifier,
columnIdentifier, dims.get(i).getDataType()));
CarbonDictionarySortInfoPreparator preparator =
new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
listOfLoadFolderDetails.add(loadMetadataDetails);
- String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
+ String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+ CarbonCommonConstants.LOADMETADATA_FILENAME;
DataOutputStream dataOutputStream;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
* get stream segment or create new stream segment if not exists
*/
public static String open(CarbonTable table) throws IOException {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
+ " for stream table get or create segment");
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
LoadMetadataDetails streamSegment = null;
for (LoadMetadataDetails detail : details) {
if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
newDetails[i] = details[i];
}
newDetails[i] = newDetail;
- SegmentStatusManager
- .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
return newDetail.getLoadName();
} else {
return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
*/
public static String close(CarbonTable table, String segmentId)
throws IOException {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
+ " for stream table finish segment");
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
for (LoadMetadataDetails detail : details) {
if (segmentId.equals(detail.getLoadName())) {
detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
}
newDetails[i] = newDetail;
SegmentStatusManager
- .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+ .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+ table.getTablePath()), newDetails);
return newDetail.getLoadName();
} else {
LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
try {
if (statusLock.lockWithRetries()) {
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
boolean updated = false;
for (LoadMetadataDetails detail : details) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
}
}
if (updated) {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
SegmentStatusManager.writeLoadDetailsIntoFile(
- tablePath.getTableStatusFilePath(),
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
details);
}
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index ec6ab1a..5c6165d 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
@@ -220,7 +220,6 @@ object StreamHandoffRDD {
): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = carbonTable.getAbsoluteTableIdentifier
- val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
var continueHandoff = false
// require handoff lock on table
val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
@@ -237,7 +236,7 @@ object StreamHandoffRDD {
try {
if (statusLock.lockWithRetries()) {
loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
- tablePath.getMetadataDirectoryPath)
+ CarbonTablePath.getMetadataPath(identifier.getTablePath))
}
} finally {
if (null != statusLock) {
@@ -359,19 +358,16 @@ object StreamHandoffRDD {
loadModel: CarbonLoadModel
): Boolean = {
var status = false
- val metaDataFilepath =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
- val identifier =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
- val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+ val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+ val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+ val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
val fileType = FileFactory.getFileType(metadataPath)
if (!FileFactory.isFileExist(metadataPath, fileType)) {
FileFactory.mkdirs(metadataPath, fileType)
}
- val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+ val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
val segmentStatusManager = new SegmentStatusManager(identifier)
- val carbonLock = segmentStatusManager.getTableStatusLock()
+ val carbonLock = segmentStatusManager.getTableStatusLock
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
@@ -404,7 +400,7 @@ object StreamHandoffRDD {
status = true
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
- .getDatabaseName() + "." + loadModel.getTableName());
+ .getDatabaseName() + "." + loadModel.getTableName())
}
} finally {
if (carbonLock.unlock()) {
@@ -415,6 +411,6 @@ object StreamHandoffRDD {
"." + loadModel.getTableName() + " during table status updation")
}
}
- return status
+ status
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index f2274be..c417fbe 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -126,16 +126,14 @@ object StreamSinkFactory {
* @return
*/
private def getStreamSegmentId(carbonTable: CarbonTable): String = {
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
- if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) {
+ val segmentId = StreamSegment.open(carbonTable)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+ val fileType = FileFactory.getFileType(segmentDir)
+ if (!FileFactory.isFileExist(segmentDir, fileType)) {
// Create table directory path, in case of enabling hive metastore first load may not have
// table folder created.
- FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+ FileFactory.mkdirs(segmentDir, fileType)
}
- val segmentId = StreamSegment.open(carbonTable)
- val segmentDir = carbonTablePath.getSegmentDir(segmentId)
if (FileFactory.isFileExist(segmentDir, fileType)) {
// recover fault
StreamSegment.recoverSegmentIfRequired(segmentDir)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6a6924cb/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 45bc19a..ff483e5 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -62,9 +62,7 @@ class CarbonAppendableStreamSink(
carbonLoadModel: CarbonLoadModel,
server: Option[DictionaryServer]) extends Sink {
- private val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- private val fileLogPath = carbonTablePath.getStreamingLogDir
+ private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
// prepare configuration
private val hadoopConf = {
@@ -149,12 +147,12 @@ class CarbonAppendableStreamSink(
* if the directory size of current segment beyond the threshold, hand off new segment
*/
private def checkOrHandOffSegment(): Unit = {
- val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
val fileType = FileFactory.getFileType(segmentDir)
if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
currentSegmentId = newSegmentId
- val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+ val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
FileFactory.mkdirs(newSegmentDir, fileType)
// TODO trigger hand off operation
@@ -250,15 +248,13 @@ object CarbonAppendableStreamSink {
}
// update data file info in index file
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+ StreamSegment.updateIndexFile(
+ CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
} catch {
// catch fault of executor side
case t: Throwable =>
- val tablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = tablePath.getSegmentDir(segmentId)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
StreamSegment.recoverSegmentIfRequired(segmentDir)
LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
committer.abortJob(job)