You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/08 06:00:34 UTC

carbondata git commit: [CARBONDATA-2448] Adding compacted segments to load and alter events

Repository: carbondata
Updated Branches:
  refs/heads/master 1128d89ea -> 51db049c4


[CARBONDATA-2448] Adding compacted segments to load and alter events

Adding compacted segments to load and alter table events

This closes #2277


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/51db049c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/51db049c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/51db049c

Branch: refs/heads/master
Commit: 51db049c48519140d074c2fb88244378307c72fb
Parents: 1128d89
Author: dhatchayani <dh...@gmail.com>
Authored: Mon May 7 11:16:02 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue May 8 11:34:15 2018 +0530

----------------------------------------------------------------------
 .../indexstore/blockletindex/BlockletDataMap.java   |  2 +-
 .../apache/carbondata/events/AlterTableEvents.scala |  3 +--
 .../execution/command/carbonTableSchemaCommon.scala |  3 ++-
 .../carbondata/spark/rdd/CarbonDataRDDFactory.scala | 16 +++++++++++++++-
 .../carbondata/spark/rdd/CarbonTableCompactor.scala | 11 ++++++++---
 .../carbondata/spark/rdd/CompactionFactory.scala    |  2 ++
 .../CarbonAlterTableCompactionCommand.scala         |  8 +++++++-
 .../command/management/CarbonLoadDataCommand.scala  |  4 +++-
 .../processing/loading/model/CarbonLoadModel.java   | 16 ++++++++++++++++
 9 files changed, 55 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 3ff9cdc..6730ad5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -932,7 +932,7 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
     return dataMapRow;
   }
 
-  private byte[] getColumnSchemaBinary() {
+  public byte[] getColumnSchemaBinary() {
     DataMapRow unsafeRow = summaryDMStore.getDataMapRow(0);
     return unsafeRow.getByteArray(SCHEMA);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index dbd9ebf..55fa2bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -161,12 +161,11 @@ case class AlterTableCompactionPreEvent(sparkSession: SparkSession,
  * @param sparkSession
  * @param carbonTable
  * @param carbonMergerMapping
- * @param mergedLoadName
  */
 case class AlterTableCompactionPostEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
-    mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+    compactedLoads: java.util.List[String]) extends Event with AlterTableCompactionEventInfo
 /**
 * Compaction Event for handling pre update status file operations, listener has to implement this
 * event before updating the table status file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c55d726..8a12970 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -148,7 +148,8 @@ case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     loadsToMerge: util.List[LoadMetadataDetails],
     sqlContext: SQLContext,
     compactionType: CompactionType,
-    currentPartitions: Option[Seq[PartitionSpec]])
+    currentPartitions: Option[Seq[PartitionSpec]],
+    compactedSegments: java.util.List[String])
 
 case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
     segmentId: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6873289..42c9c25 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -88,6 +88,7 @@ object CarbonDataRDDFactory {
       storeLocation: String,
       compactionType: CompactionType,
       carbonTable: CarbonTable,
+      compactedSegments: java.util.List[String],
       compactionModel: CompactionModel,
       operationContext: OperationContext): Unit = {
     // taking system level lock at the mdt file location
@@ -111,6 +112,7 @@ object CarbonDataRDDFactory {
           storeLocation,
           compactionModel,
           lock,
+          compactedSegments,
           operationContext
         )
       } catch {
@@ -148,6 +150,7 @@ object CarbonDataRDDFactory {
       storeLocation: String,
       compactionModel: CompactionModel,
       compactionLock: ICarbonLock,
+      compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
@@ -165,6 +168,7 @@ object CarbonDataRDDFactory {
           executor,
           sqlContext,
           storeLocation,
+          compactedSegments,
           operationContext)
         try {
           // compaction status of the table which is triggered by the user.
@@ -220,6 +224,7 @@ object CarbonDataRDDFactory {
                   executor,
                   sqlContext,
                   storeLocation,
+                  compactedSegments,
                   operationContext).executeCompaction()
               } catch {
                 case e: Exception =>
@@ -567,7 +572,13 @@ object CarbonDataRDDFactory {
         if (carbonTable.isHivePartitionTable) {
           carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
         }
-        handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext)
+        val compactedSegments = new util.ArrayList[String]()
+        handleSegmentMerging(sqlContext,
+          carbonLoadModel,
+          carbonTable,
+          compactedSegments,
+          operationContext)
+        carbonLoadModel.setMergedSegmentIds(compactedSegments)
       } catch {
         case e: Exception =>
           throw new Exception(
@@ -727,6 +738,7 @@ object CarbonDataRDDFactory {
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       carbonTable: CarbonTable,
+      compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
     LOGGER.info(s"compaction need status is" +
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
@@ -765,6 +777,7 @@ object CarbonDataRDDFactory {
           storeLocation,
           CompactionType.MINOR,
           carbonTable,
+          compactedSegments,
           compactionModel,
           operationContext
         )
@@ -781,6 +794,7 @@ object CarbonDataRDDFactory {
               storeLocation,
               compactionModel,
               lock,
+              compactedSegments,
               operationContext
             )
           } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 26adeee..199b7a3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -45,6 +45,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     executor: ExecutorService,
     sqlContext: SQLContext,
     storeLocation: String,
+    compactedSegments: List[String],
     operationContext: OperationContext)
   extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
 
@@ -63,7 +64,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       deletePartialLoadsInCompaction()
 
       try {
-        scanSegmentsAndSubmitJob(loadsToMerge)
+        scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
       } catch {
         case e: Exception =>
           LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
@@ -101,7 +102,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
   /**
    * This will submit the loads to be merged into the executor.
    */
-  def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails]): Unit = {
+  def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails],
+      compactedSegments: List[String]): Unit = {
     loadsToMerge.asScala.foreach { seg =>
       LOGGER.info("loads identified for merge is " + seg.getLoadName)
     }
@@ -111,7 +113,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       loadsToMerge,
       sqlContext,
       compactionModel.compactionType,
-      compactionModel.currentPartitions)
+      compactionModel.currentPartitions,
+      compactedSegments)
     triggerCompaction(compactionCallableModel)
   }
 
