You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/20 15:44:39 UTC

[06/10] carbondata git commit: [CARBONDATA-2223] Adding Listener Support for Partition

[CARBONDATA-2223] Adding Listener Support for Partition

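Adding Listener Support for Partition. In summary, this change:
- adds the `spark.carbon.common.listener.register.classname` property; when set, the named class is instantiated when the Carbon session catalog initializes its listeners;
- fires `LoadTablePreStatusUpdateEvent` from `CarbonLoadDataCommand`, passes the `SparkSession` into `AlterTableCompactionPostStatusUpdateEvent`, and removes the unused `LoadTableMergePartitionEvent`;
- avoids reading the same merge-index file more than once when loading datamaps, counts a shared merge-index file's size only once when computing segment size, and no longer inserts an extra separator when resolving relative segment locations.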

This closes #2031


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f5cdd5ca
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f5cdd5ca
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f5cdd5ca

Branch: refs/heads/carbonfile
Commit: f5cdd5ca9dcf22984ed300fe1d2d36939755e947
Parents: 98b8550
Author: dhatchayani <dh...@gmail.com>
Authored: Mon Mar 5 15:17:13 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Mar 20 19:24:18 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  3 +++
 .../indexstore/BlockletDataMapIndexStore.java   | 14 +++++++++-----
 .../core/metadata/SegmentFileStore.java         |  2 +-
 .../apache/carbondata/core/util/CarbonUtil.java | 18 +++++++++++++++++-
 .../hadoop/api/CarbonOutputCommitter.java       |  4 ----
 .../carbondata/events/AlterTableEvents.scala    |  2 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  4 +++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 20 ++++++++++++++++++++
 .../management/CarbonLoadDataCommand.scala      |  7 ++++++-
 .../sql/test/Spark2TestQueryExecutor.scala      |  1 +
 .../spark/sql/hive/CarbonSessionState.scala     |  2 +-
 .../processing/loading/events/LoadEvents.java   | 11 -----------
 .../processing/util/CarbonLoaderUtil.java       |  6 +++---
 13 files changed, 65 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1b135dc..33a1884 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1297,6 +1297,9 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname";
 
+  public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
+      "spark.carbon.common.listener.register.classname";
+
   @CarbonProperty
   public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
       "carbon.lease.recovery.retry.count";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 53ef496..befa121 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -19,8 +19,10 @@ package org.apache.carbondata.core.indexstore;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -81,8 +83,9 @@ public class BlockletDataMapIndexStore
     if (dataMap == null) {
       try {
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+        Set<String> filesRead = new HashSet<>();
         Map<String, BlockMetaInfo> blockMetaInfoMap =
-            getBlockMetaInfoMap(identifier, indexFileStore);
+            getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
         dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
@@ -93,13 +96,14 @@ public class BlockletDataMapIndexStore
   }
 
   private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
-      SegmentIndexFileStore indexFileStore) throws IOException {
+      SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
     if (identifier.getMergeIndexFileName() != null) {
       CarbonFile indexMergeFile = FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
               .getMergeIndexFileName());
-      if (indexMergeFile.exists()) {
+      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
         indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+        filesRead.add(indexMergeFile.getPath());
       }
     }
     if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
@@ -151,10 +155,10 @@ public class BlockletDataMapIndexStore
       }
       if (missedIdentifiers.size() > 0) {
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-
+        Set<String> filesRead = new HashSet<>();
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           Map<String, BlockMetaInfo> blockMetaInfoMap =
-              getBlockMetaInfoMap(identifier, indexFileStore);
+              getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
           blockletDataMaps.add(
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 3fc8ad6..4adc977 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -370,7 +370,7 @@ public class SegmentFileStore {
       for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
         String location = entry.getKey();
         if (entry.getValue().isRelative) {
-          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+          location = tablePath + location;
         }
         if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
           for (String indexFile : entry.getValue().getFiles()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b961b60..06511f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2326,12 +2326,28 @@ public final class CarbonUtil {
       throws IOException {
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
+    List<String> listOfFilesRead = new ArrayList<>();
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
     if (fileStore.getLocationMap() != null) {
       fileStore.readIndexFiles();
+      Map<String, String> indexFiles = fileStore.getIndexFiles();
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
-        carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize();
+        // get the size of carbonindex file
+        String indexFile = entry.getKey();
+        String mergeIndexFile = indexFiles.get(indexFile);
+        if (null != mergeIndexFile) {
+          String mergeIndexPath = indexFile
+              .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
+              + mergeIndexFile;
+          if (!listOfFilesRead.contains(mergeIndexPath)) {
+            carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize();
+            listOfFilesRead.add(mergeIndexPath);
+          }
+        } else {
+          carbonIndexSize += FileFactory.getCarbonFile(indexFile).getSize();
+        }
+        // get the size of carbondata files
         for (String blockFile : entry.getValue()) {
           carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize();
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 7ea11bd..4634b06 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -160,13 +160,9 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       if (operationContext != null) {
         LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
             new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
-        LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
-            new LoadEvents.LoadTableMergePartitionEvent(readPath);
         try {
           OperationListenerBus.getInstance()
               .fireEvent(postStatusUpdateEvent, (OperationContext) operationContext);
-          OperationListenerBus.getInstance()
-              .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext);
         } catch (Exception e) {
           throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 671e132..538df4a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -185,7 +185,7 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
  * Compaction Event for handling post update status file operations, like committing child
  * datamaps in one transaction
  */
-case class AlterTableCompactionPostStatusUpdateEvent(
+case class AlterTableCompactionPostStatusUpdateEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
     carbonLoadModel: CarbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 231b748..a987127 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -246,7 +246,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           carbonLoadModel,
           compactionType,
           segmentFileName)
-      val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
+
+      val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
+        carbonTable,
         carbonMergerMapping,
         carbonLoadModel,
         mergedLoadName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 8c3ca0f..95bbd29 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -135,6 +136,17 @@ object CarbonEnv {
   }
 
   /**
+   * Method
+   * 1. To initialize Listeners to their respective events in the OperationListenerBus
+   * 2. To register common listeners
+   *
+   */
+  def init(sparkSession: SparkSession): Unit = {
+    initListeners
+    registerCommonListener(sparkSession)
+  }
+
+  /**
    * Method to initialize Listeners to their respective events in the OperationListenerBus.
    */
   def initListeners(): Unit = {
@@ -158,6 +170,14 @@ object CarbonEnv {
       .addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener)
   }
 
+  def registerCommonListener(sparkSession: SparkSession): Unit = {
+    val clsName = sparkSession.sparkContext.conf
+      .get(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME)
+    if (null != clsName && !clsName.isEmpty) {
+      CarbonReflectionUtils.createObject(clsName)
+    }
+  }
+
   /**
    * Return carbon table instance from cache or by looking up table in `sparkSession`
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index eb00ebf..18c268c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -68,7 +68,7 @@ import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -767,6 +767,11 @@ case class CarbonLoadDataCommand(
         carbonLoadModel,
         table,
         operationContext)
+
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(table.getCarbonTableIdentifier, carbonLoadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
     } catch {
       case e: Exception =>
         throw new Exception(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index b341d6a..d30e96d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -67,6 +67,7 @@ object Spark2TestQueryExecutor {
     .enableHiveSupport()
     .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
+    .config(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME, "")
     .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
   if (warehouse.startsWith("hdfs://")) {
     System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index ba2fe947..d381144 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -79,7 +79,7 @@ class CarbonSessionCatalog(
   }
 
   // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
+  CarbonEnv.init(sparkSession)
 
   /**
    * This method will invalidate carbonrelation from cache if carbon table is updated in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index a3fa292..50ebc34 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -182,15 +182,4 @@ public class LoadEvents {
     }
   }
 
-  public static class LoadTableMergePartitionEvent extends Event {
-    private String segmentPath;
-
-    public LoadTableMergePartitionEvent(String segmentPath) {
-      this.segmentPath = segmentPath;
-    }
-
-    public String getSegmentPath() {
-      return segmentPath;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 922a7ee..65827b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1103,14 +1103,14 @@ public final class CarbonLoaderUtil {
    * Merge index files with in the segment of partitioned table
    * @param segmentId
    * @param tablePath
-   * @param uniqueId
    * @return
    * @throws IOException
    */
-  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath,
-      String uniqueId) throws IOException {
+  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath)
+      throws IOException {
     CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
         new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
+    String uniqueId = "";
     if (segmentIndexFIleMergeStatus != null) {
       uniqueId = System.currentTimeMillis() + "";
       String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;