You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:47 UTC
[27/49] carbondata git commit: [REBASE] Solve conflict after rebasing master
[REBASE] Solve conflict after rebasing master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/806b9841
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/806b9841
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/806b9841
Branch: refs/heads/carbonstore-rebase4
Commit: 806b98410b6a149be513fa136878e031e6b45920
Parents: 2d77936
Author: Jacky Li <ja...@qq.com>
Authored: Fri Feb 9 01:39:20 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 00:30:22 2018 +0800
----------------------------------------------------------------------
.../scan/filter/FilterExpressionProcessor.java | 2 +-
.../filter/executer/FalseFilterExecutor.java | 16 +++++++-------
.../FalseConditionalResolverImpl.java | 4 ++--
.../apache/carbondata/core/util/CarbonUtil.java | 1 -
.../core/util/path/CarbonTablePath.java | 6 +++---
.../spark/rdd/AggregateDataMapCompactor.scala | 12 +++++------
.../preaaggregate/PreAggregateListeners.scala | 22 +++++++++-----------
.../CarbonAlterTableDataTypeChangeCommand.scala | 17 ++++++++-------
.../schema/CarbonAlterTableRenameCommand.scala | 3 ++-
.../apache/spark/sql/hive/CarbonMetaStore.scala | 12 ++++++-----
.../processing/util/CarbonLoaderUtil.java | 5 +++--
11 files changed, 51 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index b882b51..26b202f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -398,7 +398,7 @@ public class FilterExpressionProcessor implements FilterProcessor {
ConditionalExpression condExpression = null;
switch (filterExpressionType) {
case FALSE:
- return new FalseConditionalResolverImpl(expression, false, false, tableIdentifier);
+ return new FalseConditionalResolverImpl(expression, false, false);
case TRUE:
return new TrueConditionalResolverImpl(expression, false, false);
case EQUALS:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
index 2d2a15c..75a6ec3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
@@ -21,7 +21,7 @@ import java.util.BitSet;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
import org.apache.carbondata.core.util.BitSetGroup;
/**
@@ -33,8 +33,8 @@ import org.apache.carbondata.core.util.BitSetGroup;
public class FalseFilterExecutor implements FilterExecuter {
@Override
- public BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeline)
- throws FilterUnsupportedException, IOException {
+ public BitSetGroup applyFilter(RawBlockletColumnChunks blocksChunkHolder,
+ boolean useBitsetPipeline) throws FilterUnsupportedException, IOException {
int numberOfPages = blocksChunkHolder.getDataBlock().numberOfPages();
BitSetGroup group = new BitSetGroup(numberOfPages);
for (int i = 0; i < numberOfPages; i++) {
@@ -44,17 +44,19 @@ public class FalseFilterExecutor implements FilterExecuter {
return group;
}
- @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+ @Override
+ public boolean applyFilter(RowIntf value, int dimOrdinalMax)
throws FilterUnsupportedException, IOException {
return false;
}
- @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
-
+ @Override
+ public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
return new BitSet();
}
- @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+ @Override
+ public void readColumnChunks(RawBlockletColumnChunks blockChunkHolder) {
// Do Nothing
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
index eccda1e..63ce790 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
@@ -28,8 +28,8 @@ public class FalseConditionalResolverImpl extends ConditionalFilterResolverImpl
private static final long serialVersionUID = 4599541011924324041L;
public FalseConditionalResolverImpl(Expression exp, boolean isExpressionResolve,
- boolean isIncludeFilter, AbsoluteTableIdentifier tableIdentifier) {
- super(exp, isExpressionResolve, isIncludeFilter, tableIdentifier, false);
+ boolean isIncludeFilter) {
+ super(exp, isExpressionResolve, isIncludeFilter, false);
}
@Override public void resolve(AbsoluteTableIdentifier absoluteTableIdentifier,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index d531b5c..5ec0158 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -91,7 +91,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.comparator.Comparator;
import org.apache.carbondata.core.util.comparator.SerializableComparator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.format.DataChunk2;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 0164151..238d92a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -201,11 +201,11 @@ public class CarbonTablePath {
return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE;
}
- public String getTableStatusFilePathWithUUID(String uuid) {
+ public static String getTableStatusFilePathWithUUID(String tablePath, String uuid) {
if (!uuid.isEmpty()) {
- return getTableStatusFilePath() + CarbonCommonConstants.UNDERSCORE + uuid;
+ return getTableStatusFilePath(tablePath) + CarbonCommonConstants.UNDERSCORE + uuid;
} else {
- return getTableStatusFilePath();
+ return getTableStatusFilePath(tablePath);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 184bf1b..8bbe816 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -84,8 +84,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
case other => other
}
SegmentStatusManager.writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusFilePathWithUUID(uuid),
- updatedLoadMetaDataDetails)
+ CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid),
+ updatedLoadMetaDataDetails)
carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
} finally {
// check if any other segments needs compaction on in case of MINOR_COMPACTION.
@@ -105,11 +105,9 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
// 4. Therefore tablestatus file will be committed in between multiple commits.
if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
if (!identifySegmentsToBeMerged().isEmpty) {
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- .getAbsoluteTableIdentifier)
- val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
- val tableStatus = carbonTablePath.getTableStatusFilePath
+ val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
+ carbonTable.getTablePath, uuid)
+ val tableStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 657e0c5..083b8f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -290,13 +290,12 @@ object CommitPreAggregateListener extends OperationEventListener {
// keep committing until one fails
val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
val childCarbonTable = childLoadCommand.table
- val carbonTablePath =
- new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
- childCarbonTable.getTablePath)
// Generate table status file name with UUID, forExample: tablestatus_1
- val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
// Generate table status file name without UUID, forExample: tablestatus
- val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
}
// if true then the commit for one of the child tables has failed
@@ -306,11 +305,11 @@ object CommitPreAggregateListener extends OperationEventListener {
renamedDataMaps.foreach {
loadCommand =>
val carbonTable = loadCommand.table
- val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
- carbonTable.getTablePath)
// rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
- val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
- val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+ val backupTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath) + "_backup_" + uuid
+ val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ carbonTable.getTablePath)
markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
}
@@ -377,9 +376,8 @@ object CommitPreAggregateListener extends OperationEventListener {
operationContext: OperationContext,
uuid: String): Unit = {
childTables.foreach { childTable =>
- val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
- childTable.getTablePath)
- val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = {
file.getName.contains(uuid) || file.getName.contains("backup")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index f4077e6..91d1c1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,11 +26,10 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
private[sql] case class CarbonAlterTableDataTypeChangeCommand(
@@ -75,16 +74,18 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
}
// read the latest schema file
- val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTable)(sparkSession)
// maintain the added column for schema evolution history
- var addColumnSchema: ColumnSchema = null
- var deletedColumnSchema: ColumnSchema = null
+ var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
+ var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
columnSchemaList.foreach { columnSchema =>
if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
deletedColumnSchema = columnSchema.deepCopy
- columnSchema.setData_type(DataTypeConverterUtil
- .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+ columnSchema.setData_type(
+ DataTypeConverterUtil.convertToThriftDataType(
+ alterTableDataTypeChangeModel.dataTypeInfo.dataType))
columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
addColumnSchema = columnSchema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index a55dbdd..870c140 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -138,7 +138,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
}
}
- newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
+ newTablePath = metastore.updateTableSchemaForAlter(
+ newTableIdentifier,
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 0645040..7c40bcd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -27,8 +27,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.format.SchemaEvolutionEntry
/**
@@ -69,7 +68,8 @@ trait CarbonMetaStore {
* @param carbonStorePath
* @param sparkSession
*/
- def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
+ def updateTableSchemaForAlter(
+ newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -128,7 +128,8 @@ trait CarbonMetaStore {
def removeTableFromMetadata(dbName: String, tableName: String): Unit
def updateMetadataByThriftTable(schemaFilePath: String,
- tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit
+ tableInfo: org.apache.carbondata.format.TableInfo,
+ dbName: String, tableName: String, tablePath: String): Unit
def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
@@ -143,7 +144,8 @@ trait CarbonMetaStore {
def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
- def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
+ def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession):
+ org.apache.carbondata.format.TableInfo
def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/806b9841/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index e7c52f6..03c8c27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -180,9 +180,10 @@ public final class CarbonLoaderUtil {
}
String tableStatusPath;
if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
- tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid);
+ tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ identifier.getTablePath(), uuid);
} else {
- tableStatusPath = CarbonTablePath.getTableStatusFilePath();
+ tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
}
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();