You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/10/30 08:45:36 UTC
carbondata git commit: [CARBONDATA-3042] Column Schema objects are
present in Driver and Executor even after dropping table
Repository: carbondata
Updated Branches:
refs/heads/master e2c517e3f -> 10b393808
[CARBONDATA-3042] Column Schema objects are present in Driver and Executor even after dropping table
Problem:
Column Schema objects are present in Driver and Executor even after dropping table.
Solution:
In Driver: After dropping the table, remove its tableInfo entry from the CarbonMetadata singleton (CarbonMetadata.getInstance()).
In Executor: Remove usage of the CarbonMetadata singleton and pass the CarbonTable object itself instead.
This closes #2852
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10b39380
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10b39380
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10b39380
Branch: refs/heads/master
Commit: 10b393808e91344b017ba3e946b28217c2dd9757
Parents: e2c517e
Author: Indhumathi27 <in...@gmail.com>
Authored: Thu Oct 25 13:37:08 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Oct 30 14:19:39 2018 +0530
----------------------------------------------------------------------
.../core/metadata/CarbonMetadata.java | 5 +-
.../statusmanager/SegmentStatusManager.java | 6 +-
.../carbondata/core/util/DeleteLoadFolders.java | 31 +++++------
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +-
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 1 -
.../spark/sql/CarbonDictionaryDecoder.scala | 5 --
.../spark/sql/hive/CarbonFileMetastore.scala | 3 +-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 2 +-
.../loading/DataLoadProcessBuilder.java | 2 -
.../sort/impl/ParallelReadMergeSorterImpl.java | 9 ++-
...allelReadMergeSorterWithColumnRangeImpl.java | 8 +--
.../UnsafeBatchParallelReadMergeSorterImpl.java | 6 +-
...allelReadMergeSorterWithColumnRangeImpl.java | 11 ++--
.../CarbonRowDataWriterProcessorStepImpl.java | 13 ++---
.../steps/DataConverterProcessorStepImpl.java | 6 +-
.../steps/DataWriterBatchProcessorStepImpl.java | 11 ++--
.../steps/DataWriterProcessorStepImpl.java | 18 +++---
.../merger/CompactionResultSortProcessor.java | 4 +-
.../merger/RowResultMergerProcessor.java | 5 +-
.../partition/spliter/RowResultProcessor.java | 5 +-
.../sort/sortdata/SortParameters.java | 44 +++++++++------
.../store/CarbonFactDataHandlerModel.java | 4 +-
.../util/CarbonDataProcessorUtil.java | 58 +++++---------------
24 files changed, 110 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 850f477..e44092e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -69,9 +69,8 @@ public final class CarbonMetadata {
/**
* Below method will be used to set the carbon table
- * This method will be used in executor side as driver will always have
- * updated table so from driver during query execution and data loading
- * we just need to add the table
+ * Note: Use this method only in driver as clean up in Executor is not handled
+ * if this table is added to executor
*
* @param carbonTable
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 9196367..fbb765b 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -1001,9 +1001,9 @@ public class SegmentStatusManager {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
}
if (updationCompletionStatus) {
- DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
- identifier, carbonTable.getMetadataPath(),
- newAddedLoadHistoryList, isForceDeletion, partitionSpecs);
+ DeleteLoadFolders
+ .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
+ isForceDeletion, partitionSpecs);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index f1cc57f..b614f55 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -66,22 +66,19 @@ public final class DeleteLoadFolders {
return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
}
- public static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier,
- String metadataPath,
+ public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
LoadMetadataDetails[] newAddedLoadHistoryList,
boolean isForceDelete,
List<PartitionSpec> specs) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- physicalFactAndMeasureMetadataDeletion(
- absoluteTableIdentifier,
+ LoadMetadataDetails[] currentDetails =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+ physicalFactAndMeasureMetadataDeletion(carbonTable,
currentDetails,
isForceDelete,
specs,
currentDetails);
if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
- physicalFactAndMeasureMetadataDeletion(
- absoluteTableIdentifier,
+ physicalFactAndMeasureMetadataDeletion(carbonTable,
newAddedLoadHistoryList,
isForceDelete,
specs,
@@ -91,17 +88,15 @@ public final class DeleteLoadFolders {
/**
* Delete the invalid data physically from table.
- * @param absoluteTableIdentifier table identifier
+ * @param carbonTable table
* @param loadDetails Load details which need clean up
* @param isForceDelete is Force delete requested by user
* @param specs Partition specs
* @param currLoadDetails Current table status load details which are required for update manager.
*/
- private static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails,
- boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) {
- CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable(
- absoluteTableIdentifier);
+ private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+ LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+ LoadMetadataDetails[] currLoadDetails) {
List<TableDataMap> indexDataMaps = new ArrayList<>();
try {
for (TableDataMap dataMap : DataMapStoreManager.getInstance().getAllDataMap(carbonTable)) {
@@ -112,7 +107,8 @@ public final class DeleteLoadFolders {
} catch (IOException e) {
LOGGER.warn(String.format(
"Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.",
- absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName()));
+ carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+ carbonTable.getAbsoluteTableIdentifier().getTableName()));
}
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
@@ -120,12 +116,11 @@ public final class DeleteLoadFolders {
if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
try {
if (oneLoad.getSegmentFile() != null) {
- SegmentFileStore.deleteSegment(
- absoluteTableIdentifier.getTablePath(),
+ SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()),
specs, updateStatusManager);
} else {
- String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
+ String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
boolean status = false;
if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index a03447d..4322359 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -62,13 +62,11 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
val partitionId: Int = partitionInfo.getPartitionId(split.index)
carbonLoadModel.setTaskNo(String.valueOf(partitionId))
carbonLoadModel.setSegmentId(segmentId)
- CarbonMetadata.getInstance().addCarbonTable(
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
CommonUtil.setTempStoreLocation(split.index, carbonLoadModel,
isCompactionFlow = false, isAltPartitionFlow = true)
val tempStoreLoc: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
+ carbonTable, carbonLoadModel.getTaskNo, segmentId, false, true)
val loadStatus: Boolean = if (rows.isEmpty) {
LOGGER.info("After repartition this split, NO target rows to write back.")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 07f6964..1fbcc51 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -86,7 +86,6 @@ class CarbonMergerRDD[K, V](
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- CarbonMetadata.getInstance().addCarbonTable(carbonTable)
val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
if (carbonTable.isPartitionTable) {
carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
@@ -198,7 +197,7 @@ class CarbonMergerRDD[K, V](
}
val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false)
+ carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
if (restructuredBlockExists) {
LOGGER.info("CompactionResultSortProcessor flow is selected")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 3f0eb71..13172c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -117,7 +117,6 @@ class StreamHandoffRDD[K, V](
carbonLoadModel.setTaskNo("" + split.index)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
- CarbonMetadata.getInstance().addCarbonTable(carbonTable)
// the input iterator is using raw row
val iteratorList = prepareInputIterator(split, carbonTable)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ff7ac60..f3d5bf0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -279,11 +279,6 @@ case class CarbonDictionaryDecoder(
if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
!carbonDimension
.getColumnSchema.getParentColumnTableRelations.isEmpty) {
- val parentRelationIdentifier = carbonDimension.getColumnSchema
- .getParentColumnTableRelations.get(0).getRelationIdentifier
- val parentTablePath = CarbonMetadata.getInstance()
- .getCarbonTable(parentRelationIdentifier.getDatabaseName,
- parentRelationIdentifier.getTableName).getTablePath
(QueryUtil
.getTableIdentifierForColumn(carbonDimension),
new ColumnIdentifier(carbonDimension.getColumnSchema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 982bbee..96b31c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -491,7 +491,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
// in the other beeline need to update.
checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
- removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
updateSchemasUpdatedTime(touchSchemaFileSystemTime())
// discard cached table info in cachedDataSourceTables
@@ -499,6 +498,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+ removeTableFromMetadata(dbName, tableName)
} else {
if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
removeTableFromMetadata(dbName, tableName)
@@ -508,6 +508,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+ removeTableFromMetadata(dbName, tableName)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 5ebe242..c8c7d31 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -79,13 +79,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
- removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
val tableIdentifier = TableIdentifier(tableName, Option(dbName))
sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+ removeTableFromMetadata(dbName, tableName)
}
override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f89a4e7..bcca915 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -249,7 +248,6 @@ public final class DataLoadProcessBuilder {
configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
loadModel.getBadRecordsLocation());
- CarbonMetadata.getInstance().addCarbonTable(carbonTable);
List<CarbonDimension> dimensions =
carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<CarbonMeasure> measures =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index f0920ee..55b336e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -71,11 +71,10 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
public void initialize(SortParameters sortParameters) {
this.sortParameters = sortParameters;
intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
- String[] storeLocations =
- CarbonDataProcessorUtil.getLocalDataFolderLocation(
- sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
- false, false);
+ String[] storeLocations = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), false,
+ false);
// Set the data file location
String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
index 3b767aa..8b86c0c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -148,7 +148,7 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
private SingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
String.valueOf(sortParameters.getTaskNo()),
sortParameters.getSegmentId() + "", false, false);
// Set the data file location
@@ -188,9 +188,9 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
private void setTempLocation(SortParameters parameters) {
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(),
- parameters.getTableName(), parameters.getTaskNo(),
- parameters.getSegmentId(), false, false);
+ .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+ parameters.getSegmentId(),
+ false, false);
String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
parameters.setTempFileLocation(tmpLocs);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 9cb67df..aa960b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -232,9 +232,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
}
private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
- parameters.getSegmentId(), false, false);
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+ parameters.getSegmentId(), false, false);
String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
parameters.setTempFileLocation(tempDirs);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index a8ec05c..f9631a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -143,9 +143,9 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()),
- sortParameters.getSegmentId() + "", false, false);
+ .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId() + "", false,
+ false);
// Set the data file location
String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -182,9 +182,8 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
private void setTempLocation(SortParameters parameters) {
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
- parameters.getTaskNo(), parameters.getSegmentId(),
- false, false);
+ .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+ parameters.getSegmentId(), false, false);
String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
LOGGER.warn("set temp location: " + StringUtils.join(tmpLoc, ", "));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ae42df7..ce79f24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -100,13 +100,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
child.initialize();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
- String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(),
- String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(),
- false,
- false);
+ private String[] getStoreLocation() {
+ String[] storeLocation = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(this.configuration.getTableSpec().getCarbonTable(),
+ String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
CarbonDataProcessorUtil.createLocations(storeLocation);
return storeLocation;
}
@@ -161,7 +158,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
}
private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
- String[] storeLocation = getStoreLocation(tableIdentifier);
+ String[] storeLocation = getStoreLocation();
DataMapWriterListener listener = getDataMapWriterListener(0);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
configuration, storeLocation, 0, iteratorIndex, listener);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index ae9ec3d..0195877 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -136,15 +136,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
Arrays.sort(convertedSortColumnRanges,
new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
- .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
- configuration.getTableIdentifier().getTableName())));
+ .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
// range partitioner to dispatch rows by sort columns
this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
- .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
- configuration.getTableIdentifier().getTableName())));
+ .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
}
// only convert sort column fields
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 694b345..7cb102b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -67,11 +67,10 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
child.initialize();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
- return CarbonDataProcessorUtil.getLocalDataFolderLocation(
- tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
- String.valueOf(configuration.getTaskNo()),
- configuration.getSegmentId(), false, false);
+ private String[] getStoreLocation() {
+ return CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(),
+ String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
}
@Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -84,7 +83,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
.recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
System.currentTimeMillis());
int i = 0;
- String[] storeLocation = getStoreLocation(tableIdentifier);
+ String[] storeLocation = getStoreLocation();
CarbonDataProcessorUtil.createLocations(storeLocation);
for (Iterator<CarbonRowBatch> iterator : iterators) {
int k = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 3d704c9..1595e1b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -89,19 +89,16 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
this.carbonFactHandlers = new CopyOnWriteArrayList<>();
}
- private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+ private String[] getStoreLocation() {
String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
- tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
- configuration.getSegmentId(), false, false);
+ .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(),
+ String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
CarbonDataProcessorUtil.createLocations(storeLocation);
return storeLocation;
}
public CarbonFactDataHandlerModel getDataHandlerModel() {
- CarbonTableIdentifier tableIdentifier =
- configuration.getTableIdentifier().getCarbonTableIdentifier();
- String[] storeLocation = getStoreLocation(tableIdentifier);
+ String[] storeLocation = getStoreLocation();
listener = getDataMapWriterListener(0);
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
@@ -170,14 +167,13 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
@Override public Void call() throws Exception {
LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
+ ", range: " + rangeId);
- processRange(insideRangeIterator, tableIdentifier, rangeId);
+ processRange(insideRangeIterator, rangeId);
return null;
}
}
- private void processRange(Iterator<CarbonRowBatch> insideRangeIterator,
- CarbonTableIdentifier tableIdentifier, int rangeId) {
- String[] storeLocation = getStoreLocation(tableIdentifier);
+ private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) {
+ String[] storeLocation = getStoreLocation();
listener = getDataMapWriterListener(rangeId);
CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 8d28d45..7c7b8ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -463,7 +463,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private void initializeFinalThreadMergerForMergeSort() {
boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
- .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName());
+ .getNoDictSortColMapping(carbonTable);
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -482,7 +482,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
+ carbonLoadModel.getFactTimeStamp() + ".tmp";
} else {
carbonStoreLocation = CarbonDataProcessorUtil
- .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName,
+ .createCarbonStoreLocation(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
carbonLoadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index dcb7cb4..6475ba8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -81,8 +81,9 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
.getFactTimeStamp() + ".tmp";
} else {
- carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
- loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+ carbonStoreLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
+ loadModel.getSegmentId());
}
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index ac70f27..977b9d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -50,8 +50,9 @@ public class RowResultProcessor {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
- String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
- loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+ String carbonStoreLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
+ loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
segProp, tableName, tempStoreLocation, carbonStoreLocation);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index ecce232..7908f4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -139,6 +139,11 @@ public class SortParameters implements Serializable {
private int batchSortSizeinMb;
private int rangeId = 0;
+ /**
+ * CarbonTable Info
+ */
+ private CarbonTable carbonTable;
+
public SortParameters getCopy() {
SortParameters parameters = new SortParameters();
parameters.tempFileLocation = tempFileLocation;
@@ -172,6 +177,7 @@ public class SortParameters implements Serializable {
parameters.numberOfCores = numberOfCores;
parameters.batchSortSizeinMb = batchSortSizeinMb;
parameters.rangeId = rangeId;
+ parameters.carbonTable = carbonTable;
return parameters;
}
@@ -375,11 +381,21 @@ public class SortParameters implements Serializable {
this.batchSortSizeinMb = batchSortSizeinMb;
}
+ public void setCarbonTable(CarbonTable carbonTable) {
+ this.carbonTable = carbonTable;
+ }
+
+ public CarbonTable getCarbonTable() {
+ return carbonTable;
+ }
+
+
public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
SortParameters parameters = new SortParameters();
CarbonTableIdentifier tableIdentifier =
configuration.getTableIdentifier().getCarbonTableIdentifier();
CarbonProperties carbonProperties = CarbonProperties.getInstance();
+ parameters.setCarbonTable(configuration.getTableSpec().getCarbonTable());
parameters.setDatabaseName(tableIdentifier.getDatabaseName());
parameters.setTableName(tableIdentifier.getTableName());
parameters.setPartitionID("0");
@@ -401,8 +417,7 @@ public class SortParameters implements Serializable {
parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
- .getNoDictSortColMapping(configuration.getTableIdentifier().getDatabaseName(),
- configuration.getTableIdentifier().getTableName()));
+ .getNoDictSortColMapping(parameters.getCarbonTable()));
parameters.setSortColumn(configuration.getSortColumnMapping());
parameters.setObserver(new SortObserver());
// get sort buffer size
@@ -418,10 +433,9 @@ public class SortParameters implements Serializable {
LOGGER.info("Number of intermediate file to be merged: " + parameters
.getNumberOfIntermediateFileToBeMerged());
-
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
- tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
- configuration.getTaskNo(), configuration.getSegmentId(), false, false);
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+ .getLocalDataFolderLocation(parameters.getCarbonTable(),
+ configuration.getTaskNo(), configuration.getSegmentId(), false, false);
String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -453,11 +467,9 @@ public class SortParameters implements Serializable {
DataType[] measureDataType = configuration.getMeasureDataType();
parameters.setMeasureDataType(measureDataType);
parameters.setNoDictDataType(CarbonDataProcessorUtil
- .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
- configuration.getTableIdentifier().getTableName()));
+ .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
- .getNoDictSortAndNoSortDataTypes(configuration.getTableIdentifier().getDatabaseName(),
- configuration.getTableIdentifier().getTableName());
+ .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
return parameters;
@@ -477,6 +489,7 @@ public class SortParameters implements Serializable {
boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
SortParameters parameters = new SortParameters();
CarbonProperties carbonProperties = CarbonProperties.getInstance();
+ parameters.setCarbonTable(carbonTable);
parameters.setDatabaseName(databaseName);
parameters.setTableName(tableName);
parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID);
@@ -506,7 +519,7 @@ public class SortParameters implements Serializable {
.getNumberOfIntermediateFileToBeMerged());
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId,
+ .getLocalDataFolderLocation(carbonTable, taskNo, segmentId,
isCompactionFlow, false);
String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -532,17 +545,16 @@ public class SortParameters implements Serializable {
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
DataType[] type = CarbonDataProcessorUtil
- .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
- parameters.getTableName());
+ .getMeasureDataType(parameters.getMeasureColCount(), parameters.getCarbonTable());
parameters.setMeasureDataType(type);
parameters.setNoDictDataType(CarbonDataProcessorUtil
- .getNoDictDataTypes(parameters.getDatabaseName(), parameters.getTableName()));
+ .getNoDictDataTypes(carbonTable));
Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
- .getNoDictSortAndNoSortDataTypes(parameters.getDatabaseName(), parameters.getTableName());
+ .getNoDictSortAndNoSortDataTypes(parameters.getCarbonTable());
parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
- .getNoDictSortColMapping(parameters.getDatabaseName(), parameters.getTableName()));
+ .getNoDictSortColMapping(parameters.getCarbonTable()));
return parameters;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d086b6d..878ce6b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -205,8 +204,7 @@ public class CarbonFactDataHandlerModel {
}
}
}
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- identifier.getDatabaseName(), identifier.getTableName());
+ CarbonTable carbonTable = configuration.getTableSpec().getCarbonTable();
List<ColumnSchema> wrapperColumnSchema = CarbonUtil
.getColumnSchemaList(carbonTable.getDimensionByTableName(identifier.getTableName()),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 6f36ef8..044e4e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -138,25 +137,6 @@ public final class CarbonDataProcessorUtil {
}
/**
- * This method will form the local data folder store location
- *
- * @param databaseName
- * @param tableName
- * @param taskId
- * @param segmentId
- * @param isCompactionFlow
- * @param isAltPartitionFlow
- * @return
- */
- public static String[] getLocalDataFolderLocation(String databaseName, String tableName,
- String taskId, String segmentId, boolean isCompactionFlow,
- boolean isAltPartitionFlow) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- return getLocalDataFolderLocation(carbonTable, taskId,
- segmentId, isCompactionFlow, isAltPartitionFlow);
- }
-
- /**
* This method will form the key for getting the temporary location set in carbon properties
*
* @param databaseName
@@ -413,14 +393,12 @@ public final class CarbonDataProcessorUtil {
return columnNames;
}
- public static DataType[] getMeasureDataType(int measureCount, String databaseName,
- String tableName) {
+ public static DataType[] getMeasureDataType(int measureCount, CarbonTable carbonTable) {
DataType[] type = new DataType[measureCount];
for (int i = 0; i < type.length; i++) {
type[i] = DataTypes.DOUBLE;
}
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
+ List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName());
for (int i = 0; i < type.length; i++) {
type[i] = measures.get(i).getDataType();
}
@@ -430,13 +408,12 @@ public final class CarbonDataProcessorUtil {
/**
* Get the no dictionary data types on the table
*
- * @param databaseName
- * @param tableName
+ * @param carbonTable
* @return
*/
- public static DataType[] getNoDictDataTypes(String databaseName, String tableName) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+ public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) {
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<DataType> type = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
if (dimensions.get(i).isSortColumn() && !dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
@@ -449,13 +426,12 @@ public final class CarbonDataProcessorUtil {
/**
* Get the no dictionary sort column mapping of the table
*
- * @param databaseName
- * @param tableName
+ * @param carbonTable
* @return
*/
- public static boolean[] getNoDictSortColMapping(String databaseName, String tableName) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+ public static boolean[] getNoDictSortColMapping(CarbonTable carbonTable) {
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<Boolean> noDicSortColMap = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
if (dimensions.get(i).isSortColumn()) {
@@ -477,14 +453,12 @@ public final class CarbonDataProcessorUtil {
/**
* Get the data types of the no dictionary sort columns
*
- * @param databaseName
- * @param tableName
+ * @param carbonTable
* @return
*/
- public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(String databaseName,
- String tableName) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
- List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+ public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(CarbonTable carbonTable) {
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
List<DataType> noDictSortType = new ArrayList<>();
List<DataType> noDictNoSortType = new ArrayList<>();
for (int i = 0; i < dimensions.size(); i++) {
@@ -509,9 +483,7 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String createCarbonStoreLocation(String databaseName, String tableName,
- String segmentId) {
- CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+ public static String createCarbonStoreLocation(CarbonTable carbonTable, String segmentId) {
return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
}