@@ -125,6 +128,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val tablePath = carbonLoadModel.getTablePath
     val startTime = System.nanoTime()
     val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    val mergedLoads = compactionCallableModel.compactedSegments
+    mergedLoads.add(mergedLoadName)
     var finalMergeStatus = false
     val databaseName: String = carbonLoadModel.getDatabaseName
     val factTableName = carbonLoadModel.getTableName

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
index 8508d2a..c22e3f0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -35,6 +35,7 @@ object CompactionFactory {
       executor: ExecutorService,
       sqlContext: SQLContext,
       storeLocation: String,
+      mergedLoads: java.util.List[String],
       operationContext: OperationContext): Compactor = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
       new AggregateDataMapCompactor(
@@ -51,6 +52,7 @@ object CompactionFactory {
         executor,
         sqlContext,
         storeLocation,
+        mergedLoads,
         operationContext)
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 34ff1f8..6b43107 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.management
 
 import java.io.{File, IOException}
+import java.util
 
 import scala.collection.JavaConverters._
 
@@ -148,12 +149,14 @@ case class CarbonAlterTableCompactionCommand(
       val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
         AlterTableCompactionPreEvent(sparkSession, table, null, null)
       OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+      val compactedSegments: java.util.List[String] = new util.ArrayList[String]()
       try {
         alterTableForCompaction(
           sparkSession.sqlContext,
           alterTableModel,
           carbonLoadModel,
           storeLocation,
+          compactedSegments,
           operationContext)
       } catch {
         case e: Exception =>
@@ -167,7 +170,7 @@ case class CarbonAlterTableCompactionCommand(
       }
       // trigger event for compaction
       val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-        AlterTableCompactionPostEvent(sparkSession, table, null, null)
+        AlterTableCompactionPostEvent(sparkSession, table, null, compactedSegments)
       OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
       Seq.empty
     }
@@ -177,6 +180,7 @@ case class CarbonAlterTableCompactionCommand(
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
       storeLocation: String,
+      compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
@@ -257,6 +261,7 @@ case class CarbonAlterTableCompactionCommand(
         storeLocation,
         compactionType,
         carbonTable,
+        compactedSegments,
         compactionModel,
         operationContext
       )
@@ -276,6 +281,7 @@ case class CarbonAlterTableCompactionCommand(
             storeLocation,
             compactionModel,
             lock,
+            compactedSegments,
             operationContext
           )
         } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 29d91d6..3e306fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -756,13 +756,15 @@ case class CarbonLoadDataCommand(
     }
     try {
       carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
+      val compactedSegments = new util.ArrayList[String]()
       // Trigger auto compaction
       CarbonDataRDDFactory.handleSegmentMerging(
         sparkSession.sqlContext,
         carbonLoadModel,
         table,
+        compactedSegments,
         operationContext)
-
+      carbonLoadModel.setMergedSegmentIds(compactedSegments)
     } catch {
       case e: Exception =>
         throw new Exception(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51db049c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 2b820d8..2a846e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,10 +18,12 @@
 package org.apache.carbondata.processing.loading.model;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -207,6 +209,8 @@ public class CarbonLoadModel implements Serializable {
    */
   private String dataWritePath;
 
+  private List<String> mergedSegmentIds;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -842,4 +846,16 @@ public class CarbonLoadModel implements Serializable {
   public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
     this.carbonTransactionalTable = carbonTransactionalTable;
   }
+
+  public void setMergedSegmentIds(List<String> mergedSegmentIds) {
+    this.mergedSegmentIds = mergedSegmentIds;
+  }
+
+  public List<String> getMergedSegmentIds() {
+    if (null == mergedSegmentIds) {
+      mergedSegmentIds = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    }
+    return mergedSegmentIds;
+  }
+
 }