You are viewing a plain-text version of this content. The canonical link for it was a hyperlink labeled "here", which is not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/03/20 13:51:45 UTC
carbondata git commit: [CARBONDATA-2223] Adding Listener Support for Partition
Repository: carbondata
Updated Branches:
refs/heads/master 98b855014 -> f5cdd5ca9
[CARBONDATA-2223] Adding Listener Support for Partition
Adding Listener Support for Partition
This closes #2031
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f5cdd5ca
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f5cdd5ca
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f5cdd5ca
Branch: refs/heads/master
Commit: f5cdd5ca9dcf22984ed300fe1d2d36939755e947
Parents: 98b8550
Author: dhatchayani <dh...@gmail.com>
Authored: Mon Mar 5 15:17:13 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Mar 20 19:24:18 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 3 +++
.../indexstore/BlockletDataMapIndexStore.java | 14 +++++++++-----
.../core/metadata/SegmentFileStore.java | 2 +-
.../apache/carbondata/core/util/CarbonUtil.java | 18 +++++++++++++++++-
.../hadoop/api/CarbonOutputCommitter.java | 4 ----
.../carbondata/events/AlterTableEvents.scala | 2 +-
.../spark/rdd/CarbonTableCompactor.scala | 4 +++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 20 ++++++++++++++++++++
.../management/CarbonLoadDataCommand.scala | 7 ++++++-
.../sql/test/Spark2TestQueryExecutor.scala | 1 +
.../spark/sql/hive/CarbonSessionState.scala | 2 +-
.../processing/loading/events/LoadEvents.java | 11 -----------
.../processing/util/CarbonLoaderUtil.java | 6 +++---
13 files changed, 65 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1b135dc..33a1884 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1297,6 +1297,9 @@ public final class CarbonCommonConstants {
public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname";
+ public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
+ "spark.carbon.common.listener.register.classname";
+
@CarbonProperty
public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
"carbon.lease.recovery.retry.count";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 53ef496..befa121 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -19,8 +19,10 @@ package org.apache.carbondata.core.indexstore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogService;
@@ -81,8 +83,9 @@ public class BlockletDataMapIndexStore
if (dataMap == null) {
try {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ Set<String> filesRead = new HashSet<>();
Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore);
+ getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
} catch (MemoryException e) {
LOGGER.error("memory exception when loading datamap: " + e.getMessage());
@@ -93,13 +96,14 @@ public class BlockletDataMapIndexStore
}
private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
- SegmentIndexFileStore indexFileStore) throws IOException {
+ SegmentIndexFileStore indexFileStore, Set<String> filesRead) throws IOException {
if (identifier.getMergeIndexFileName() != null) {
CarbonFile indexMergeFile = FileFactory.getCarbonFile(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getMergeIndexFileName());
- if (indexMergeFile.exists()) {
+ if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+ filesRead.add(indexMergeFile.getPath());
}
}
if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
@@ -151,10 +155,10 @@ public class BlockletDataMapIndexStore
}
if (missedIdentifiers.size() > 0) {
SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-
+ Set<String> filesRead = new HashSet<>();
for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
Map<String, BlockMetaInfo> blockMetaInfoMap =
- getBlockMetaInfoMap(identifier, indexFileStore);
+ getBlockMetaInfoMap(identifier, indexFileStore, filesRead);
blockletDataMaps.add(
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 3fc8ad6..4adc977 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -370,7 +370,7 @@ public class SegmentFileStore {
for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
String location = entry.getKey();
if (entry.getValue().isRelative) {
- location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+ location = tablePath + location;
}
if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
for (String indexFile : entry.getValue().getFiles()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b961b60..06511f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2326,12 +2326,28 @@ public final class CarbonUtil {
throws IOException {
long carbonDataSize = 0L;
long carbonIndexSize = 0L;
+ List<String> listOfFilesRead = new ArrayList<>();
HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
if (fileStore.getLocationMap() != null) {
fileStore.readIndexFiles();
+ Map<String, String> indexFiles = fileStore.getIndexFiles();
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
- carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize();
+ // get the size of carbonindex file
+ String indexFile = entry.getKey();
+ String mergeIndexFile = indexFiles.get(indexFile);
+ if (null != mergeIndexFile) {
+ String mergeIndexPath = indexFile
+ .substring(0, indexFile.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR) + 1)
+ + mergeIndexFile;
+ if (!listOfFilesRead.contains(mergeIndexPath)) {
+ carbonIndexSize += FileFactory.getCarbonFile(mergeIndexPath).getSize();
+ listOfFilesRead.add(mergeIndexPath);
+ }
+ } else {
+ carbonIndexSize += FileFactory.getCarbonFile(indexFile).getSize();
+ }
+ // get the size of carbondata files
for (String blockFile : entry.getValue()) {
carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 7ea11bd..4634b06 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -160,13 +160,9 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
if (operationContext != null) {
LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
- LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
- new LoadEvents.LoadTableMergePartitionEvent(readPath);
try {
OperationListenerBus.getInstance()
.fireEvent(postStatusUpdateEvent, (OperationContext) operationContext);
- OperationListenerBus.getInstance()
- .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext);
} catch (Exception e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 671e132..538df4a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -185,7 +185,7 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
* Compaction Event for handling post update status file operations, like committing child
* datamaps in one transaction
*/
-case class AlterTableCompactionPostStatusUpdateEvent(
+case class AlterTableCompactionPostStatusUpdateEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
carbonLoadModel: CarbonLoadModel,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 231b748..a987127 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -246,7 +246,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
carbonLoadModel,
compactionType,
segmentFileName)
- val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
+
+ val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
+ carbonTable,
carbonMergerMapping,
carbonLoadModel,
mergedLoadName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 8c3ca0f..95bbd29 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -135,6 +136,17 @@ object CarbonEnv {
}
/**
+ * Method
+ * 1. To initialize Listeners to their respective events in the OperationListenerBus
+ * 2. To register common listeners
+ *
+ */
+ def init(sparkSession: SparkSession): Unit = {
+ initListeners
+ registerCommonListener(sparkSession)
+ }
+
+ /**
* Method to initialize Listeners to their respective events in the OperationListenerBus.
*/
def initListeners(): Unit = {
@@ -158,6 +170,14 @@ object CarbonEnv {
.addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener)
}
+ def registerCommonListener(sparkSession: SparkSession): Unit = {
+ val clsName = sparkSession.sparkContext.conf
+ .get(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME)
+ if (null != clsName && !clsName.isEmpty) {
+ CarbonReflectionUtils.createObject(clsName)
+ }
+ }
+
/**
* Return carbon table instance from cache or by looking up table in `sparkSession`
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index eb00ebf..18c268c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -68,7 +68,7 @@ import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -767,6 +767,11 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
table,
operationContext)
+
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ new LoadTablePreStatusUpdateEvent(table.getCarbonTableIdentifier, carbonLoadModel)
+ OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
} catch {
case e: Exception =>
throw new Exception(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index b341d6a..d30e96d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -67,6 +67,7 @@ object Spark2TestQueryExecutor {
.enableHiveSupport()
.config("spark.sql.warehouse.dir", warehouse)
.config("spark.sql.crossJoin.enabled", "true")
+ .config(CarbonCommonConstants.CARBON_COMMON_LISTENER_REGISTER_CLASSNAME, "")
.getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
if (warehouse.startsWith("hdfs://")) {
System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index ba2fe947..d381144 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -79,7 +79,7 @@ class CarbonSessionCatalog(
}
// Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
+ CarbonEnv.init(sparkSession)
/**
* This method will invalidate carbonrelation from cache if carbon table is updated in
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index a3fa292..50ebc34 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -182,15 +182,4 @@ public class LoadEvents {
}
}
- public static class LoadTableMergePartitionEvent extends Event {
- private String segmentPath;
-
- public LoadTableMergePartitionEvent(String segmentPath) {
- this.segmentPath = segmentPath;
- }
-
- public String getSegmentPath() {
- return segmentPath;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f5cdd5ca/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 922a7ee..65827b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1103,14 +1103,14 @@ public final class CarbonLoaderUtil {
* Merge index files with in the segment of partitioned table
* @param segmentId
* @param tablePath
- * @param uniqueId
* @return
* @throws IOException
*/
- public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath,
- String uniqueId) throws IOException {
+ public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath)
+ throws IOException {
CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
+ String uniqueId = "";
if (segmentIndexFIleMergeStatus != null) {
uniqueId = System.currentTimeMillis() + "";
String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;