Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/27 09:10:47 UTC
carbondata git commit: [CARBONDATA-1586][Streaming] Support handoff from row format to columnar format
Repository: carbondata
Updated Branches:
refs/heads/master 373342d0e -> f8e0585d2
[CARBONDATA-1586][Streaming] Support handoff from row format to columnar format
1. configuration: rename carbon.handoff.size to carbon.streaming.segment.max.size
2. SQL command: alter table <table_name> compact 'streaming'
3. refactor CompactionType
This closes #1566
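A minimal usage sketch of the new command (illustrative only: assumes a CarbonData-enabled SparkSession named spark and the test table used further below; not part of this commit):

    // hand off all 'Streaming Finish' segments of the table to columnar format
    spark.sql("alter table streaming.stream_table_handoff compact 'streaming'")
    // the handed-off streaming segments are marked Compacted and the new
    // columnar segments Success (see the change to TestStreamingTableOperation)
    spark.sql("show segments for table streaming.stream_table_handoff").show()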
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8e0585d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8e0585d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8e0585d
Branch: refs/heads/master
Commit: f8e0585d2c31b90e70ff2574e47c6434d61fcced
Parents: 373342d
Author: QiangCai <qi...@qq.com>
Authored: Sat Nov 25 01:54:41 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Nov 27 17:10:34 2017 +0800
----------------------------------------------------------------------
.../apache/carbondata/core/locks/LockUsage.java | 1 +
.../statusmanager/SegmentStatusManager.java | 6 +-
.../hadoop/api/CarbonTableInputFormat.java | 2 +-
.../streaming/CarbonStreamOutputFormat.java | 2 +-
.../streaming/CarbonStreamRecordReader.java | 69 +++-
.../org/apache/carbondata/spark/KeyVal.scala | 9 +
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 19 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 3 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 4 +-
.../spark/rdd/DataManagementFunc.scala | 8 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 45 +-
.../AlterTableCompactionCommand.scala | 60 +--
.../command/mutation/HorizontalCompaction.scala | 8 +-
.../sql/execution/strategy/DDLStrategy.scala | 16 +-
.../TestStreamingTableOperation.scala | 49 ++-
.../merger/AbstractResultProcessor.java | 2 +-
.../processing/merger/CarbonCompactionUtil.java | 10 +-
.../processing/merger/CarbonDataMergerUtil.java | 16 +-
.../merger/CompactionResultSortProcessor.java | 14 +-
.../processing/merger/CompactionType.java | 11 +-
.../carbondata/streaming/StreamHandoffRDD.scala | 408 +++++++++++++++++++
21 files changed, 654 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index 434129c..ca9a721 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -25,6 +25,7 @@ public class LockUsage {
public static final String LOCK = ".lock";
public static final String METADATA_LOCK = "meta.lock";
public static final String COMPACTION_LOCK = "compaction.lock";
+ public static final String HANDOFF_LOCK = "handoff.lock";
public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock";
public static final String ALTER_PARTITION_LOCK = "alter_partition.lock";
public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 2409219..aea9110 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -180,15 +180,15 @@ public class SegmentStatusManager {
/**
* This method reads the load metadata file
*
- * @param tableFolderPath
+ * @param metadataFolderPath
* @return
*/
- public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
+ public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
- String metadataFileName = tableFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonCommonConstants.LOADMETADATA_FILENAME;
LoadMetadataDetails[] listOfLoadFolderDetailsArray;
AtomicFileOperations fileOperation =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 06a4006..e4e9ceb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -462,7 +462,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
/**
* use file list in .carbonindex file to get the split of streaming.
*/
- private List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
+ public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
List<String> streamSegments) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
if (streamSegments != null && !streamSegments.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
index 47b43c4..8497d2b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -57,7 +57,7 @@ public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
* if the byte size of streaming segment reach this value,
* the system will create a new stream segment
*/
- public static final String HANDOFF_SIZE = "carbon.handoff.size";
+ public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size";
/**
* the min handoff size of streaming segment, the unit is byte
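A hedged sketch of supplying the renamed threshold (assumes the streaming writer reads the value from the Hadoop Configuration of this output format; the 256 MB value and the wiring are illustrative, not taken from this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat

    val conf = new Configuration()
    // once a streaming segment grows past this many bytes, a new stream segment is created
    conf.set(CarbonStreamOutputFormat.HANDOFF_SIZE, String.valueOf(256L * 1024 * 1024))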
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
index f7169a0..0086a3c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -134,6 +134,9 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
// empty project, null filter
private boolean skipScanData;
+ // return raw row for handoff
+ private boolean useRawRow = false;
+
@Override public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// input
@@ -256,6 +259,10 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
return header.getSync_marker();
}
+ public void setUseRawRow(boolean useRawRow) {
+ this.useRawRow = useRawRow;
+ }
+
private void initializeAtFirstRow() throws IOException {
filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
filterRow = new RowImpl();
@@ -342,7 +349,12 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
input.nextRow();
scanMore = false;
} else {
- readRowFromStream();
+ if (useRawRow) {
+ // read raw row for streaming handoff, which does not require decoding the raw row
+ readRawRowFromStream();
+ } else {
+ readRowFromStream();
+ }
if (null != filter) {
scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
} else {
@@ -605,6 +617,61 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
}
}
+ private void readRawRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ } else {
+ outputValues[colCount] = input.readInt();
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ short v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ outputValues[colCount] = input.readBoolean();
+ } else if (dataType == DataTypes.SHORT) {
+ outputValues[colCount] = input.readShort();
+ } else if (dataType == DataTypes.INT) {
+ outputValues[colCount] = input.readInt();
+ } else if (dataType == DataTypes.LONG) {
+ outputValues[colCount] = input.readLong();
+ } else if (dataType == DataTypes.DOUBLE) {
+ outputValues[colCount] = input.readDouble();
+ } else if (DataTypes.isDecimal(dataType)) {
+ int len = input.readShort();
+ outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ }
+ }
+ }
+ }
+
private void putRowToColumnBatch(int rowId) {
for (int i = 0; i < projection.length; i++) {
Object value = outputValues[i];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index bbf242e..450fed0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -107,6 +107,15 @@ class MergeResultImpl extends MergeResult[String, Boolean] {
override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
}
+trait HandoffResult[K, V] extends Serializable {
+ def getKey(key: String, value: Boolean): (K, V)
+
+}
+
+class HandoffResultImpl extends HandoffResult[String, Boolean] {
+ override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
+}
+
trait AlterPartitionResult[K, V] extends Serializable {
def getKey(key: String, value: Boolean): (K, V)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index db2037c..997838c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -116,7 +116,7 @@ class CarbonMergerRDD[K, V](
// During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
// one segment, so max cardinality will be calculated from first block of segment
- if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
var dataFileFooter: DataFileFooter = null
try {
// As the tableBlockInfoList is sorted take the ColCardinality from the last
@@ -137,15 +137,14 @@ class CarbonMergerRDD[K, V](
carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
.toList
}
- mergeNumber = if (carbonMergerMapping.campactionType ==
- CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
tableBlockInfoList.get(0).getSegmentId
- }
- else {
- mergedLoadName
- .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
- CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
- )
+ } else {
+ mergedLoadName.substring(
+ mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+ CarbonCommonConstants.LOAD_FOLDER.length(),
+ mergedLoadName.length()
+ )
}
carbonLoadModel.setSegmentId(mergeNumber)
val tempLocationKey = CarbonDataProcessorUtil
@@ -277,7 +276,7 @@ class CarbonMergerRDD[K, V](
var defaultParallelism = sparkContext.defaultParallelism
val result = new java.util.ArrayList[Partition](defaultParallelism)
var taskPartitionNo = 0
- var carbonPartitionId = 0;
+ var carbonPartitionId = 0
var noOfBlocks = 0
val taskInfoList = new java.util.ArrayList[Distributable]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc40f13..7eaf95a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -308,10 +308,11 @@ class CarbonScanRDD(
val value = reader.getCurrentValue
value
}
+
private def close() {
TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
inputMetricsStats.updateAndClose()
- }
+ }
}
} else {
new Iterator[Any] {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 911dc9e..ba634b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -87,7 +87,7 @@ object Compactor {
}
val mergeStatus =
- if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
new CarbonIUDMergerRDD(
sc.sparkContext,
new MergeResultImpl(),
@@ -124,7 +124,7 @@ object Compactor {
val endTime = System.nanoTime()
logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
val statusFileUpdation =
- ((compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) &&
+ ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
CarbonDataMergerUtil
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
carbonTable.getMetaDataFilepath,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 762dcee..26a66f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.{CompactionCallableModel, Compacti
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -62,7 +61,7 @@ object DataManagementFunc {
compactionModel.compactionType
)
while (loadsToMerge.size() > 1 ||
- (compactionModel.compactionType.name().equals("IUD_UPDDEL_DELTA_COMPACTION") &&
+ (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
loadsToMerge.size() > 0)) {
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction(carbonLoadModel)
@@ -97,13 +96,13 @@ object DataManagementFunc {
// in case of major compaction we will scan only once and come out as it will keep
// on doing major for the new loads also.
// excluding the newly added segments.
- if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+ if (CompactionType.MAJOR == compactionModel.compactionType) {
segList = CarbonDataMergerUtil
.filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
}
- if (compactionModel.compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
loadsToMerge.clear()
} else if (segList.size > 0) {
loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
@@ -223,5 +222,4 @@ object DataManagementFunc {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 284587d..f384d28 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -83,29 +83,34 @@ object CarbonDataRDDFactory {
carbonTable: CarbonTable,
compactionModel: CompactionModel): Unit = {
// taking system level lock at the mdt file location
- var configuredMdtPath = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
- CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
+ var configuredMdtPath = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
+ CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
+
configuredMdtPath = CarbonUtil.checkAndAppendFileSystemURIScheme(configuredMdtPath)
- val lock = CarbonLockFactory
- .getCarbonLockObj(configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
- CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
- LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
+ val lock = CarbonLockFactory.getCarbonLockObj(
+ configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
+ CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+ LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
+
if (lock.lockWithRetries()) {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
s".${ carbonLoadModel.getTableName }")
try {
- if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ if (compactionType == CompactionType.SEGMENT_INDEX) {
// Just launch job to merge index and return
- CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ CommonUtil.mergeIndexFiles(
+ sqlContext.sparkContext,
CarbonDataMergerUtil.getValidSegmentList(
carbonTable.getAbsoluteTableIdentifier).asScala,
carbonLoadModel.getTablePath,
- carbonTable, true)
+ carbonTable,
+ true)
lock.unlock()
return
}
- startCompactionThreads(sqlContext,
+ startCompactionThreads(
+ sqlContext,
carbonLoadModel,
storeLocation,
compactionModel,
@@ -148,7 +153,7 @@ object CarbonDataRDDFactory {
compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
- if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
// update the updated table status. For the case of Update Delta Compaction the Metadata
// is filled in LoadModel, no need to refresh.
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
@@ -197,8 +202,7 @@ object CarbonDataRDDFactory {
val newCarbonLoadModel = prepareCarbonLoadModel(table)
- val compactionSize = CarbonDataMergerUtil
- .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+ val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR)
val newcompactionModel = CompactionModel(compactionSize,
compactionType,
@@ -231,11 +235,10 @@ object CarbonDataRDDFactory {
}
}
// ********* check again for all the tables.
- tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
- .carbonMetastore.listAllTables(sqlContext.sparkSession).toArray,
- skipCompactionTables.asJava
- )
+ tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
+ CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+ .listAllTables(sqlContext.sparkSession).toArray,
+ skipCompactionTables.asJava)
}
}
// giving the user his error for telling in the beeline if his triggered table
@@ -695,7 +698,7 @@ object CarbonDataRDDFactory {
val compactionSize = 0
val isCompactionTriggerByDDl = false
val compactionModel = CompactionModel(compactionSize,
- CompactionType.MINOR_COMPACTION,
+ CompactionType.MINOR,
carbonTable,
isCompactionTriggerByDDl
)
@@ -718,7 +721,7 @@ object CarbonDataRDDFactory {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
storeLocation,
- CompactionType.MINOR_COMPACTION,
+ CompactionType.MINOR,
carbonTable,
compactionModel
)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 2cd771c..cc398b5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.streaming.StreamHandoffRDD
/**
* Command for the compaction in alter table command
@@ -82,17 +83,16 @@ case class AlterTableCompactionCommand(
carbonLoadModel.setDatabaseName(table.getDatabaseName)
carbonLoadModel.setTablePath(table.getTablePath)
- var storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir")
- )
+ var storeLocation = CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+ System.getProperty("java.io.tmpdir"))
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
try {
- alterTableForCompaction(sparkSession.sqlContext,
- alterTableModel,
- carbonLoadModel,
- storeLocation
- )
+ alterTableForCompaction(
+ sparkSession.sqlContext,
+ alterTableModel,
+ carbonLoadModel,
+ storeLocation)
} catch {
case e: Exception =>
if (null != e.getMessage) {
@@ -110,26 +110,16 @@ case class AlterTableCompactionCommand(
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
storeLocation: String): Unit = {
- val LOGGER: LogService =
- LogServiceFactory.getLogService(this.getClass.getName)
- var compactionSize: Long = 0
- var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
- if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
- compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
- compactionType = CompactionType.MAJOR_COMPACTION
- } else if (alterTableModel.compactionType.equalsIgnoreCase(
- CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
- compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
+ val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType)
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
if (alterTableModel.segmentUpdateStatusManager.isDefined) {
carbonLoadModel.setSegmentUpdateStatusManager(
alterTableModel.segmentUpdateStatusManager.get)
carbonLoadModel.setLoadMetadataDetails(
alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
}
- } else if (alterTableModel.compactionType.equalsIgnoreCase("segment_index")) {
- compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
- } else {
- compactionType = CompactionType.MINOR_COMPACTION
}
LOGGER.audit(s"Compaction request received for table " +
@@ -139,6 +129,15 @@ case class AlterTableCompactionCommand(
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
+
+ if (compactionType == CompactionType.STREAMING) {
+ StreamHandoffRDD.startStreamingHandoffThread(
+ carbonLoadModel,
+ sqlContext,
+ storeLocation)
+ return
+ }
+
// reading the start time of data load.
val loadStartTime : Long =
if (alterTableModel.factTimeStamp.isEmpty) {
@@ -166,7 +165,8 @@ case class AlterTableCompactionCommand(
// so that this will be taken up by the compaction process which is executing.
if (!isConcurrentCompactionAllowed) {
LOGGER.info("System level compaction lock is enabled.")
- CarbonDataRDDFactory.handleCompactionForSystemLocking(sqlContext,
+ CarbonDataRDDFactory.handleCompactionForSystemLocking(
+ sqlContext,
carbonLoadModel,
storeLocation,
compactionType,
@@ -175,16 +175,15 @@ case class AlterTableCompactionCommand(
)
} else {
// normal flow of compaction
- val lock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- LockUsage.COMPACTION_LOCK
- )
+ val lock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getAbsoluteTableIdentifier,
+ LockUsage.COMPACTION_LOCK)
if (lock.lockWithRetries()) {
LOGGER.info("Acquired the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
- if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ if (compactionType == CompactionType.SEGMENT_INDEX) {
// Just launch job to merge index and return
CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
CarbonDataMergerUtil.getValidSegmentList(
@@ -194,7 +193,8 @@ case class AlterTableCompactionCommand(
lock.unlock()
return
}
- CarbonDataRDDFactory.startCompactionThreads(sqlContext,
+ CarbonDataRDDFactory.startCompactionThreads(
+ sqlContext,
carbonLoadModel,
storeLocation,
compactionModel,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 6762489..a875ad6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -53,7 +53,7 @@ object HorizontalCompaction {
return
}
- var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+ var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
val carbonTable = carbonRelation.carbonTable
val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val updateTimeStamp = System.currentTimeMillis()
@@ -78,7 +78,7 @@ object HorizontalCompaction {
if (isUpdateOperation) {
// This is only update operation, perform only update compaction.
- compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+ compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
performUpdateDeltaCompaction(sparkSession,
compactionTypeIUD,
carbonTable,
@@ -89,7 +89,7 @@ object HorizontalCompaction {
}
// After Update Compaction perform delete compaction
- compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
+ compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
if (segLists == null || segLists.size() == 0) {
return
@@ -135,7 +135,7 @@ object HorizontalCompaction {
val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
carbonTable.getTableName,
Some(segmentUpdateStatusManager),
- CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
+ CompactionType.IUD_UPDDEL_DELTA.toString,
Some(factTimeStamp),
"")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bf037d1..2418d2a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
import org.apache.spark.sql.execution.command.schema._
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
@@ -83,9 +84,18 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sparkSession)
if (isCarbonTable) {
- if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
- altertablemodel.compactionType.equalsIgnoreCase("major") ||
- altertablemodel.compactionType.equalsIgnoreCase("segment_index")) {
+ var compactionType: CompactionType = null
+ try {
+ compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase)
+ } catch {
+ case _ =>
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on carbon table")
+ }
+ if (CompactionType.MINOR == compactionType ||
+ CompactionType.MAJOR == compactionType ||
+ CompactionType.SEGMENT_INDEX == compactionType ||
+ CompactionType.STREAMING == compactionType) {
ExecutedCommandExec(alterTable) :: Nil
} else {
throw new MalformedCarbonCommandException(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index d9591c4..3b9b2c3 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -109,6 +109,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
// 12. reject alter streaming properties
createTable(tableName = "stream_table_alter", streaming = true, withBatchLoad = false)
+
+ // 13. handoff streaming segment
+ createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false)
}
test("validate streaming property") {
@@ -189,6 +192,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists streaming.stream_table_tolerant")
sql("drop table if exists streaming.stream_table_delete")
sql("drop table if exists streaming.stream_table_alter")
+ sql("drop table if exists streaming.stream_table_handoff")
}
// normal table not support streaming ingest
@@ -251,7 +255,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
// streaming ingest 10 rows
generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
- identifier )
+ identifier)
thread.start()
Thread.sleep(2000)
generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
@@ -327,7 +331,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
checkAnswer(
sql("select count(*) from streaming.stream_table_10s"),
- Seq(Row(5 + 10000*5)))
+ Seq(Row(5 + 10000 * 5)))
}
// batch loading on streaming table
@@ -344,13 +348,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
)
checkAnswer(
sql("select count(*) from streaming.stream_table_batch"),
- Seq(Row(100*5)))
+ Seq(Row(100 * 5)))
executeBatchLoad("stream_table_batch")
checkAnswer(
sql("select count(*) from streaming.stream_table_batch"),
- Seq(Row(100*5 + 5)))
+ Seq(Row(100 * 5 + 5)))
}
// detail query on batch and stream segment
@@ -666,6 +670,43 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
}
}
+ test("handoff 'streaming finish' segment to columnar segment") {
+ executeStreamingIngest(
+ tableName = "stream_table_handoff",
+ batchNums = 6,
+ rowNumsEachBatch = 10000,
+ intervalOfSource = 5,
+ intervalOfIngest = 10,
+ continueSeconds = 40,
+ generateBadRecords = false,
+ badRecordAction = "force",
+ handoffSize = 1024L * 200
+ )
+ val segments = sql("show segments for table streaming.stream_table_handoff").collect()
+ assertResult(3)(segments.length)
+ assertResult("Streaming")(segments(0).getString(1))
+ assertResult("Streaming Finish")(segments(1).getString(1))
+ assertResult("Streaming Finish")(segments(2).getString(1))
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_handoff"),
+ Seq(Row(6 * 10000))
+ )
+
+ sql("alter table streaming.stream_table_handoff compact 'streaming'")
+ Thread.sleep(10000)
+ val newSegments = sql("show segments for table streaming.stream_table_handoff").collect()
+ assertResult(5)(newSegments.length)
+ assertResult("Success")(newSegments(0).getString(1))
+ assertResult("Success")(newSegments(1).getString(1))
+ assertResult("Streaming")(newSegments(2).getString(1))
+ assertResult("Compacted")(newSegments(3).getString(1))
+ assertResult("Compacted")(newSegments(4).getString(1))
+ checkAnswer(
+ sql("select count(*) from streaming.stream_table_handoff"),
+ Seq(Row(6 * 10000))
+ )
+ }
+
def createWriteSocketThread(
serverSocket: ServerSocket,
writeNums: Int,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 6da50a9..c32aa51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -45,7 +45,7 @@ public abstract class AbstractResultProcessor {
CompactionType compactionType, CarbonTable carbonTable,
CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
CarbonDataFileAttributes carbonDataFileAttributes;
- if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+ if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
carbonTable.getCarbonTableIdentifier()));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index c60bb24..d796262 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -171,17 +171,17 @@ public class CarbonCompactionUtil {
try {
if (FileFactory.isFileExist(minorCompactionStatusFile,
FileFactory.getFileType(minorCompactionStatusFile))) {
- return CompactionType.MINOR_COMPACTION;
+ return CompactionType.MINOR;
}
if (FileFactory.isFileExist(majorCompactionStatusFile,
FileFactory.getFileType(majorCompactionStatusFile))) {
- return CompactionType.MAJOR_COMPACTION;
+ return CompactionType.MAJOR;
}
} catch (IOException e) {
LOGGER.error("Exception in determining the compaction request file " + e.getMessage());
}
- return CompactionType.MINOR_COMPACTION;
+ return CompactionType.MINOR;
}
/**
@@ -193,7 +193,7 @@ public class CarbonCompactionUtil {
public static boolean deleteCompactionRequiredFile(String metaFolderPath,
CompactionType compactionType) {
String compactionRequiredFile;
- if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ if (compactionType.equals(CompactionType.MINOR)) {
compactionRequiredFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonCommonConstants.minorCompactionRequiredFile;
} else {
@@ -229,7 +229,7 @@ public class CarbonCompactionUtil {
public static boolean createCompactionRequiredFile(String metaFolderPath,
CompactionType compactionType) {
String statusFile;
- if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+ if (CompactionType.MINOR == compactionType) {
statusFile = metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+ CarbonCommonConstants.minorCompactionRequiredFile;
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 15ee4fb..d6f2d9a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -332,7 +332,7 @@ public final class CarbonDataMergerUtil {
loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
loadMetadataDetails.setPartitionCount("0");
// if this is a major compaction then set the segment as major compaction.
- if (compactionType == CompactionType.MAJOR_COMPACTION) {
+ if (CompactionType.MAJOR == compactionType) {
loadMetadataDetails.setMajorCompacted("true");
}
@@ -394,7 +394,7 @@ public final class CarbonDataMergerUtil {
sortSegments(sortedSegments);
// Check for segments which are qualified for IUD compaction.
- if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
return identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel);
}
@@ -410,7 +410,7 @@ public final class CarbonDataMergerUtil {
identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
List<LoadMetadataDetails> listOfSegmentsToBeMerged;
// identify the segments to merge based on the Size of the segments across partition.
- if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
+ if (CompactionType.MAJOR == compactionType) {
listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, tablePath);
@@ -574,7 +574,7 @@ public final class CarbonDataMergerUtil {
* @param listOfSegmentsAfterPreserve the segments list after
* preserving the configured number of latest loads
* @param carbonLoadModel carbon load model
- * @param storeLocation the store location of the segment
+ * @param tablePath the store location of the segment
* @return the list of segments that need to be merged
* based on the Size in case of Major compaction
*/
@@ -641,7 +641,7 @@ public final class CarbonDataMergerUtil {
/**
* For calculating the size of the specified segment
- * @param storePath the store path of the segment
+ * @param tablePath the store path of the segment
* @param tableIdentifier identifier of table that the segment belong to
* @param segId segment id
* @return the data size of the segment
@@ -814,7 +814,7 @@ public final class CarbonDataMergerUtil {
long compactionSize = 0;
switch (compactionType) {
- case MAJOR_COMPACTION:
+ case MAJOR:
compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
break;
default: // this case can not come.
@@ -926,7 +926,7 @@ public final class CarbonDataMergerUtil {
List<String> validSegments = new ArrayList<>();
- if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
+ if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
int numberDeleteDeltaFilesThreshold =
CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
List<String> deleteSegments = new ArrayList<>();
@@ -948,7 +948,7 @@ public final class CarbonDataMergerUtil {
}
}
}
- } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+ } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
int numberUpdateDeltaFilesThreshold =
CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
for (String seg : Segments) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 187ba06..d115a7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -196,9 +196,17 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private void processResult(List<RawResultIterator> resultIteratorList) throws Exception {
for (RawResultIterator resultIterator : resultIteratorList) {
- while (resultIterator.hasNext()) {
- addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
- isRecordFound = true;
+ if (CompactionType.STREAMING == compactionType) {
+ while (resultIterator.hasNext()) {
+ // the input iterator of streaming segment is already using raw row
+ addRowForSorting(resultIterator.next());
+ isRecordFound = true;
+ }
+ } else {
+ while (resultIterator.hasNext()) {
+ addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
+ isRecordFound = true;
+ }
}
}
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 863257c..314afc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -23,10 +23,11 @@ package org.apache.carbondata.processing.merger;
* called IUD compaction.
*/
public enum CompactionType {
- MINOR_COMPACTION,
- MAJOR_COMPACTION,
- IUD_UPDDEL_DELTA_COMPACTION,
- IUD_DELETE_DELTA_COMPACTION,
- SEGMENT_INDEX_COMPACTION,
+ MINOR,
+ MAJOR,
+ IUD_UPDDEL_DELTA,
+ IUD_DELETE_DELTA,
+ SEGMENT_INDEX,
+ STREAMING,
NONE
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8e0585d/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
new file mode 100644
index 0000000..f268883
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.block.SegmentProperties
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
+import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * partition of the handoff segment
+ */
+class HandoffPartition(
+ val rddId: Int,
+ val idx: Int,
+ @transient val inputSplit: CarbonInputSplit
+) extends Partition {
+
+ val split = new SerializableWritable[CarbonInputSplit](inputSplit)
+
+ override val index: Int = idx
+
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * package the record reader of the handoff segment to RawResultIterator
+ */
+class StreamingRawResultIterator(
+ recordReader: CarbonStreamRecordReader
+) extends RawResultIterator(null, null, null) {
+
+ override def hasNext: Boolean = {
+ recordReader.nextKeyValue()
+ }
+
+ override def next(): Array[Object] = {
+ recordReader
+ .getCurrentValue
+ .asInstanceOf[GenericInternalRow]
+ .values
+ .asInstanceOf[Array[Object]]
+ }
+}
+
+/**
+ * execute streaming segment handoff
+ */
+class StreamHandoffRDD[K, V](
+ sc: SparkContext,
+ result: HandoffResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ handOffSegmentId: String
+) extends CarbonRDD[(K, V)](sc, Nil) {
+
+ private val jobTrackerId: String = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ formatter.format(new Date())
+ }
+
+ override def internalCompute(
+ split: Partition,
+ context: TaskContext
+ ): Iterator[(K, V)] = {
+ carbonLoadModel.setPartitionId("0")
+ carbonLoadModel.setTaskNo("" + split.index)
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ // the input iterator is using raw row
+ val iteratorList = prepareInputIterator(split, carbonTable)
+ // use CompactionResultSortProcessor to sort data and write to columnar files
+ val processor = prepareHandoffProcessor(carbonTable)
+ val status = processor.execute(iteratorList)
+
+ new Iterator[(K, V)] {
+ private var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey("" + split.index, status)
+ }
+ }
+ }
+
+ /**
+ * prepare input iterator based on CarbonStreamRecordReader
+ */
+ private def prepareInputIterator(
+ split: Partition,
+ carbonTable: CarbonTable
+ ): util.ArrayList[RawResultIterator] = {
+ val inputSplit = split.asInstanceOf[HandoffPartition].split.value
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val hadoopConf = new Configuration()
+ CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
+ CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
+ CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
+ val projection = new CarbonProjection
+ val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
+ (0 until dataFields.size()).foreach { index =>
+ projection.addColumn(dataFields.get(index).getColName)
+ }
+ CarbonTableInputFormat.setColumnProjection(hadoopConf, projection)
+ val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+ val format = new CarbonTableInputFormat[Array[Object]]()
+ val model = format.getQueryModel(inputSplit, attemptContext)
+ val inputFormat = new CarbonStreamInputFormat
+ val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+ .asInstanceOf[CarbonStreamRecordReader]
+ streamReader.setVectorReader(false)
+ streamReader.setQueryModel(model)
+ streamReader.setUseRawRow(true)
+ streamReader.initialize(inputSplit, attemptContext)
+ val iteratorList = new util.ArrayList[RawResultIterator](1)
+ iteratorList.add(new StreamingRawResultIterator(streamReader))
+ iteratorList
+ }
+
+ private def prepareHandoffProcessor(
+ carbonTable: CarbonTable
+ ): CompactionResultSortProcessor = {
+ val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
+ carbonTable.getDimensionByTableName(carbonTable.getTableName),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName))
+ val dimLensWithComplex =
+ (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
+ val dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
+ val segmentProperties =
+ new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)
+
+ new CompactionResultSortProcessor(
+ carbonLoadModel,
+ carbonTable,
+ segmentProperties,
+ CompactionType.STREAMING,
+ carbonTable.getTableName
+ )
+ }
+
+ /**
+ * get the partitions of the handoff segment
+ */
+ override protected def getPartitions: Array[Partition] = {
+ val job = Job.getInstance(FileFactory.getConfiguration)
+ val inputFormat = new CarbonTableInputFormat[Array[Object]]()
+ val segmentList = new util.ArrayList[String](1)
+ segmentList.add(handOffSegmentId)
+ val splits = inputFormat.getSplitsOfStreaming(
+ job,
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
+ segmentList
+ )
+
+ (0 until splits.size()).map { index =>
+ new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
+ }.toArray[Partition]
+ }
+}
+
+object StreamHandoffRDD {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * start new thread to execute stream segment handoff
+ */
+ def startStreamingHandoffThread(
+ carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ storeLocation: String
+ ): Unit = {
+ // start a new thread to execute streaming segment handoff
+ val handoffThread = new Thread() {
+ override def run(): Unit = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val identifier = carbonTable.getAbsoluteTableIdentifier
+ val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
+ var continueHandoff = false
+ // require handoff lock on table
+ val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
+ try {
+ if (lock.lockWithRetries()) {
+ LOGGER.info("Acquired the handoff lock for table" +
+ s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+ // handoff streaming segment one by one
+ do {
+ val segmentStatusManager = new SegmentStatusManager(identifier)
+ var loadMetadataDetails: Array[LoadMetadataDetails] = null
+ // lock table to read table status file
+ val statusLock = segmentStatusManager.getTableStatusLock
+ try {
+ if (statusLock.lockWithRetries()) {
+ loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+ tablePath.getMetadataDirectoryPath)
+ }
+ } finally {
+ if (null != statusLock) {
+ statusLock.unlock()
+ }
+ }
+ if (null != loadMetadataDetails) {
+ val streamSegments =
+ loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
+
+ continueHandoff = streamSegments.length > 0
+ if (continueHandoff) {
+ // handoff a streaming segment
+ val loadMetadataDetail = streamSegments(0)
+ executeStreamingHandoff(
+ carbonLoadModel,
+ sqlContext,
+ storeLocation,
+ loadMetadataDetail.getLoadName
+ )
+ }
+ } else {
+ continueHandoff = false
+ }
+ } while (continueHandoff)
+ }
+ } finally {
+ if (null != lock) {
+ lock.unlock()
+ }
+ }
+ }
+ }
+ handoffThread.start()
+ }
+
+ /**
+ * invoke StreamHandoffRDD to handoff a streaming segment to a columnar segment
+ */
+ def executeStreamingHandoff(
+ carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ storeLocation: String,
+ handoffSegmenId: String
+ ): Unit = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ var loadStatus = SegmentStatus.SUCCESS
+ var errorMessage: String = "Handoff failure"
+ try {
+ // generate new columnar segment
+ val newMetaEntry = new LoadMetadataDetails
+ carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+ CarbonLoaderUtil.populateNewLoadMetaEntry(
+ newMetaEntry,
+ SegmentStatus.INSERT_IN_PROGRESS,
+ carbonLoadModel.getFactTimeStamp,
+ false)
+ CarbonLoaderUtil.recordLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+ // convert a streaming segment to columnar segment
+ val status = new StreamHandoffRDD(
+ sqlContext.sparkContext,
+ new HandoffResultImpl(),
+ carbonLoadModel,
+ handoffSegmenId).collect()
+
+ status.foreach { x =>
+ if (!x._2) {
+ loadStatus = SegmentStatus.LOAD_FAILURE
+ }
+ }
+ } catch {
+ case ex =>
+ loadStatus = SegmentStatus.LOAD_FAILURE
+ errorMessage = errorMessage + ": " + ex.getCause.getMessage
+ LOGGER.error(errorMessage)
+ LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
+ }
+
+ if (loadStatus == SegmentStatus.LOAD_FAILURE) {
+ CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+ LOGGER.info("********starting clean up**********")
+ CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+ LOGGER.info("********clean up done**********")
+ LOGGER.audit(s"Handoff is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ LOGGER.warn("Cannot write load metadata file as handoff failed")
+ throw new Exception(errorMessage)
+ }
+
+ if (loadStatus == SegmentStatus.SUCCESS) {
+ val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
+ if (!done) {
+ val errorMessage = "Handoff failed due to failure in table status updation."
+ LOGGER.audit("Handoff is failed for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ LOGGER.error("Handoff failed due to failure in table status updation.")
+ throw new Exception(errorMessage)
+ }
+ done
+ }
+
+ }
+
+ /**
+ * update streaming segment and new columnar segment
+ */
+ private def updateLoadMetadata(
+ handoffSegmentId: String,
+ loadModel: CarbonLoadModel
+ ): Boolean = {
+ var status = false
+ val metaDataFilepath =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath()
+ val identifier =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier()
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
+ val metadataPath = carbonTablePath.getMetadataDirectoryPath()
+ val fileType = FileFactory.getFileType(metadataPath)
+ if (!FileFactory.isFileExist(metadataPath, fileType)) {
+ FileFactory.mkdirs(metadataPath, fileType)
+ }
+ val tableStatusPath = carbonTablePath.getTableStatusFilePath()
+ val segmentStatusManager = new SegmentStatusManager(identifier)
+ val carbonLock = segmentStatusManager.getTableStatusLock()
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info(
+ "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ + " for table status updation")
+ val listOfLoadFolderDetailsArray =
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath)
+
+ // update new columnar segment to success status
+ val newSegment =
+ listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
+ if (newSegment.isEmpty) {
+ throw new Exception("Failed to update table status for new segment")
+ } else {
+ newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
+ newSegment.get.setLoadEndTime(System.currentTimeMillis())
+ }
+
+ // update streaming segment to compacted status
+ val streamSegment =
+ listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
+ if (streamSegment.isEmpty) {
+ throw new Exception("Failed to update table status for streaming segment")
+ } else {
+ streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
+ }
+
+ // refresh table status file
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
+ status = true
+ } else {
+ LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
+ .getDatabaseName() + "." + loadModel.getTableName());
+ }
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" +
+ loadModel.getDatabaseName() + "." + loadModel.getTableName())
+ } else {
+ LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() +
+ "." + loadModel.getTableName() + " during table status updation")
+ }
+ }
+ return status
+ }
+}