Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/30 03:19:04 UTC
[1/2] carbondata git commit: [CARBONDATA-2270] Write segment file in loading for non-partition table
Repository: carbondata
Updated Branches:
refs/heads/master e43be5e74 -> 7e0803fec
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a987127..15ae30f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.util.CommonUtil
/**
* This class is used to perform compaction on carbon table.
@@ -201,6 +200,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+ var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
var segmentFileName: String = null
if (carbonTable.isHivePartitionTable) {
val readPath =
@@ -220,6 +220,23 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
carbonLoadModel.getTablePath)
}
segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
+ } else {
+ // Get the segment files for each updated segment in case of IUD compaction
+ if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
+ val segmentFilesList = loadsToMerge.asScala.map{seg =>
+ val file = SegmentFileStore.writeSegmentFile(
+ carbonTable.getTablePath,
+ seg.getLoadName,
+ carbonLoadModel.getFactTimeStamp.toString)
+ new Segment(seg.getLoadName, file)
+ }.filter(_.getSegmentFileName != null).asJava
+ segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
+ } else {
+ segmentFileName = SegmentFileStore.writeSegmentFile(
+ carbonTable.getTablePath,
+ mergedLoadNumber,
+ carbonLoadModel.getFactTimeStamp.toString)
+ }
}
// trigger event for compaction
val alterTableCompactionPreStatusUpdateEvent =
@@ -238,11 +255,12 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
CarbonDataMergerUtil
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
carbonTable.getMetadataPath,
- carbonLoadModel)) ||
+ carbonLoadModel,
+ segmentFilesForIUDCompact)) ||
CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
loadsToMerge,
- carbonTable.getMetadataPath,
- mergedLoadNumber,
+ carbonTable.getMetadataPath,
+ mergedLoadNumber,
carbonLoadModel,
compactionType,
segmentFileName)
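Note on the hunk above: SegmentFileStore.writeSegmentFile returns null when the
segment folder contains no .carbonindex files (see its definition later in this
patch), which is why the IUD branch filters out entries with a null segment file
name. A hedged Java sketch of what each iteration does, reusing the variable
names from the Scala code above:

    // for each segment being merged in an IUD update-delta compaction:
    String file = SegmentFileStore.writeSegmentFile(
        carbonTable.getTablePath(),
        seg.getLoadName(),
        String.valueOf(carbonLoadModel.getFactTimeStamp()));
    if (file != null) {
      // null means the segment had no index files, so no segment file was written
      segmentFilesForIUDCompact.add(new Segment(seg.getLoadName(), file));
    }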
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 2d19fd4..b27a150 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -51,14 +51,14 @@ case class CarbonCountStar(
val rowCount = CarbonUpdateUtil.getRowCount(
tableInputFormat.getBlockRowCount(
job,
- absoluteTableIdentifier,
+ carbonTable,
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
TableIdentifier(
carbonTable.getTableName,
Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
- absoluteTableIdentifier)
+ carbonTable)
val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
val row = if (outUnsafeRows) unsafeProjection(value) else value
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 8eaeab1..0c6d2ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -105,12 +105,12 @@ object DeleteExecution {
val blockMappingVO =
carbonInputFormat.getBlockRowCount(
job,
- absoluteTableIdentifier,
+ carbonTable,
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
- val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+ val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index f88e767..8c88d0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -73,7 +73,7 @@ object HorizontalCompaction {
// SegmentUpdateStatusManager reads the Table Status File and Table Update Status
// file and save the content in segmentDetails and updateDetails respectively.
val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
- absTableIdentifier)
+ carbonTable)
if (isUpdateOperation) {
@@ -199,7 +199,7 @@ object HorizontalCompaction {
.substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
- absTableIdentifier,
+ carbonTable,
updateStatusDetails,
timestamp)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b583c6a..25a0d8e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -122,7 +122,9 @@ case class CarbonAlterTableAddHivePartitionCommand(
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
val segmentFileName =
- loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+ SegmentFileStore.genSegmentFileName(
+ loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
+ CarbonTablePath.SEGMENT_EXT
newMetaEntry.setSegmentFile(segmentFileName)
val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
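The naming helper introduced here keeps segment file names consistent across the
load, partition and compaction paths. A worked example with illustrative values:

    String name = SegmentFileStore.genSegmentFileName("2", "1522380000000");
    // name == "2_1522380000000"; callers append CarbonTablePath.SEGMENT_EXT
    // to obtain the on-disk file name, e.g. "2_1522380000000.segment"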
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 9c2835e..756bc97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -210,9 +210,7 @@ case class CarbonAlterTableDropPartitionCommand(
for (thread <- threadArray) {
thread.join()
}
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
- carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+ val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 1bdf414..929de0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -231,9 +231,7 @@ case class CarbonAlterTableSplitPartitionCommand(
threadArray.foreach {
thread => thread.join()
}
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
- carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+ val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 47da9a5..7123b93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -96,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
private def driverSideCountStar(logicalRelation: LogicalRelation): Boolean = {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val segmentUpdateStatusManager = new SegmentUpdateStatusManager(
- relation.carbonRelation.metaData.carbonTable.getAbsoluteTableIdentifier)
+ relation.carbonRelation.metaData.carbonTable)
val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata()
if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) {
false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index ea5eb42..5bc85f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -141,7 +141,7 @@ public final class CarbonDataMergerUtil {
*/
public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
- CarbonLoadModel carbonLoadModel) {
+ CarbonLoadModel carbonLoadModel, List<Segment> segmentFilesToBeUpdated) {
boolean status = false;
boolean updateLockStatus = false;
@@ -171,7 +171,7 @@ public final class CarbonDataMergerUtil {
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(identifier);
+ new SegmentUpdateStatusManager(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
@@ -230,6 +230,13 @@ public final class CarbonDataMergerUtil {
loadDetail
.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
}
+ // Update segment file name in the status file
+ int segmentFileIndex =
+ segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName()));
+ if (segmentFileIndex > -1) {
+ loadDetail.setSegmentFile(
+ segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
+ }
}
}
@@ -1135,18 +1142,17 @@ public final class CarbonDataMergerUtil {
*
* @param seg
* @param blockName
- * @param absoluteTableIdentifier
* @param segmentUpdateDetails
* @param timestamp
* @return
* @throws IOException
*/
public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
- String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+ String blockName, CarbonTable table,
SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ new SegmentUpdateStatusManager(table);
List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 65827b0..aabe91a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -334,60 +333,6 @@ public final class CarbonLoaderUtil {
return status;
}
- /**
- * This API will update the segmentFile of a passed segment.
- *
- * @return boolean which determines whether status update is done or not.
- * @throws IOException
- */
- private static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
- throws IOException {
- boolean status = false;
- String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
- String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
- AbsoluteTableIdentifier absoluteTableIdentifier =
- AbsoluteTableIdentifier.from(tablePath, null, null);
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
- ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
- int retryCount = CarbonLockUtil
- .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
- CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
- int maxTimeout = CarbonLockUtil
- .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
- CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
- try {
- if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
- LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
- LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(metadataPath);
-
- for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
- // if the segments is in the list of marked for delete then update the status.
- if (segmentId.equals(detail.getLoadName())) {
- detail.setSegmentFile(segmentFile);
- break;
- }
- }
-
- SegmentStatusManager
- .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
- status = true;
- } else {
- LOGGER.error(
- "Not able to acquire the lock for Table status updation for table path " + tablePath);
- }
- ;
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
- } else {
- LOGGER.error(
- "Unable to unlock Table lock for table" + tablePath + " during table status updation");
- }
- }
- return status;
- }
-
private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
String path = CarbonTablePath.getSegmentPath(
@@ -1102,26 +1047,15 @@ public final class CarbonLoaderUtil {
/**
* Merge index files with in the segment of partitioned table
* @param segmentId
- * @param tablePath
+ * @param table
* @return
* @throws IOException
*/
- public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath)
+ public static String mergeIndexFilesinPartitionedSegment(String segmentId, CarbonTable table)
throws IOException {
- CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
- String uniqueId = "";
- if (segmentIndexFIleMergeStatus != null) {
- uniqueId = System.currentTimeMillis() + "";
- String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
- String path =
- CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
- + newSegmentFileName;
- SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path);
- updateSegmentFile(tablePath, segmentId, newSegmentFileName);
- deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted());
- }
- return uniqueId;
+ String tablePath = table.getTablePath();
+ return new CarbonIndexFileMergeWriter(table)
+ .mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
}
private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
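With this change CarbonLoaderUtil no longer rewrites the table status itself:
the private updateSegmentFile helper removed above reappears as a public API on
SegmentFileStore (see the core diff further down), and CarbonIndexFileMergeWriter
is now constructed with the CarbonTable, suggesting the segment-file bookkeeping
moved into the writer. A hedged usage sketch of the relocated API, with
illustrative arguments:

    boolean updated = SegmentFileStore.updateSegmentFile(
        "/tmp/store/db/t1",          // table path (hypothetical)
        "0",                         // segment id whose status entry is rewritten
        "0_1522380000000.segment");  // new segment file name to record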
[2/2] carbondata git commit: [CARBONDATA-2270] Write segment file in loading for non-partition table
Posted by ja...@apache.org.
[CARBONDATA-2270] Write segment file in loading for non-partition table
Currently, when loading into a partition table, carbon writes a segment file that records the mapping between the segment and its index file locations.
This avoids frequent listFile operations when querying. The same should be done for non-partition tables as well.
This closes #2092
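For context: a segment file is a small JSON document written to the segment
files location under the table's Metadata folder; it maps each data folder to
the index files inside it, so readers can locate index files without listing
the segment directory. An illustrative layout, inferred from the SegmentFile
and FolderDetails fields in this patch (values are made up):

    {
      "locationMap": {
        "/Fact/Part0/Segment_0": {
          "files": ["0_batchno0-0-1522380000000.carbonindex"],
          "partitions": [],
          "status": "Success",
          "isRelative": true
        }
      }
    }

And a minimal sketch of the new write path for a non-partition table, assuming
a hypothetical local store at /tmp/store/db/t1 with segment "0" already loaded:

    String fileName = SegmentFileStore.writeSegmentFile(
        "/tmp/store/db/t1",    // table path
        "0",                   // segment id
        "1522380000000");      // UUID part, typically the load's fact timestamp
    // returns e.g. "0_1522380000000.segment", or null when the segment
    // folder contains no .carbonindex files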
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e0803fe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e0803fe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e0803fe
Branch: refs/heads/master
Commit: 7e0803fec02f5872569e7680ce4d3ab02507285b
Parents: e43be5e
Author: Jacky Li <ja...@qq.com>
Authored: Fri Mar 23 12:13:11 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 30 11:18:42 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/datamap/DataMapChooser.java | 5 +-
.../core/datamap/DataMapStoreManager.java | 61 +++----
.../core/datamap/IndexDataMapProvider.java | 3 +-
.../core/datastore/SegmentTaskIndexStore.java | 14 +-
.../core/metadata/SegmentFileStore.java | 172 +++++++++++++++++--
.../core/mutate/CarbonUpdateUtil.java | 70 ++++++--
.../SegmentUpdateStatusManager.java | 72 ++++----
.../core/writer/CarbonIndexFileMergeWriter.java | 46 +++--
.../hadoop/api/CarbonFileInputFormat.java | 7 +-
.../hadoop/api/CarbonInputFormat.java | 6 +-
.../hadoop/api/CarbonOutputCommitter.java | 3 +-
.../hadoop/api/CarbonTableInputFormat.java | 32 ++--
.../hadoop/api/DistributableDataMapFormat.java | 10 +-
.../sdv/generated/MergeIndexTestCase.scala | 8 +-
.../CarbonIndexFileMergeTestCase.scala | 14 +-
.../iud/DeleteCarbonTableTestCase.scala | 10 +-
.../iud/UpdateCarbonTableTestCase.scala | 8 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 4 +-
.../org/apache/spark/util/PartitionUtils.scala | 21 ++-
.../spark/rdd/CarbonDataRDDFactory.scala | 29 +++-
.../spark/rdd/CarbonTableCompactor.scala | 26 ++-
.../org/apache/spark/sql/CarbonCountStar.scala | 4 +-
.../command/mutation/DeleteExecution.scala | 4 +-
.../command/mutation/HorizontalCompaction.scala | 4 +-
...arbonAlterTableAddHivePartitionCommand.scala | 4 +-
.../CarbonAlterTableDropPartitionCommand.scala | 4 +-
.../CarbonAlterTableSplitPartitionCommand.scala | 4 +-
.../strategy/CarbonLateDecodeStrategy.scala | 2 +-
.../processing/merger/CarbonDataMergerUtil.java | 16 +-
.../processing/util/CarbonLoaderUtil.java | 76 +-------
30 files changed, 458 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index f9214a8..ac00e71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -87,8 +87,9 @@ public class DataMapChooser {
}
}
// Return the default datamap if no other datamap exists.
- return new DataMapExprWrapperImpl(DataMapStoreManager.getInstance()
- .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()), resolverIntf);
+ return new DataMapExprWrapperImpl(
+ DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable),
+ resolverIntf);
}
private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index d01df4f..b0aff0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -102,7 +102,7 @@ public final class DataMapStoreManager {
if (dataMapSchema.isIndexDataMap() && identifier.getTableName()
.equals(carbonTable.getTableName()) && identifier.getDatabaseName()
.equals(carbonTable.getDatabaseName())) {
- dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
+ dataMaps.add(getDataMap(carbonTable, dataMapSchema));
}
}
}
@@ -187,32 +187,33 @@ public final class DataMapStoreManager {
/**
* It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
*
- * @param identifier
+ * @param table
* @return
*/
- public TableDataMap getDefaultDataMap(AbsoluteTableIdentifier identifier) {
- return getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+ public TableDataMap getDefaultDataMap(CarbonTable table) {
+ return getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA);
}
/**
* Get the datamap for reading data.
*/
- public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
- String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
- List<TableDataMap> tableIndices = allDataMaps.get(table);
+ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
+ String tableUniqueName =
+ table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
+ List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
TableDataMap dataMap = null;
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
if (dataMap == null) {
- synchronized (table.intern()) {
- tableIndices = allDataMaps.get(table);
+ synchronized (tableUniqueName.intern()) {
+ tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
}
if (dataMap == null) {
try {
- dataMap = createAndRegisterDataMap(identifier, dataMapSchema);
+ dataMap = createAndRegisterDataMap(table, dataMapSchema);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -231,7 +232,7 @@ public final class DataMapStoreManager {
* The datamap is created using datamap name, datamap factory class and table identifier.
*/
// TODO: make it private
- public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+ public TableDataMap createAndRegisterDataMap(CarbonTable table,
DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
DataMapFactory dataMapFactory;
try {
@@ -246,34 +247,34 @@ public final class DataMapStoreManager {
throw new MetadataProcessException(
"failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e);
}
- return registerDataMap(identifier, dataMapSchema, dataMapFactory);
+ return registerDataMap(table, dataMapSchema, dataMapFactory);
}
- public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier,
+ public TableDataMap registerDataMap(CarbonTable table,
DataMapSchema dataMapSchema, DataMapFactory dataMapFactory)
throws IOException, MalformedDataMapCommandException {
- String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
+ String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
// Just update the segmentRefreshMap with the table if not added.
- getTableSegmentRefresher(identifier);
- List<TableDataMap> tableIndices = allDataMaps.get(table);
+ getTableSegmentRefresher(table);
+ List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null) {
tableIndices = new ArrayList<>();
}
- dataMapFactory.init(identifier, dataMapSchema);
+ dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema);
BlockletDetailsFetcher blockletDetailsFetcher;
SegmentPropertiesFetcher segmentPropertiesFetcher = null;
if (dataMapFactory instanceof BlockletDetailsFetcher) {
blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory;
} else {
- blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
+ blockletDetailsFetcher = getBlockletDetailsFetcher(table);
}
segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
- TableDataMap dataMap = new TableDataMap(identifier, dataMapSchema, dataMapFactory,
- blockletDetailsFetcher, segmentPropertiesFetcher);
+ TableDataMap dataMap = new TableDataMap(table.getAbsoluteTableIdentifier(),
+ dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
tableIndices.add(dataMap);
- allDataMaps.put(table, tableIndices);
+ allDataMaps.put(tableUniqueName, tableIndices);
return dataMap;
}
@@ -294,7 +295,7 @@ public final class DataMapStoreManager {
* @param segments
*/
public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments) {
- getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()).clear(segments);
+ getDefaultDataMap(carbonTable).clear(segments);
List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
for (TableDataMap dataMap: allDataMap) {
dataMap.clear(segments);
@@ -347,11 +348,11 @@ public final class DataMapStoreManager {
/**
* Get the blocklet datamap factory to get the detail information of blocklets
*
- * @param identifier
+ * @param table
* @return
*/
- private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier) {
- TableDataMap blockletMap = getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+ private BlockletDetailsFetcher getBlockletDetailsFetcher(CarbonTable table) {
+ TableDataMap blockletMap = getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA);
return (BlockletDetailsFetcher) blockletMap.getDataMapFactory();
}
@@ -367,10 +368,10 @@ public final class DataMapStoreManager {
/**
* Get the TableSegmentRefresher for the table. If not existed then add one and return.
*/
- public TableSegmentRefresher getTableSegmentRefresher(AbsoluteTableIdentifier identifier) {
- String uniqueName = identifier.uniqueName();
+ public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
+ String uniqueName = table.getAbsoluteTableIdentifier().uniqueName();
if (segmentRefreshMap.get(uniqueName) == null) {
- segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(identifier));
+ segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(table));
}
return segmentRefreshMap.get(uniqueName);
}
@@ -388,8 +389,8 @@ public final class DataMapStoreManager {
// altering.
private Map<String, Boolean> manualSegmentRefresh = new HashMap<>();
- public TableSegmentRefresher(AbsoluteTableIdentifier identifier) {
- SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(identifier);
+ TableSegmentRefresher(CarbonTable table) {
+ SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(table);
SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
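The getDataMap rework above keeps the same double-checked lookup, now keyed on
the interned unique table name so that concurrent callers for the same table
synchronize on a single String object. The pattern in isolation, as a
self-contained sketch (class and method names here are hypothetical, not
CarbonData APIs):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    class TableKeyedCache<V> {
      private final Map<String, V> cache = new ConcurrentHashMap<>();

      V getOrCreate(String tableUniqueName, Function<String, V> factory) {
        V value = cache.get(tableUniqueName);
        if (value == null) {
          // intern() guarantees one lock object per distinct table name
          synchronized (tableUniqueName.intern()) {
            value = cache.get(tableUniqueName);
            if (value == null) {
              value = factory.apply(tableUniqueName);
              cache.put(tableUniqueName, value);
            }
          }
        }
        return value;
      }
    }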
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
index e188bf1..b1729d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -47,8 +47,7 @@ public class IndexDataMapProvider implements DataMapProvider {
new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
mainTable.getTableInfo().getFactTable().getTableId()));
DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
- DataMapStoreManager.getInstance().registerDataMap(
- mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+ DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
storageProvider.saveSchema(dataMapSchema);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 8ed5c18..d9e544f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
@@ -87,8 +88,9 @@ public class SegmentTaskIndexStore
SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
try {
segmentTaskIndexWrapper =
- loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
- tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(),
+ loadAndGetTaskIdToSegmentsMap(
+ tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
+ CarbonTable.buildFromTablePath("name", "path"),
tableSegmentUniqueIdentifier);
} catch (IndexBuilderException e) {
throw new IOException(e.getMessage(), e);
@@ -163,21 +165,20 @@ public class SegmentTaskIndexStore
* map
*
* @param segmentToTableBlocksInfos segment id to block info
- * @param absoluteTableIdentifier absolute table identifier
+ * @param table table handle
* @return map of task id to segment mapping
* @throws IOException
*/
private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
- AbsoluteTableIdentifier absoluteTableIdentifier,
+ CarbonTable table,
TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
// task id to segment map
Iterator<Map.Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
segmentToTableBlocksInfos.entrySet().iterator();
Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
String segmentId = null;
TaskBucketHolder taskBucketHolder = null;
try {
@@ -226,6 +227,7 @@ public class SegmentTaskIndexStore
}
Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
taskIdToTableBlockInfoMap.entrySet().iterator();
+ AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
long requiredSize =
calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier);
segmentTaskIndexWrapper.setMemorySize(requiredSize);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 4adc977..257ee4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -20,6 +20,7 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
@@ -32,6 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -42,6 +45,8 @@ import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -61,6 +66,9 @@ import org.apache.hadoop.fs.Path;
*/
public class SegmentFileStore {
+ private static LogService LOGGER = LogServiceFactory.getLogService(
+ SegmentFileStore.class.getCanonicalName());
+
private SegmentFile segmentFile;
/**
@@ -104,7 +112,6 @@ public class SegmentFileStore {
isRelative = true;
}
SegmentFile segmentFile = new SegmentFile();
- Map<String, FolderDetails> locationMap = new HashMap<>();
FolderDetails folderDetails = new FolderDetails();
folderDetails.setRelative(isRelative);
folderDetails.setPartitions(partionNames);
@@ -112,8 +119,7 @@ public class SegmentFileStore {
for (CarbonFile file : carbonFiles) {
folderDetails.getFiles().add(file.getName());
}
- locationMap.put(location, folderDetails);
- segmentFile.setLocationMap(locationMap);
+ segmentFile.addPath(location, folderDetails);
String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
// write segment info to new file.
writeSegmentFile(segmentFile, path);
@@ -122,6 +128,56 @@ public class SegmentFileStore {
}
/**
+ * Generate Segment file name
+ * @param segmentId segment id
+ * @param UUID unique string; typically the caller can pass the load start
+ * timestamp from the CarbonLoadModel
+ * @return
+ */
+ public static String genSegmentFileName(String segmentId, String UUID) {
+ return segmentId + "_" + UUID;
+ }
+
+ /**
+ * Write segment file to the metadata folder of the table
+ * @param tablePath table path
+ * @param segmentId segment id
+ * @param UUID a UUID string used to construct the segment file name
+ * @return segment file name
+ */
+ public static String writeSegmentFile(String tablePath, String segmentId, String UUID)
+ throws IOException {
+ String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+ CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
+ CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (indexFiles != null && indexFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(true);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : indexFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ String segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+ segmentFile.addPath(segmentRelativePath, folderDetails);
+ String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder));
+ }
+ String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName);
+ return segmentFileName;
+ }
+ return null;
+ }
+
+ /**
* Writes the segment file in json format
* @param segmentFile
* @param path
@@ -177,6 +233,60 @@ public class SegmentFileStore {
return null;
}
+ /**
+ * This API will update the segmentFile of a passed segment.
+ *
+ * @return boolean which determines whether status update is done or not.
+ * @throws IOException
+ */
+ public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
+ throws IOException {
+ boolean status = false;
+ String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+ String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.from(tablePath, null, null);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ int retryCount = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+ int maxTimeout = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+ try {
+ if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+ LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+ SegmentStatusManager.readLoadMetadata(metadataPath);
+
+ for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+ // if the segment matches the passed segment id then update its segment file name.
+ if (segmentId.equals(detail.getLoadName())) {
+ detail.setSegmentFile(segmentFile);
+ break;
+ }
+ }
+
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ status = true;
+ } else {
+ LOGGER.error(
+ "Not able to acquire the lock for Table status updation for table path " + tablePath);
+ }
+ ;
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + tablePath + " during table status updation");
+ }
+ }
+ return status;
+ }
+
private static CarbonFile[] getSegmentFiles(String segmentPath) {
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
if (carbonFile.exists()) {
@@ -213,7 +323,6 @@ public class SegmentFileStore {
isRelative = true;
}
SegmentFile localSegmentFile = new SegmentFile();
- Map<String, FolderDetails> locationMap = new HashMap<>();
FolderDetails folderDetails = new FolderDetails();
folderDetails.setRelative(isRelative);
folderDetails.setPartitions(spec.getPartitions());
@@ -228,8 +337,7 @@ public class SegmentFileStore {
folderDetails.getFiles().add(file.getName());
}
}
- locationMap.put(location, folderDetails);
- localSegmentFile.setLocationMap(locationMap);
+ localSegmentFile.addPath(location, folderDetails);
if (segmentFile == null) {
segmentFile = localSegmentFile;
} else {
@@ -440,7 +548,8 @@ public class SegmentFileStore {
if (updateSegment) {
String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
writePath =
- writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+ writePath + CarbonCommonConstants.FILE_SEPARATOR +
+ SegmentFileStore.genSegmentFileName(segment.getSegmentNo(), String.valueOf(uniqueId))
+ CarbonTablePath.SEGMENT_EXT;
writeSegmentFile(segmentFile, writePath);
}
@@ -564,9 +673,17 @@ public class SegmentFileStore {
Map<String, List<String>> locationMap) {
for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
Path location = new Path(entry.getKey()).getParent();
- boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
- if (!exists) {
- FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+ if (partitionSpecs != null) {
+ boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
+ if (!exists) {
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+ }
+ } else {
+ // delete the segment folder if it is empty
+ CarbonFile file = FileFactory.getCarbonFile(location.toString());
+ if (file.listFiles().length == 0) {
+ file.delete();
+ }
}
}
}
@@ -698,9 +815,16 @@ public class SegmentFileStore {
private static final long serialVersionUID = 3582245668420401089L;
+ /**
+ * mapping of index file parent folder to the index file folder info
+ */
private Map<String, FolderDetails> locationMap;
- public SegmentFile merge(SegmentFile mapper) {
+ SegmentFile() {
+ locationMap = new HashMap<>();
+ }
+
+ SegmentFile merge(SegmentFile mapper) {
if (this == mapper) {
return this;
}
@@ -724,9 +848,13 @@ public class SegmentFileStore {
return locationMap;
}
- public void setLocationMap(Map<String, FolderDetails> locationMap) {
- this.locationMap = locationMap;
+ /**
+ * Add index file parent folder and the index file folder info
+ */
+ void addPath(String path, FolderDetails details) {
+ locationMap.put(path, details);
}
+
}
/**
@@ -736,14 +864,32 @@ public class SegmentFileStore {
private static final long serialVersionUID = 501021868886928553L;
+ /**
+ * Based on isRelative variable:
+ * 1. if it is relative, the entries are index file paths relative to the table path
+ * 2. if it is not relative, the entries are the full paths of the index files
+ */
private Set<String> files = new HashSet<>();
+ /**
+ * all partition names
+ */
private List<String> partitions = new ArrayList<>();
+ /**
+ * status for the partition, success or mark for delete
+ */
private String status;
+ /**
+ * file name for merge index file in this folder
+ */
private String mergeFileName;
+ /**
+ * true if it is a relative path; for example, if the user gives a partition location
+ * when adding the partition, it will be false
+ */
private boolean isRelative;
public FolderDetails merge(FolderDetails folderDetails) {
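Per the FolderDetails documentation added above, a reader resolves each entry
against the table path when isRelative is true. A short worked example with
illustrative paths:

    String tablePath = "/tmp/store/db/t1";
    String folder    = "/Fact/Part0/Segment_0";  // key in locationMap (relative)
    String indexFile = "0_batchno0-0-1522380000000.carbonindex";
    // when isRelative == true, the full index file path is:
    String fullPath  = tablePath + folder + "/" + indexFile;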
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 4ff19cb..2b4dabe 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
@@ -117,8 +119,7 @@ public class CarbonUpdateUtil {
public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
boolean status = false;
- SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+ SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table);
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
boolean lockStatus = false;
@@ -419,7 +420,7 @@ public class CarbonUpdateUtil {
* @param table clean up will be handled on this table.
* @param forceDelete if true then max query execution timeout will not be considered.
*/
- public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) {
+ public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
@@ -432,6 +433,8 @@ public class CarbonUpdateUtil {
boolean isInvalidFile = false;
+ List<Segment> segmentFilesToBeUpdated = new ArrayList<>();
+
// scan through each segment.
for (LoadMetadataDetails segment : details) {
@@ -453,11 +456,13 @@ public class CarbonUpdateUtil {
CarbonFile[] allSegmentFiles = segDir.listFiles();
// scan through the segment and find the carbondatafiles and index files.
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
+ boolean updateSegmentFile = false;
// deleting of the aborted file scenario.
- deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager);
+ if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
+ updateSegmentFile = true;
+ }
// get Invalid update delta files.
CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
@@ -467,12 +472,9 @@ public class CarbonUpdateUtil {
// now for each invalid delta file need to check the query execution time out
// and then delete.
-
for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-
compareTimestampsAndDelete(invalidFile, forceDelete, false);
}
-
// do the same for the index files.
CarbonFile[] invalidIndexFiles = updateStatusManager
.getUpdateDeltaFilesList(segment.getLoadName(), false,
@@ -483,10 +485,10 @@ public class CarbonUpdateUtil {
// and then delete.
for (CarbonFile invalidFile : invalidIndexFiles) {
-
- compareTimestampsAndDelete(invalidFile, forceDelete, false);
+ if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+ updateSegmentFile = true;
+ }
}
-
// now handle all the delete delta files which needs to be deleted.
// there are 2 cases here .
// 1. if the block is marked as compacted then the corresponding delta files
@@ -531,7 +533,11 @@ public class CarbonUpdateUtil {
for (CarbonFile invalidFile : blockRelatedFiles) {
- compareTimestampsAndDelete(invalidFile, forceDelete, false);
+ if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+ if (invalidFile.getName().endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)) {
+ updateSegmentFile = true;
+ }
+ }
}
@@ -545,8 +551,27 @@ public class CarbonUpdateUtil {
}
}
}
+ if (updateSegmentFile) {
+ segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName()));
+ }
}
}
+ String UUID = String.valueOf(System.currentTimeMillis());
+ List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
+ for (Segment segment : segmentFilesToBeUpdated) {
+ String file =
+ SegmentFileStore.writeSegmentFile(table.getTablePath(), segment.getSegmentNo(), UUID);
+ segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
+ }
+ if (segmentFilesToBeUpdated.size() > 0) {
+ updateTableMetadataStatus(
+ new HashSet<Segment>(segmentFilesToBeUpdated),
+ table,
+ UUID,
+ false,
+ new ArrayList<Segment>(),
+ segmentFilesToBeUpdatedLatest);
+ }
// delete the update table status files which are old.
if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
@@ -589,7 +614,7 @@ public class CarbonUpdateUtil {
* @param allSegmentFiles
* @param updateStatusManager
*/
- private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
+ private static boolean deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
.getUpdateDeltaFilesList(segment.getLoadName(), false,
@@ -607,9 +632,13 @@ public class CarbonUpdateUtil {
true);
// now for each invalid index file need to check the query execution time out
// and then delete.
+ boolean updateSegmentFile = false;
for (CarbonFile invalidFile : invalidIndexFiles) {
- compareTimestampsAndDelete(invalidFile, true, false);
+ if (compareTimestampsAndDelete(invalidFile, true, false)) {
+ updateSegmentFile = true;
+ }
}
+ return updateSegmentFile;
}
/**
@@ -642,8 +671,9 @@ public class CarbonUpdateUtil {
* @param forceDelete
* @param isUpdateStatusFile if true then the parsing of file name logic changes.
*/
- private static void compareTimestampsAndDelete(CarbonFile invalidFile,
- boolean forceDelete, boolean isUpdateStatusFile) {
+ private static boolean compareTimestampsAndDelete(
+ CarbonFile invalidFile,
+ boolean forceDelete, boolean isUpdateStatusFile) {
long fileTimestamp = 0L;
if (isUpdateStatusFile) {
@@ -661,12 +691,14 @@ public class CarbonUpdateUtil {
try {
LOGGER.info("deleting the invalid file : " + invalidFile.getName());
CarbonUtil.deleteFoldersAndFiles(invalidFile);
+ return true;
} catch (IOException e) {
LOGGER.error("error in clean up of compacted files." + e.getMessage());
} catch (InterruptedException e) {
LOGGER.error("error in clean up of compacted files." + e.getMessage());
}
}
+ return false;
}
public static boolean isBlockInvalid(SegmentStatus blockStatus) {
@@ -719,9 +751,9 @@ public class CarbonUpdateUtil {
*/
public static long getRowCount(
BlockMappingVO blockMappingVO,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
+ CarbonTable carbonTable) {
SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ new SegmentUpdateStatusManager(carbonTable);
long rowCount = 0;
Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
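One detail worth noting in the cleanUpDeltaFiles change above: every segment
file rewritten in a single cleanup pass shares one UUID taken from
System.currentTimeMillis(), so the whole batch is identifiable by a common
timestamp. For example, a pass at t=1522380001000 that touches segments 0 and 2
would produce (illustrative names):

    0_1522380001000.segment
    2_1522380001000.segment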
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index a21873d..308fe30 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
import org.apache.carbondata.core.mutate.TupleIdEnum;
@@ -70,31 +71,24 @@ public class SegmentUpdateStatusManager {
private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
private boolean isPartitionTable;
- public SegmentUpdateStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier,
+ public SegmentUpdateStatusManager(CarbonTable table,
LoadMetadataDetails[] segmentDetails) {
- this.identifier = absoluteTableIdentifier;
+ this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
this.segmentDetails = segmentDetails;
- if (segmentDetails.length > 0) {
- isPartitionTable = segmentDetails[0].getSegmentFile() != null;
- }
+ isPartitionTable = table.isHivePartitionTable();
updateDetails = readLoadMetadata();
populateMap();
}
- /**
- * @param identifier
- */
- public SegmentUpdateStatusManager(AbsoluteTableIdentifier identifier) {
- this.identifier = identifier;
+ public SegmentUpdateStatusManager(CarbonTable table) {
+ this.identifier = table.getAbsoluteTableIdentifier();
// current it is used only for read function scenarios, as file update always requires to work
// on latest file status.
segmentDetails = SegmentStatusManager.readLoadMetadata(
CarbonTablePath.getMetadataPath(identifier.getTablePath()));
- if (segmentDetails.length > 0) {
- isPartitionTable = segmentDetails[0].getSegmentFile() != null;
- }
+ isPartitionTable = table.isHivePartitionTable();
updateDetails = readLoadMetadata();
populateMap();
}
@@ -261,36 +255,30 @@ public class SegmentUpdateStatusManager {
* Returns all delta file paths of specified block
*/
private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
- try {
- String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
- String completeBlockName = CarbonTablePath.addDataPartPrefix(
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
- + CarbonCommonConstants.FACT_FILE_EXT);
- String blockPath;
- if (isPartitionTable) {
- blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
- .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
- } else {
- String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
- identifier.getTablePath(), segment);
- blockPath =
- carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
- }
- CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
- if (!file.exists()) {
- throw new Exception("Invalid tuple id " + tupleId);
- }
- String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
- //blockName without timestamp
- final String blockNameFromTuple =
- blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
- return getDeltaFiles(file, blockNameFromTuple, extension, segment);
- } catch (Exception ex) {
- String errorMsg = "Invalid tuple id " + tupleId;
- LOG.error(errorMsg);
- throw new Exception(errorMsg);
+ String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
+ String completeBlockName = CarbonTablePath.addDataPartPrefix(
+ CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
+ + CarbonCommonConstants.FACT_FILE_EXT);
+ String blockPath;
+ if (isPartitionTable) {
+ blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
+ .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+ } else {
+ String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
+ identifier.getTablePath(), segment);
+ blockPath =
+ carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+ }
+ CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
+ if (!file.exists()) {
+ throw new Exception("Invalid tuple id " + tupleId);
}
+ String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
+ //blockName without timestamp
+ final String blockNameFromTuple =
+ blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
+ return getDeltaFiles(file, blockNameFromTuple, extension, segment);
}
/**
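Both constructors now take the CarbonTable: the identifier comes from table.getAbsoluteTableIdentifier() and isPartitionTable from table.isHivePartitionTable(), rather than being inferred from the first entry of segmentDetails, an inference that stops working once non-partition loads also write segment files. getDeltaFiles likewise stops wrapping every failure in a generic "Invalid tuple id" exception, so callers see the original error. A hedged construction sketch (carbonTable assumed resolved elsewhere, e.g. via CarbonMetadata):

    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager

    // Reads the latest table status internally.
    val updateStatusManager = new SegmentUpdateStatusManager(carbonTable)
    val isIUDTable = updateStatusManager.getUpdateStatusDetails.length != 0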
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index bc150e5..bcecdef 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.MergedBlockIndex;
@@ -40,10 +41,19 @@ import org.apache.hadoop.fs.Path;
public class CarbonIndexFileMergeWriter {
/**
+ * table handle
+ */
+ private CarbonTable table;
+
+ /**
* thrift writer object
*/
private ThriftWriter thriftWriter;
+ public CarbonIndexFileMergeWriter(CarbonTable table) {
+ this.table = table;
+ }
+
/**
* Merge all the carbonindex files of segment to a merged file
* @param tablePath
@@ -54,7 +64,7 @@ public class CarbonIndexFileMergeWriter {
* which do not store the blocklet info to current version
* @throws IOException
*/
- private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ private String mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath, List<String> indexFileNamesTobeAdded,
boolean readFileFooterFromCarbonDataFile) throws IOException {
Segment segment = Segment.getSegment(segmentId, tablePath);
@@ -70,17 +80,18 @@ public class CarbonIndexFileMergeWriter {
}
if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
if (sfs == null) {
- return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile,
- segmentPath, indexFiles);
+ return writeMergeIndexFileBasedOnSegmentFolder(
+ indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, segmentPath, indexFiles);
} else {
- return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
+ return writeMergeIndexFileBasedOnSegmentFile(
+ segmentId, indexFileNamesTobeAdded, sfs, indexFiles);
}
}
return null;
}
- private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded,
+ private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles)
throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
@@ -100,7 +111,9 @@ public class CarbonIndexFileMergeWriter {
return null;
}
- private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded,
+ private String writeMergeIndexFileBasedOnSegmentFile(
+ String segmentId,
+ List<String> indexFileNamesTobeAdded,
SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
fileStore
@@ -133,11 +146,20 @@ public class CarbonIndexFileMergeWriter {
}
}
- List<String> filesTobeDeleted = new ArrayList<>();
+ String uniqueId = String.valueOf(System.currentTimeMillis());
+ String newSegmentFileName =
+ SegmentFileStore.genSegmentFileName(segmentId, String.valueOf(uniqueId))
+ + CarbonTablePath.SEGMENT_EXT;
+ String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
+ + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
+ SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path);
+ SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName);
+
for (CarbonFile file : indexFiles) {
- filesTobeDeleted.add(file.getAbsolutePath());
+ file.delete();
}
- return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), filesTobeDeleted);
+
+ return uniqueId;
}
private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
@@ -173,7 +195,7 @@ public class CarbonIndexFileMergeWriter {
* @param indexFileNamesTobeAdded
* @throws IOException
*/
- public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ public String mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath, List<String> indexFileNamesTobeAdded) throws IOException {
return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false);
}
@@ -184,7 +206,7 @@ public class CarbonIndexFileMergeWriter {
* @param segmentId
* @throws IOException
*/
- public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ public String mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath) throws IOException {
return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false);
}
@@ -196,7 +218,7 @@ public class CarbonIndexFileMergeWriter {
* @param readFileFooterFromCarbonDataFile
* @throws IOException
*/
- public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ public String mergeCarbonIndexFilesOfSegment(String segmentId,
String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException {
return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
readFileFooterFromCarbonDataFile);
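The merge writer is now constructed with its table, and mergeCarbonIndexFilesOfSegment returns the uniqueId timestamp used in the rewritten segment file name (null when there is nothing to merge) instead of a SegmentIndexFIleMergeStatus; for partition segments it writes the new segment file and deletes the old index files in place. Usage mirrors the updated tests below, for example:

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

    val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
    // Returns the uniqueId used in the new segment file name, or null if
    // no index files were present to merge.
    val uniqueId = new CarbonIndexFileMergeWriter(table)
      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)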
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index c057129..c352a95 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -163,16 +163,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
UpdateVO invalidBlockVOForSegmentId = null;
Boolean isIUDTable = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
- getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable);
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
List<CarbonInputSplit> dataBlocksOfSegment =
- getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
validSegments, partitionInfo, oldPartitionIdList);
numBlocks = dataBlocksOfSegment.size();
for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8d2318b..c757ba9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -319,7 +319,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* get data blocks of given segment
*/
protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
- AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ CarbonTable carbonTable, FilterResolverIntf resolver,
BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList) throws IOException {
@@ -328,7 +328,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
// get tokens for all the required FileSystem for table path
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+ new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -339,7 +339,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
List<ExtendedBlocklet> prunedBlocklets;
if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
DistributableDataMapFormat datamapDstr =
- new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds,
+ new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
partitionsToPrune, BlockletDataMapFactory.class.getName());
prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
// Apply expression on the blocklets.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 4634b06..f573acf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -117,7 +117,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
+ CarbonCommonConstants.FILE_SEPARATOR
+ loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
// Merge all partition files into a single file.
- String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp();
+ String segmentFileName = SegmentFileStore.genSegmentFileName(
+ loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
SegmentFileStore.SegmentFile segmentFile = SegmentFileStore
.mergeSegmentFiles(readPath, segmentFileName,
CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath()));
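The segment file name was previously assembled inline as segmentId + "_" + factTimeStamp; genSegmentFileName now centralizes that convention. A small sketch of the naming (loadModel assumed to be a populated CarbonLoadModel):

    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Same "<segmentId>_<factTimeStamp>" shape as before, built in one place.
    val segmentFileName = SegmentFileStore.genSegmentFileName(
      loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp))
    val fullName = segmentFileName + CarbonTablePath.SEGMENT_EXT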
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index c94f777..beb51cb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -130,12 +130,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
- SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(identifier, loadMetadataDetails);
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
List<Segment> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<Segment> streamSegments = null;
@@ -182,7 +182,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
.getUpdateStatusDetails()) {
boolean refreshNeeded =
- DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+ DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
.isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager);
if (refreshNeeded) {
toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null));
@@ -190,7 +190,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
// Clean segments if refresh is needed
for (Segment segment : filteredSegmentToAccess) {
- if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+ if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
.isRefreshNeeded(segment.getSegmentNo())) {
toBeCleanedSegments.add(segment);
}
@@ -370,12 +370,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
* @param targetSegment
* @param oldPartitionIdList get old partitionId before partitionInfo was changed
* @return
- * @throws IOException
*/
public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment,
- List<Integer> oldPartitionIdList, PartitionInfo partitionInfo)
- throws IOException {
- AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) {
List<Segment> invalidSegments = new ArrayList<>();
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
@@ -383,7 +380,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
segmentList.add(new Segment(targetSegment, null));
setSegmentsToAccess(job.getConfiguration(), segmentList);
try {
-
// process and resolve the expression
Expression filter = getFilterPredicates(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -414,7 +410,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider);
// do block filtering and get split
List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions,
- partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(identifier));
+ partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable));
// pass the invalid segment to task side in order to remove index entry in task side
if (invalidSegments.size() > 0) {
for (InputSplit split : splits) {
@@ -473,14 +469,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
UpdateVO invalidBlockVOForSegmentId = null;
Boolean isIUDTable = false;
- AbsoluteTableIdentifier absoluteTableIdentifier =
- getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
-
isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
// for each segment fetch blocks matching filter in Driver BTree
List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
- getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
validSegments, partitionInfo, oldPartitionIdList);
numBlocks = dataBlocksOfSegment.size();
for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
@@ -527,18 +520,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
/**
* Get the row count of the Block and mapping of segment and Block count.
- *
- * @param identifier
- * @return
- * @throws IOException
*/
- public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
+ public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
List<PartitionSpec> partitions) throws IOException {
- TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(identifier);
+ AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
+ TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
- identifier, loadMetadataDetails);
+ table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
new SegmentStatusManager(identifier).getValidAndInvalidSegments(loadMetadataDetails);
Map<String, Long> blockRowCountMapping = new HashMap<>();
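getBlockRowCount now takes the CarbonTable and derives the AbsoluteTableIdentifier internally, and the SegmentUpdateStatusManager is only constructed after the schema null-check, so a missing or corrupt schema fails fast. A hedged caller sketch (job, table, partitions and format, a CarbonTableInputFormat instance, assumed in scope):

    import org.apache.carbondata.core.mutate.CarbonUpdateUtil

    // "table" replaces the old AbsoluteTableIdentifier argument.
    val blockMapping = format.getBlockRowCount(job, table, partitions)
    val rowCount = CarbonUpdateUtil.getRowCount(blockMapping, table)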
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 60c88dc..3f19a3f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -48,7 +48,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
- private AbsoluteTableIdentifier identifier;
+ private CarbonTable table;
private DataMapExprWrapper dataMapExprWrapper;
@@ -58,10 +58,10 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
private List<PartitionSpec> partitions;
- DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
+ DistributableDataMapFormat(CarbonTable table,
DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments,
List<PartitionSpec> partitions, String className) {
- this.identifier = identifier;
+ this.table = table;
this.dataMapExprWrapper = dataMapExprWrapper;
this.validSegments = validSegments;
this.className = className;
@@ -106,7 +106,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
throws IOException, InterruptedException {
DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
TableDataMap dataMap = DataMapStoreManager.getInstance()
- .getDataMap(identifier, distributable.getDistributable().getDataMapSchema());
+ .getDataMap(table, distributable.getDistributable().getDataMapSchema());
List<ExtendedBlocklet> blocklets = dataMap.prune(
distributable.getDistributable(),
dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
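The distributable datamap format now carries the CarbonTable itself, so executors resolve datamaps with DataMapStoreManager.getDataMap(table, schema) rather than by identifier. An illustrative lookup (distributable assumed to be the deserialized DataMapDistributableWrapper):

    import org.apache.carbondata.core.datamap.DataMapStoreManager

    val dataMap = DataMapStoreManager.getInstance()
      .getDataMap(table, distributable.getDistributable.getDataMapSchema)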
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index ff421e1..fd97996 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -52,7 +52,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
+ new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
sql("""Select count(*) from carbon_automation_merge"""))
@@ -69,8 +69,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
+ new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
+ new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
@@ -91,7 +91,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") == 2)
sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
+ new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
assert(getMergedIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 1)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index aace3ea..c7912cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,7 +62,7 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
assert(getIndexFileCount("default_indexmerge", "0") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""),
@@ -85,9 +85,9 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
@@ -110,9 +110,9 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
@@ -139,7 +139,7 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -168,7 +168,7 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
- new CarbonIndexFileMergeWriter()
+ new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 510903a..1fbddb0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -16,11 +16,15 @@
*/
package org.apache.carbondata.spark.testsuite.iud
+import java.io.File
+
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -178,6 +182,8 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("clean files for table select_after_clean")
sql("delete from select_after_clean where name='def'")
sql("clean files for table select_after_clean")
+ assertResult(false)(new File(
+ CarbonTablePath.getSegmentPath(s"$storeLocation/iud_db.db/select_after_clean", "0")).exists())
checkAnswer(sql("""select * from select_after_clean"""),
Seq(Row(1, "abc"), Row(3, "uhj"), Row(4, "frg")))
}
@@ -198,7 +204,9 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
val files = FileFactory.getCarbonFile(metaPath)
val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass
if(result.getCanonicalName.contains("CarbonFileMetastore")) {
- assert(files.listFiles().length == 2)
+ assert(files.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = !file.isDirectory
+ }).length == 2)
}
else
assert(files.listFiles().length == 1)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 98c9a16..ec39f66 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -398,12 +398,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""drop table if exists iud.show_segment""").show
sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""")
- val before_update = sql("""show segments for table iud.show_segment""").toDF()
+ val before_update = sql("""show segments for table iud.show_segment""").collect()
sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
- val after_update = sql("""show segments for table iud.show_segment""").toDF()
+ val after_update = sql("""show segments for table iud.show_segment""")
checkAnswer(
- before_update,
- after_update
+ after_update,
+ before_update
)
sql("""drop table if exists iud.show_segment""").show
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c9237d1..b4c5e4d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -274,8 +274,9 @@ class CarbonMergerRDD[K, V](
val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
- absoluteTableIdentifier)
+ carbonTable)
val jobConf: JobConf = new JobConf(new Configuration)
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
@@ -383,7 +384,6 @@ class CarbonMergerRDD[K, V](
dataFileFooter.getSegmentInfo.getColumnCardinality)
}
val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
// update cardinality and column schema list according to master schema
val cardinality = CarbonCompactionUtil
.updateColumnSchemaAndGetCardinality(columnToCardinalityMap,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 48ebdb4..37cd12e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -28,11 +28,13 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.command.AlterPartitionModel
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -203,5 +205,22 @@ object PartitionUtils {
files.add(file)
}
CarbonUtil.deleteFiles(files.asScala.toArray)
+ if (!files.isEmpty) {
+ val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val file = SegmentFileStore.writeSegmentFile(
+ identifier.getTablePath,
+ alterPartitionModel.segmentId,
+ alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
+ val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file)).asJava
+ if (!CarbonUpdateUtil.updateTableMetadataStatus(
+ new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, null)).asJava),
+ carbonTable,
+ alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString,
+ true,
+ new util.ArrayList[Segment](0),
+ new util.ArrayList[Segment](segmentFiles))) {
+ throw new IOException("Data update failed due to failure in table status updation.")
+ }
+ }
}
}
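The updateTableMetadataStatus overload used above gains a trailing argument: the list of segments paired with their rewritten segment file names, so the status update records the new files. A condensed sketch of the call shape (segmentId, segmentFileName, factTimeStamp and carbonTable assumed in scope; the empty-list argument is unchanged from the old signature):

    import java.util
    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.mutate.CarbonUpdateUtil

    val touched = Seq(new Segment(segmentId, null)).asJava
    val withFiles = Seq(new Segment(segmentId, segmentFileName)).asJava
    val ok = CarbonUpdateUtil.updateTableMetadataStatus(
      new util.HashSet[Segment](touched),
      carbonTable,
      factTimeStamp,                             // String form of the fact timestamp
      true,
      new util.ArrayList[Segment](0),            // unchanged from the old call
      new util.ArrayList[Segment](withFiles))    // new: segments with rewritten segment files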
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0d00023..bbe7be0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -291,7 +291,6 @@ object CarbonDataRDDFactory {
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None,
operationContext: OperationContext): Unit = {
- val storePath: String = carbonLoadModel.getTablePath
LOGGER.audit(s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any load need to be deleted before loading new data
@@ -428,6 +427,13 @@ object CarbonDataRDDFactory {
segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
}
}
+ val segmentFiles = segmentDetails.asScala.map{seg =>
+ val file = SegmentFileStore.writeSegmentFile(
+ carbonTable.getTablePath,
+ seg.getSegmentNo,
+ updateModel.get.updatedTimeStamp.toString)
+ new Segment(seg.getSegmentNo, file)
+ }.filter(_.getSegmentFileName != null).asJava
// this means that the update doesn't have any records to update, so there is
// no need to update the table status file.
@@ -441,7 +447,8 @@ object CarbonDataRDDFactory {
carbonTable,
updateModel.get.updatedTimeStamp + "",
true,
- new util.ArrayList[Segment](0))) {
+ new util.ArrayList[Segment](0),
+ new util.ArrayList[Segment](segmentFiles))) {
LOGGER.audit("Data update is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
@@ -499,6 +506,11 @@ object CarbonDataRDDFactory {
}
writeDictionary(carbonLoadModel, result, writeAll = false)
+
+ val segmentFileName =
+ SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId,
+ String.valueOf(carbonLoadModel.getFactTimeStamp))
+
operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
carbonLoadModel.getSegmentId)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -513,6 +525,7 @@ object CarbonDataRDDFactory {
loadStatus,
newEntryLoadStatus,
overwriteTable,
+ segmentFileName,
uniqueTableStatusId)
val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
new LoadTablePostStatusUpdateEvent(carbonLoadModel)
@@ -788,6 +801,7 @@ object CarbonDataRDDFactory {
loadStatus: SegmentStatus,
newEntryLoadStatus: SegmentStatus,
overwriteTable: Boolean,
+ segmentFileName: String,
uuid: String = ""): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
@@ -795,11 +809,12 @@ object CarbonDataRDDFactory {
} else {
new LoadMetadataDetails
}
+ metadataDetails.setSegmentFile(segmentFileName)
CarbonLoaderUtil.populateNewLoadMetaEntry(
- metadataDetails,
- newEntryLoadStatus,
- carbonLoadModel.getFactTimeStamp,
- true)
+ metadataDetails,
+ newEntryLoadStatus,
+ carbonLoadModel.getFactTimeStamp,
+ true)
CarbonLoaderUtil
.addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
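This is the core of the change for non-partition tables: the segment file is written as part of the load itself and its name is stamped into the new LoadMetadataDetails entry before the table status is recorded. A condensed sketch of that flow (carbonTable and carbonLoadModel assumed in scope):

    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails

    // 1. Write the segment file for the just-loaded segment.
    val segmentFileName = SegmentFileStore.writeSegmentFile(
      carbonTable.getTablePath,
      carbonLoadModel.getSegmentId,
      String.valueOf(carbonLoadModel.getFactTimeStamp))

    // 2. Record it in the new load metadata entry before the status write.
    val metadataDetails = new LoadMetadataDetails
    metadataDetails.setSegmentFile(segmentFileName)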