You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:38:54 UTC
[08/50] carbondata git commit: [CARBONDATA-2025] Unify all path construction through CarbonTablePath static method
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index 2a69f0d..a4d3d2b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -272,7 +272,7 @@ public class CarbonCompactionUtil {
public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables,
List<CarbonTableIdentifier> skipList) {
for (CarbonTable ctable : carbonTables) {
- String metadataPath = ctable.getMetaDataFilepath();
+ String metadataPath = ctable.getMetadataPath();
// check for the compaction required file and at the same time exclude the tables which are
// present in the skip list.
if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index c141636..89326a3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -169,15 +169,13 @@ public final class CarbonDataMergerUtil {
// End Timestamp.
// Table Update Status Metadata Update.
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ new SegmentUpdateStatusManager(identifier);
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
@@ -224,7 +222,7 @@ public final class CarbonDataMergerUtil {
}
LoadMetadataDetails[] loadDetails =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadDetail : loadDetails) {
if (loadsToMerge.contains(loadDetail)) {
@@ -237,18 +235,18 @@ public final class CarbonDataMergerUtil {
}
}
- segmentUpdateStatusManager
- .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
- segmentStatusManager
- .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+ segmentUpdateStatusManager.writeLoadDetailsIntoFile(
+ Arrays.asList(updateLists), timestamp);
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
status = true;
} else {
LOGGER.error("Not able to acquire the lock.");
status = false;
}
} catch (IOException e) {
- LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
- .getMetadataDirectoryPath());
+ LOGGER.error("Error while updating metadata. The metadata file path is " +
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
status = false;
} finally {
@@ -284,9 +282,9 @@ public final class CarbonDataMergerUtil {
String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
CompactionType compactionType, String segmentFile) throws IOException {
boolean tableStatusUpdationStatus = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -295,10 +293,7 @@ public final class CarbonDataMergerUtil {
LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+ carbonLoadModel.getTableName() + " for table status updation ");
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier);
-
- String statusFilePath = carbonTablePath.getTableStatusFilePath();
+ String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
@@ -617,8 +612,6 @@ public final class CarbonDataMergerUtil {
// variable to store one segment size across partition.
long sizeOfOneSegmentAcrossPartition;
if (segment.getSegmentFile() != null) {
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
sizeOfOneSegmentAcrossPartition = CarbonUtil
.getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
} else {
@@ -662,35 +655,17 @@ public final class CarbonDataMergerUtil {
/**
* For calculating the size of the specified segment
* @param tablePath the store path of the segment
- * @param tableIdentifier identifier of table that the segment belong to
* @param segId segment id
* @return the data size of the segment
*/
- private static long getSizeOfSegment(String tablePath,
- CarbonTableIdentifier tableIdentifier, String segId) {
- String loadPath = getStoreLocation(tablePath, tableIdentifier, segId);
+ private static long getSizeOfSegment(String tablePath, String segId) {
+ String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId);
CarbonFile segmentFolder =
FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
return getSizeOfFactFileInLoad(segmentFolder);
}
/**
- * This method will get the store location for the given path, segemnt id and partition id
- *
- * @param tablePath
- * @param carbonTableIdentifier identifier of catbon table that the segment belong to
- * @param segmentId segment id
- * @return the store location of the segment
- */
- private static String getStoreLocation(String tablePath,
- CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier);
- return carbonTablePath.getCarbonDataDirectoryPath(segmentId);
- }
-
-
- /**
* Identify the segments to be merged based on the segment count
*
* @param listOfSegmentsAfterPreserve the list of segments after
@@ -1033,7 +1008,7 @@ public final class CarbonDataMergerUtil {
* if UpdateDelta Files are more than IUD Compaction threshold.
*
* @param seg
- * @param absoluteTableIdentifier
+ * @param identifier
* @param segmentUpdateStatusManager
* @param numberDeltaFilesThreshold
* @return
@@ -1045,9 +1020,7 @@ public final class CarbonDataMergerUtil {
CarbonFile[] updateDeltaFiles = null;
Set<String> uniqueBlocks = new HashSet<String>();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg.getSegmentNo());
+ String segmentPath = CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), seg.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
@@ -1295,15 +1268,12 @@ public final class CarbonDataMergerUtil {
CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
// Update the Table Status.
- String metaDataFilepath = table.getMetaDataFilepath();
- AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier);
+ String metaDataFilepath = table.getMetadataPath();
+ AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
- String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -1317,7 +1287,7 @@ public final class CarbonDataMergerUtil {
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
@@ -1326,7 +1296,7 @@ public final class CarbonDataMergerUtil {
}
}
try {
- segmentStatusManager
+ SegmentStatusManager
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
} catch (IOException e) {
return false;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 732a7e8..5062a78 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -337,9 +336,8 @@ public class CarbonFactDataHandlerModel {
return configuration.getDataWritePath();
}
AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String carbonDataDirectoryPath = carbonTablePath
- .getCarbonDataDirectoryPath(configuration.getSegmentId() + "");
+ String carbonDataDirectoryPath = CarbonTablePath
+ .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + "");
CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index e319160..2c08c18 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -143,12 +142,9 @@ public final class CarbonDataProcessorUtil {
String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator);
String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length];
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
String tmpStore = baseTmpStorePathArray[i];
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier());
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+ String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId);
localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId;
}
@@ -376,16 +372,12 @@ public final class CarbonDataProcessorUtil {
* @return data directory path
*/
public static String createCarbonStoreLocation(String factStoreLocation,
- String databaseName, String tableName, String partitionId, String segmentId) {
+ String databaseName, String tableName, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
- String carbonDataDirectoryPath =
- carbonTablePath.getCarbonDataDirectoryPath(segmentId);
- return carbonDataDirectoryPath;
+ return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}
+
/**
* initialise data type for measures for their storage format
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index c135a88..e7c52f6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -46,7 +46,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -55,7 +54,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeBlockRelation;
@@ -74,11 +72,8 @@ public final class CarbonLoaderUtil {
}
public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
-
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + "");
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ loadModel.getTablePath(), currentLoad + "");
deleteStorePath(segmentPath);
}
@@ -91,33 +86,26 @@ public final class CarbonLoaderUtil {
*/
public static boolean isValidSegment(CarbonLoadModel loadModel,
int currentLoad) {
- CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
- .getCarbonTable();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
- loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier());
int fileCount = 0;
- int partitionCount = carbonTable.getPartitionCount();
- for (int i = 0; i < partitionCount; i++) {
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(
- currentLoad + "");
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
- FileFactory.getFileType(segmentPath));
- CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-
- @Override
- public boolean accept(CarbonFile file) {
- return file.getName().endsWith(
- CarbonTablePath.getCarbonIndexExtension())
- || file.getName().endsWith(
- CarbonTablePath.getCarbonDataExtension());
- }
-
- });
- fileCount += files.length;
- if (files.length > 0) {
- return true;
+ String segmentPath = CarbonTablePath.getSegmentPath(
+ loadModel.getTablePath(), currentLoad + "");
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+ FileFactory.getFileType(segmentPath));
+ CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(
+ CarbonTablePath.getCarbonIndexExtension())
+ || file.getName().endsWith(
+ CarbonTablePath.getCarbonDataExtension());
}
+
+ });
+ fileCount += files.length;
+ if (files.length > 0) {
+ return true;
}
if (fileCount == 0) {
return false;
@@ -183,21 +171,20 @@ public final class CarbonLoaderUtil {
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
boolean status = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier identifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
- CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String metadataPath = carbonTablePath.getMetadataDirectoryPath();
+ String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
FileType fileType = FileFactory.getFileType(metadataPath);
if (!FileFactory.isFileExist(metadataPath, fileType)) {
FileFactory.mkdirs(metadataPath, fileType);
}
String tableStatusPath;
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
- tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+ tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid);
} else {
- tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ tableStatusPath = CarbonTablePath.getTableStatusFilePath();
}
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
int retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
@@ -211,7 +198,8 @@ public final class CarbonLoaderUtil {
"Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(identifier.getTablePath()));
List<LoadMetadataDetails> listOfLoadFolderDetails =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<CarbonFile> staleFolders = new ArrayList<>();
@@ -238,12 +226,12 @@ public final class CarbonLoaderUtil {
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
- absoluteTableIdentifier, entry.getLoadName())) {
+ identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert overwrite is in progress");
} else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
&& entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
&& SegmentStatusManager.isLoadInProgress(
- absoluteTableIdentifier, entry.getLoadName())) {
+ identifier, entry.getLoadName())) {
throw new RuntimeException("Already insert into or load is in progress");
}
}
@@ -268,7 +256,7 @@ public final class CarbonLoaderUtil {
entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
// For insert overwrite, we will delete the old segment folder immediately
// So collect the old segments here
- addToStaleFolders(carbonTablePath, staleFolders, entry);
+ addToStaleFolders(identifier, staleFolders, entry);
}
}
}
@@ -281,7 +269,7 @@ public final class CarbonLoaderUtil {
// when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
// so empty segment folder should be deleted
if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
- addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+ addToStaleFolders(identifier, staleFolders, newMetaEntry);
}
for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
@@ -326,9 +314,10 @@ public final class CarbonLoaderUtil {
return status;
}
- private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+ private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
- String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName());
+ String path = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), entry.getLoadName());
// add to the deletion list only if file exist else HDFS file system will throw
// exception while deleting the file if file path does not exist
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
@@ -354,11 +343,9 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
- public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier,
+ public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+ String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
DataOutputStream dataOutputStream;
Gson gsonObjectToWrite = new Gson();
@@ -906,10 +893,8 @@ public final class CarbonLoaderUtil {
* This method will get the store location for the given path, segment id and partition id
*/
public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
- CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
- String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
+ String segmentFolder = CarbonTablePath.getSegmentPath(
+ carbonTable.getTablePath(), segmentId);
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
@@ -938,9 +923,7 @@ public final class CarbonLoaderUtil {
*/
public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
String segmentId, CarbonTable carbonTable) throws IOException {
- CarbonTablePath carbonTablePath =
- CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
- Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath,
+ Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(),
new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
loadMetadataDetails.setDataSize(String.valueOf(dataSize));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 288cd54..c00cc86 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
public final class DeleteLoadFolders {
@@ -50,15 +49,14 @@ public final class DeleteLoadFolders {
/**
* returns segment path
*
- * @param absoluteTableIdentifier
+ * @param identifier
* @param oneLoad
* @return
*/
- private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier,
+ private static String getSegmentPath(AbsoluteTableIdentifier identifier,
LoadMetadataDetails oneLoad) {
- CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String segmentId = oneLoad.getLoadName();
- return carbon.getCarbonDataDirectoryPath(segmentId);
+ return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
}
public static void physicalFactAndMeasureMetadataDeletion(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index cd1e28a..d30891a 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -63,30 +63,6 @@ public class BlockIndexStoreTest extends TestCase {
}
-// public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment()
-// throws IOException {
-// File file = getPartFile();
-// TableBlockInfo info =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-// CarbonTableIdentifier carbonTableIdentifier =
-// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
-// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-// try {
-//
-// List<TableBlockUniqueIdentifier> tableBlockInfoList =
-// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier);
-// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList);
-// assertTrue(loadAndGetBlocks.size() == 1);
-// } catch (Exception e) {
-// assertTrue(false);
-// }
-// List<String> segmentIds = new ArrayList<>();
-// segmentIds.add(info.getSegment());
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-// }
-//
private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos,
AbsoluteTableIdentifier absoluteTableIdentifier) {
List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>();
@@ -95,138 +71,6 @@ public class BlockIndexStoreTest extends TestCase {
}
return tableBlockUniqueIdentifiers;
}
-//
-// public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently()
-// throws IOException {
-// String canonicalPath =
-// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-// File file = getPartFile();
-// TableBlockInfo info =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-// TableBlockInfo info1 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-//
-// TableBlockInfo info2 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-// TableBlockInfo info3 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-// TableBlockInfo info4 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V1, null);
-//
-// CarbonTableIdentifier carbonTableIdentifier =
-// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
-// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-// ExecutorService executor = Executors.newFixedThreadPool(3);
-// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
-// executor.submit(
-// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
-// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
-// executor.submit(
-// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
-// executor.shutdown();
-// try {
-// executor.awaitTermination(1, TimeUnit.DAYS);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
-// List<TableBlockInfo> tableBlockInfos =
-// Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 });
-// try {
-// List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
-// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
-// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers);
-// assertTrue(loadAndGetBlocks.size() == 5);
-// } catch (Exception e) {
-// assertTrue(false);
-// }
-// List<String> segmentIds = new ArrayList<>();
-// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-// segmentIds.add(tableBlockInfo.getSegment());
-// }
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-// }
-//
-// public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
-// throws IOException {
-// String canonicalPath =
-// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-// File file = getPartFile();
-// TableBlockInfo info =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-// TableBlockInfo info1 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-//
-// TableBlockInfo info2 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-// TableBlockInfo info3 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-// TableBlockInfo info4 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-//
-// TableBlockInfo info5 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-// file.length(),ColumnarFormatVersion.V3, null);
-// TableBlockInfo info6 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-//
-// TableBlockInfo info7 =
-// new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-// file.length(), ColumnarFormatVersion.V3, null);
-//
-// CarbonTableIdentifier carbonTableIdentifier =
-// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
-// AbsoluteTableIdentifier absoluteTableIdentifier =
-// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-// ExecutorService executor = Executors.newFixedThreadPool(3);
-// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }),
-// absoluteTableIdentifier));
-// executor.submit(
-// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }),
-// absoluteTableIdentifier));
-// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }),
-// absoluteTableIdentifier));
-// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }),
-// absoluteTableIdentifier));
-//
-// executor.shutdown();
-// try {
-// executor.awaitTermination(1, TimeUnit.DAYS);
-// } catch (InterruptedException e) {
-// // TODO Auto-generated catch block
-// e.printStackTrace();
-// }
-// List<TableBlockInfo> tableBlockInfos = Arrays
-// .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 });
-// try {
-// List<TableBlockUniqueIdentifier> blockUniqueIdentifierList =
-// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier);
-// List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList);
-// assertTrue(loadAndGetBlocks.size() == 8);
-// } catch (Exception e) {
-// assertTrue(false);
-// }
-// List<String> segmentIds = new ArrayList<>();
-// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-// segmentIds.add(tableBlockInfo.getSegment());
-// }
-// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
-// }
private class BlockLoaderThread implements Callable<Void> {
private List<TableBlockInfo> tableBlockInfoList;
@@ -248,7 +92,7 @@ public class BlockIndexStoreTest extends TestCase {
}
private static File getPartFile() {
- String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath()
+ String path = StoreCreator.getIdentifier().getTablePath()
+ "/Fact/Part0/Segment_0";
File file = new File(path);
File[] files = file.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 7f0aef6..d42dcde 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
@@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
*/
public class StoreCreator {
- private static AbsoluteTableIdentifier absoluteTableIdentifier;
+ private static AbsoluteTableIdentifier identifier;
private static String storePath = "";
static {
try {
storePath = new File("target/store").getCanonicalPath();
String dbName = "testdb";
String tableName = "testtable";
- absoluteTableIdentifier =
+ identifier =
AbsoluteTableIdentifier.from(
storePath + "/testdb/testtable",
new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
@@ -114,8 +113,8 @@ public class StoreCreator {
}
}
- public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
- return absoluteTableIdentifier;
+ public static AbsoluteTableIdentifier getIdentifier() {
+ return identifier;
}
/**
@@ -134,12 +133,12 @@ public class StoreCreator {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
loadModel.setCarbonDataLoadSchema(schema);
- loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
+ loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
+ loadModel.setTablePath(identifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -175,9 +174,9 @@ public class StoreCreator {
private static CarbonTable createTable() throws IOException {
TableInfo tableInfo = new TableInfo();
- tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+ tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
- tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
ArrayList<Encoding> encodings = new ArrayList<>();
encodings.add(Encoding.DICTIONARY);
@@ -257,16 +256,13 @@ public class StoreCreator {
tableSchema.setSchemaEvalution(schemaEvol);
tableSchema.setTableId(UUID.randomUUID().toString());
tableInfo.setTableUniqueName(
- absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName()
+ identifier.getCarbonTableIdentifier().getTableUniqueName()
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
- tableInfo.setTablePath(absoluteTableIdentifier.getTablePath());
+ tableInfo.setTablePath(identifier.getTablePath());
- CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
- String schemaFilePath = carbonTablePath.getSchemaFilePath();
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
@@ -329,7 +325,7 @@ public class StoreCreator {
writer.close();
writer.commit();
Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
+ new DictionaryColumnUniqueIdentifier(identifier,
columnIdentifier, dims.get(i).getDataType()));
CarbonDictionarySortInfoPreparator preparator =
new CarbonDictionarySortInfoPreparator();
@@ -444,7 +440,7 @@ public class StoreCreator {
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
listOfLoadFolderDetails.add(loadMetadataDetails);
- String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
+ String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+ CarbonCommonConstants.LOADMETADATA_FILENAME;
DataOutputStream dataOutputStream;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 7b823ac..8c9889d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
@@ -60,8 +59,6 @@ public class StreamSegment {
* get stream segment or create new stream segment if not exists
*/
public static String open(CarbonTable table) throws IOException {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -72,7 +69,8 @@ public class StreamSegment {
+ " for stream table get or create segment");
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
LoadMetadataDetails streamSegment = null;
for (LoadMetadataDetails detail : details) {
if (FileFormat.ROW_V1 == detail.getFileFormat()) {
@@ -97,8 +95,8 @@ public class StreamSegment {
newDetails[i] = details[i];
}
newDetails[i] = newDetail;
- SegmentStatusManager
- .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
return newDetail.getLoadName();
} else {
return streamSegment.getLoadName();
@@ -126,8 +124,6 @@ public class StreamSegment {
*/
public static String close(CarbonTable table, String segmentId)
throws IOException {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
SegmentStatusManager segmentStatusManager =
new SegmentStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -138,7 +134,8 @@ public class StreamSegment {
+ " for stream table finish segment");
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+ SegmentStatusManager.readLoadMetadata(
+ CarbonTablePath.getMetadataPath(table.getTablePath()));
for (LoadMetadataDetails detail : details) {
if (segmentId.equals(detail.getLoadName())) {
detail.setLoadEndTime(System.currentTimeMillis());
@@ -162,7 +159,8 @@ public class StreamSegment {
}
newDetails[i] = newDetail;
SegmentStatusManager
- .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+ .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
+ table.getTablePath()), newDetails);
return newDetail.getLoadName();
} else {
LOGGER.error(
@@ -192,7 +190,7 @@ public class StreamSegment {
try {
if (statusLock.lockWithRetries()) {
LoadMetadataDetails[] details =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
boolean updated = false;
for (LoadMetadataDetails detail : details) {
if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
@@ -202,10 +200,8 @@ public class StreamSegment {
}
}
if (updated) {
- CarbonTablePath tablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
SegmentStatusManager.writeLoadDetailsIntoFile(
- tablePath.getTableStatusFilePath(),
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()),
details);
}
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 790f9d8..c9e61d3 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
@@ -221,7 +221,6 @@ object StreamHandoffRDD {
): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = carbonTable.getAbsoluteTableIdentifier
- val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
var continueHandoff = false
// require handoff lock on table
val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
@@ -238,7 +237,7 @@ object StreamHandoffRDD {
try {
if (statusLock.lockWithRetries()) {
loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
- tablePath.getMetadataDirectoryPath)
+ CarbonTablePath.getMetadataPath(identifier.getTablePath))
}
} finally {
if (null != statusLock) {
@@ -360,19 +359,16 @@ object StreamHandoffRDD {
loadModel: CarbonLoadModel
): Boolean = {
var status = false
- val metaDataFilepath =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
- val identifier =
- loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
- val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+ val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
+ val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
+ val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
val fileType = FileFactory.getFileType(metadataPath)
if (!FileFactory.isFileExist(metadataPath, fileType)) {
FileFactory.mkdirs(metadataPath, fileType)
}
- val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+ val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
val segmentStatusManager = new SegmentStatusManager(identifier)
- val carbonLock = segmentStatusManager.getTableStatusLock()
+ val carbonLock = segmentStatusManager.getTableStatusLock
try {
if (carbonLock.lockWithRetries()) {
LOGGER.info(
@@ -406,7 +402,7 @@ object StreamHandoffRDD {
status = true
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
- .getDatabaseName() + "." + loadModel.getTableName());
+ .getDatabaseName() + "." + loadModel.getTableName())
}
} finally {
if (carbonLock.unlock()) {
@@ -417,6 +413,6 @@ object StreamHandoffRDD {
"." + loadModel.getTableName() + " during table status updation")
}
}
- return status
+ status
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 75fcfb0..6316d84 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -127,16 +127,14 @@ object StreamSinkFactory {
* @return
*/
private def getStreamSegmentId(carbonTable: CarbonTable): String = {
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath)
- if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) {
+ val segmentId = StreamSegment.open(carbonTable)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+ val fileType = FileFactory.getFileType(segmentDir)
+ if (!FileFactory.isFileExist(segmentDir, fileType)) {
// Create table directory path, in case of enabling hive metastore first load may not have
// table folder created.
- FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType)
+ FileFactory.mkdirs(segmentDir, fileType)
}
- val segmentId = StreamSegment.open(carbonTable)
- val segmentDir = carbonTablePath.getSegmentDir(segmentId)
if (FileFactory.isFileExist(segmentDir, fileType)) {
// recover fault
StreamSegment.recoverSegmentIfRequired(segmentDir)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 206ba91..4f839ce 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -62,9 +62,7 @@ class CarbonAppendableStreamSink(
carbonLoadModel: CarbonLoadModel,
server: Option[DictionaryServer]) extends Sink {
- private val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- private val fileLogPath = carbonTablePath.getStreamingLogDir
+ private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
// prepare configuration
private val hadoopConf = {
@@ -150,12 +148,12 @@ class CarbonAppendableStreamSink(
* if the directory size of current segment beyond the threshold, hand off new segment
*/
private def checkOrHandOffSegment(): Unit = {
- val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
val fileType = FileFactory.getFileType(segmentDir)
if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
currentSegmentId = newSegmentId
- val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId)
+ val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
FileFactory.mkdirs(newSegmentDir, fileType)
// TODO trigger hand off operation
@@ -251,15 +249,13 @@ object CarbonAppendableStreamSink {
}
// update data file info in index file
- val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId))
+ StreamSegment.updateIndexFile(
+ CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
} catch {
// catch fault of executor side
case t: Throwable =>
- val tablePath =
- CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = tablePath.getSegmentDir(segmentId)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
StreamSegment.recoverSegmentIfRequired(segmentDir)
LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
committer.abortJob(job)