You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/08 06:00:34 UTC
carbondata git commit: [CARBONDATA-2448] Adding compacted segments to
load and alter events
Repository: carbondata
Updated Branches:
refs/heads/master 1128d89ea -> 51db049c4
[CARBONDATA-2448] Adding compacted segments to load and alter events
Adding compacted segments to load and alter table events
This closes #2277
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/51db049c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/51db049c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/51db049c
Branch: refs/heads/master
Commit: 51db049c48519140d074c2fb88244378307c72fb
Parents: 1128d89
Author: dhatchayani <dh...@gmail.com>
Authored: Mon May 7 11:16:02 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue May 8 11:34:15 2018 +0530
----------------------------------------------------------------------
.../indexstore/blockletindex/BlockletDataMap.java | 2 +-
.../apache/carbondata/events/AlterTableEvents.scala | 3 +--
.../execution/command/carbonTableSchemaCommon.scala | 3 ++-
.../carbondata/spark/rdd/CarbonDataRDDFactory.scala | 16 +++++++++++++++-
.../carbondata/spark/rdd/CarbonTableCompactor.scala | 11 ++++++++---
.../carbondata/spark/rdd/CompactionFactory.scala | 2 ++
.../CarbonAlterTableCompactionCommand.scala | 8 +++++++-
.../command/management/CarbonLoadDataCommand.scala | 4 +++-
.../processing/loading/model/CarbonLoadModel.java | 16 ++++++++++++++++
9 files changed, 55 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 3ff9cdc..6730ad5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -932,7 +932,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
return dataMapRow;
}
- private byte[] getColumnSchemaBinary() {
+ public byte[] getColumnSchemaBinary() {
DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
return unsafeRow.getByteArray(SCHEMA);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index dbd9ebf..55fa2bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -161,12 +161,11 @@ case class AlterTableCompactionPreEvent(sparkSession: SparkSession,
* @param sparkSession
* @param carbonTable
* @param carbonMergerMapping
- * @param mergedLoadName
*/
case class AlterTableCompactionPostEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
- mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+ compactedLoads: java.util.List[String]) extends Event with AlterTableCompactionEventInfo
/**
* Compaction Event for handling pre update status file operations; the listener has to implement this
* event before updating the table status file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c55d726..8a12970 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -148,7 +148,8 @@ case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
loadsToMerge: util.List[LoadMetadataDetails],
sqlContext: SQLContext,
compactionType: CompactionType,
- currentPartitions: Option[Seq[PartitionSpec]])
+ currentPartitions: Option[Seq[PartitionSpec]],
+ compactedSegments: java.util.List[String])
case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
segmentId: String,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6873289..42c9c25 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -88,6 +88,7 @@ object CarbonDataRDDFactory {
storeLocation: String,
compactionType: CompactionType,
carbonTable: CarbonTable,
+ compactedSegments: java.util.List[String],
compactionModel: CompactionModel,
operationContext: OperationContext): Unit = {
// taking system level lock at the mdt file location
@@ -111,6 +112,7 @@ object CarbonDataRDDFactory {
storeLocation,
compactionModel,
lock,
+ compactedSegments,
operationContext
)
} catch {
@@ -148,6 +150,7 @@ object CarbonDataRDDFactory {
storeLocation: String,
compactionModel: CompactionModel,
compactionLock: ICarbonLock,
+ compactedSegments: java.util.List[String],
operationContext: OperationContext): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
@@ -165,6 +168,7 @@ object CarbonDataRDDFactory {
executor,
sqlContext,
storeLocation,
+ compactedSegments,
operationContext)
try {
// compaction status of the table which is triggered by the user.
@@ -220,6 +224,7 @@ object CarbonDataRDDFactory {
executor,
sqlContext,
storeLocation,
+ compactedSegments,
operationContext).executeCompaction()
} catch {
case e: Exception =>
@@ -567,7 +572,13 @@ object CarbonDataRDDFactory {
if (carbonTable.isHivePartitionTable) {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
}
- handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext)
+ val compactedSegments = new util.ArrayList[String]()
+ handleSegmentMerging(sqlContext,
+ carbonLoadModel,
+ carbonTable,
+ compactedSegments,
+ operationContext)
+ carbonLoadModel.setMergedSegmentIds(compactedSegments)
} catch {
case e: Exception =>
throw new Exception(
@@ -727,6 +738,7 @@ object CarbonDataRDDFactory {
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
carbonTable: CarbonTable,
+ compactedSegments: java.util.List[String],
operationContext: OperationContext): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
@@ -765,6 +777,7 @@ object CarbonDataRDDFactory {
storeLocation,
CompactionType.MINOR,
carbonTable,
+ compactedSegments,
compactionModel,
operationContext
)
@@ -781,6 +794,7 @@ object CarbonDataRDDFactory {
storeLocation,
compactionModel,
lock,
+ compactedSegments,
operationContext
)
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 26adeee..199b7a3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -45,6 +45,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
executor: ExecutorService,
sqlContext: SQLContext,
storeLocation: String,
+ compactedSegments: List[String],
operationContext: OperationContext)
extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
@@ -63,7 +64,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
deletePartialLoadsInCompaction()
try {
- scanSegmentsAndSubmitJob(loadsToMerge)
+ scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
} catch {
case e: Exception =>
LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
@@ -101,7 +102,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
/**
* This will submit the loads to be merged into the executor.
*/
- def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails]): Unit = {
+ def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
+ compactedSegments: List[String]): Unit = {
loadsToMerge.asScala.foreach { seg =>
LOGGER.info("loads identified for merge is " + seg.getLoadName)
}
@@ -111,7 +113,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
loadsToMerge,
sqlContext,
compactionModel.compactionType,
- compactionModel.currentPartitions)
+ compactionModel.currentPartitions,
+ compactedSegments)
triggerCompaction(compactionCallableModel)
}
@@ -125,6 +128,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
val tablePath = carbonLoadModel.getTablePath
val startTime = System.nanoTime()
val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+ val mergedLoads = compactionCallableModel.compactedSegments
+ mergedLoads.add(mergedLoadName)
var finalMergeStatus = false
val databaseName: String = carbonLoadModel.getDatabaseName
val factTableName = carbonLoadModel.getTableName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
index 8508d2a..c22e3f0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -35,6 +35,7 @@ object CompactionFactory {
executor: ExecutorService,
sqlContext: SQLContext,
storeLocation: String,
+ mergedLoads: java.util.List[String],
operationContext: OperationContext): Compactor = {
if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
new AggregateDataMapCompactor(
@@ -51,6 +52,7 @@ object CompactionFactory {
executor,
sqlContext,
storeLocation,
+ mergedLoads,
operationContext)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 34ff1f8..6b43107 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command.management
import java.io.{File, IOException}
+import java.util
import scala.collection.JavaConverters._
@@ -148,12 +149,14 @@ case class CarbonAlterTableCompactionCommand(
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
AlterTableCompactionPreEvent(sparkSession, table, null, null)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+ val compactedSegments: java.util.List[String] = new util.ArrayList[String]()
try {
alterTableForCompaction(
sparkSession.sqlContext,
alterTableModel,
carbonLoadModel,
storeLocation,
+ compactedSegments,
operationContext)
} catch {
case e: Exception =>
@@ -167,7 +170,7 @@ case class CarbonAlterTableCompactionCommand(
}
// trigger event for compaction
val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(sparkSession, table, null, null)
+ AlterTableCompactionPostEvent(sparkSession, table, null, compactedSegments)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
Seq.empty
}
@@ -177,6 +180,7 @@ case class CarbonAlterTableCompactionCommand(
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
storeLocation: String,
+ compactedSegments: java.util.List[String],
operationContext: OperationContext): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
@@ -257,6 +261,7 @@ case class CarbonAlterTableCompactionCommand(
storeLocation,
compactionType,
carbonTable,
+ compactedSegments,
compactionModel,
operationContext
)
@@ -276,6 +281,7 @@ case class CarbonAlterTableCompactionCommand(
storeLocation,
compactionModel,
lock,
+ compactedSegments,
operationContext
)
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 29d91d6..3e306fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -756,13 +756,15 @@ case class CarbonLoadDataCommand(
}
try {
carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+ val compactedSegments = new util.ArrayList[String]()
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
sparkSession.sqlContext,
carbonLoadModel,
table,
+ compactedSegments,
operationContext)
-
+ carbonLoadModel.setMergedSegmentIds(compactedSegments)
} catch {
case e: Exception =>
throw new Exception(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 2b820d8..2a846e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,10 +18,12 @@
package org.apache.carbondata.processing.loading.model;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -207,6 +209,8 @@ public class CarbonLoadModel implements Serializable {
*/
private String dataWritePath;
+ private List<String> mergedSegmentIds;
+
public boolean isAggLoadRequest() {
return isAggLoadRequest;
}
@@ -842,4 +846,16 @@ public class CarbonLoadModel implements Serializable {
public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
this.carbonTransactionalTable = carbonTransactionalTable;
}
+
+ public void setMergedSegmentIds(List<String> mergedSegmentIds) {
+ this.mergedSegmentIds = mergedSegmentIds;
+ }
+
+ public List<String> getMergedSegmentIds() {
+ if (null == mergedSegmentIds) {
+ mergedSegmentIds = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+ return mergedSegmentIds;
+ }
+
}