Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:55 UTC
[12/14] incubator-carbondata git commit: Restructure requirement related spark-integration changes
Restructure requirement related spark-integration changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/44bb6f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/44bb6f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/44bb6f1e
Branch: refs/heads/master
Commit: 44bb6f1e02f6f62739cdbc111bcb31f8860a09ea
Parents: 70256e7
Author: nareshpr <pr...@gmail.com>
Authored: Fri Mar 10 11:30:51 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530
----------------------------------------------------------------------
.../spark/merger/CarbonCompactionExecutor.java | 36 ++-
.../spark/merger/CarbonCompactionUtil.java | 79 +++++
.../spark/merger/RowResultMerger.java | 5 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 44 +--
.../spark/rdd/DataManagementFunc.scala | 2 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 100 ++++++
.../carbondata/spark/util/CommonUtil.scala | 7 +
.../spark/util/DataTypeConverterUtil.scala | 36 +++
.../spark/util/GlobalDictionaryUtil.scala | 113 ++++++-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 55 +++-
.../execution/command/carbonTableSchema.scala | 168 +++++++++-
.../spark/sql/CarbonDictionaryDecoder.scala | 6 +-
.../org/apache/spark/sql/CarbonSource.scala | 34 +-
.../sql/execution/command/DDLStrategy.scala | 27 ++
.../execution/command/carbonTableSchema.scala | 310 ++++++++++++++++++-
.../apache/spark/sql/hive/CarbonMetastore.scala | 121 ++++++--
.../sql/parser/CarbonSpark2SqlParser.scala | 83 ++++-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 7 +-
.../AlterTableValidationTestCase.scala | 111 +++++++
19 files changed, 1248 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
index 2f84ade..4458457 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionExecutor.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
@@ -92,10 +93,15 @@ public class CarbonCompactionExecutor {
String segmentId = taskMap.getKey();
List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
- int[] colCardinality = listMetadata.get(0).getSegmentInfo().getColumnCardinality();
-
+ List<ColumnSchema> updatedColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+ int[] updatedColumnCardinalities = CarbonUtil
+ .getUpdatedColumnCardinalities(listMetadata.get(0).getColumnInTable(),
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+ listMetadata.get(0).getSegmentInfo().getColumnCardinality());
SegmentProperties sourceSegProperties =
- new SegmentProperties(listMetadata.get(0).getColumnInTable(), colCardinality);
+ new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
// for each segment get taskblock info
TaskBlockInfo taskBlockInfo = taskMap.getValue();
@@ -171,16 +177,28 @@ public class CarbonCompactionExecutor {
List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (CarbonDimension dim : destinationSegProperties.getDimensions()) {
- QueryDimension queryDimension = new QueryDimension(dim.getColName());
- dims.add(queryDimension);
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ for (CarbonDimension dim : dimensions) {
+ // check if dimension is deleted
+ if (!dim.isInvisible()) {
+ QueryDimension queryDimension = new QueryDimension(dim.getColName());
+ queryDimension.setDimension(dim);
+ dims.add(queryDimension);
+ }
}
model.setQueryDimension(dims);
List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (CarbonMeasure carbonMeasure : destinationSegProperties.getMeasures()) {
- QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
- msrs.add(queryMeasure);
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ for (CarbonMeasure carbonMeasure : measures) {
+ // check if measure is deleted
+ if (!carbonMeasure.isInvisible()) {
+ QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+ queryMeasure.setMeasure(carbonMeasure);
+ msrs.add(queryMeasure);
+ }
}
model.setQueryMeasures(msrs);
model.setQueryId(System.nanoTime() + "");
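
The compaction query model is now built from the table's current (master) schema rather than from the destination segment properties, and columns flagged invisible (i.e. dropped via ALTER TABLE) are skipped. A minimal sketch of that filter, using simplified stand-in types rather than the real CarbonDimension/QueryDimension classes:

    // Simplified stand-ins for CarbonDimension and QueryDimension.
    case class Dimension(colName: String, invisible: Boolean)
    case class QueryDimension(colName: String, dimension: Dimension)

    // Keep only the columns still visible in the master schema.
    def visibleQueryDimensions(all: Seq[Dimension]): Seq[QueryDimension] =
      all.filterNot(_.invisible).map(d => QueryDimension(d.colName, d))
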
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
index ed19b27..f63778d 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -30,10 +30,15 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.commons.lang3.ArrayUtils;
+
/**
* Utility Class for the Compaction Flow.
*/
@@ -267,4 +272,78 @@ public class CarbonCompactionUtil {
}
return null;
}
+
+ /**
+ * This method will record, for each column, the maximum cardinality observed across blocks
+ *
+ * @param columnCardinalityMap map from column unique id to the max cardinality seen so far
+ * @param currentBlockSchema column schemas of the current block
+ * @param currentBlockCardinality cardinality array of the current block
+ */
+ public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
+ List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
+ for (int i = 0; i < currentBlockCardinality.length; i++) {
+ // add value to map only if it does not exist or the new cardinality exceeds the existing value
+ String columnUniqueId = currentBlockSchema.get(i).getColumnUniqueId();
+ Integer value = columnCardinalityMap.get(columnUniqueId);
+ if (null == value) {
+ columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+ } else {
+ if (currentBlockCardinality[i] > value) {
+ columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * This method will return the updated cardinality according to the master schema
+ *
+ * @param columnCardinalityMap map from column unique id to the max cardinality across blocks
+ * @param carbonTable table whose current (master) schema drives the ordering
+ * @param updatedColumnSchemaList output list populated with the visible column schemas
+ * @return cardinality array aligned with the master schema order
+ */
+ public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
+ CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
+ List<CarbonDimension> masterDimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
+ for (CarbonDimension dimension : masterDimensions) {
+ if (!dimension.isInvisible()) {
+ Integer value = columnCardinalityMap.get(dimension.getColumnId());
+ if (null == value) {
+ updatedCardinalityList.add(getDimensionDefaultCardinality(dimension));
+ } else {
+ updatedCardinalityList.add(value);
+ }
+ updatedColumnSchemaList.add(dimension.getColumnSchema());
+ }
+ }
+ int[] updatedCardinality = ArrayUtils
+ .toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
+ return updatedCardinality;
+ }
+
+ /**
+ * This method will return the default cardinality based on dimension type
+ *
+ * @param dimension dimension whose encoding determines the default
+ * @return the default cardinality for the dimension
+ */
+ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
+ int cardinality = 0;
+ if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ cardinality = Integer.MAX_VALUE;
+ } else if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+ if (null != dimension.getDefaultValue()) {
+ cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1;
+ } else {
+ cardinality = CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
+ }
+ } else {
+ cardinality = -1;
+ }
+ return cardinality;
+ }
}
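
When a column is missing from every footer (typically because it was added after the input segments were written), getDimensionDefaultCardinality supplies a fallback based on the dimension's encoding. A condensed restatement of that rule, with encodings modeled as a small ADT for illustration:

    // Fallback rule: direct dictionary -> Int.MaxValue; dictionary -> the default
    // dictionary cardinality, plus one when the column carries a default value;
    // no dictionary -> -1.
    sealed trait Enc
    case object DirectDictionary extends Enc
    case object PlainDictionary extends Enc

    def defaultCardinality(enc: Option[Enc], hasDefaultValue: Boolean,
        dictionaryDefaultCardinality: Int): Int = enc match {
      case Some(DirectDictionary) => Int.MaxValue
      case Some(PlainDictionary) =>
        if (hasDefaultValue) dictionaryDefaultCardinality + 1
        else dictionaryDefaultCardinality
      case None => -1
    }
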
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
index 089cd0e..91a5c03 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -79,7 +79,7 @@ public class RowResultMerger {
public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
String tableName, SegmentProperties segProp, String tempStoreLocation,
- CarbonLoadModel loadModel, int[] colCardinality, CompactionType compactionType) {
+ CarbonLoadModel loadModel, CompactionType compactionType) {
CarbonDataFileAttributes carbonDataFileAttributes;
@@ -131,7 +131,7 @@ public class RowResultMerger {
} else {
carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
}
- carbonFactDataHandlerModel.setColCardinality(colCardinality);
+ carbonFactDataHandlerModel.setColCardinality(segProp.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
@@ -202,6 +202,7 @@ public class RowResultMerger {
}
mergeStatus = true;
} catch (Exception e) {
+ LOGGER.error(e, e.getMessage());
LOGGER.error("Exception in compaction merger " + e.getMessage());
mergeStatus = false;
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7a506ba..51f9022 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -139,6 +139,7 @@ class CarbonMergerRDD[K, V](
.toList
}
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
// get destination segment properties as sent from driver which is of last segment.
val segmentProperties = new SegmentProperties(
carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
@@ -153,7 +154,7 @@ class CarbonMergerRDD[K, V](
carbonLoadModel.setStorePath(hdfsStoreLocation)
exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, dataFileMetadataSegMapping)
+ carbonTable, dataFileMetadataSegMapping)
// fire a query and get the results.
var result2: java.util.List[RawResultIterator] = null
@@ -196,7 +197,6 @@ class CarbonMergerRDD[K, V](
segmentProperties,
tempStoreLoc,
carbonLoadModel,
- carbonMergerMapping.maxSegmentColCardinality,
carbonMergerMapping.campactionType
)
mergeStatus = merger.mergerSlice()
@@ -237,21 +237,6 @@ class CarbonMergerRDD[K, V](
iter
}
-
- def calculateCardanility(targetCardinality: Array[Int],
- sourceCardinality: Array[Int],
- columnSize: Int): Unit = {
- var cols = columnSize
-
- // Choose the highest cardinality among all the blocks.
- while (cols > 0) {
- if (targetCardinality(cols - 1) < sourceCardinality(cols - 1)) {
- targetCardinality(cols - 1) = sourceCardinality(cols - 1)
- }
- cols -= 1
- }
- }
-
override def getPartitions: Array[Partition] = {
val startTime = System.currentTimeMillis()
val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
@@ -319,13 +304,9 @@ class CarbonMergerRDD[K, V](
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
-
- columnSize = dataFileFooter.getSegmentInfo.getColumnCardinality.size
- carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
- .toList
}
- var cardinality = new Array[Int](columnSize)
+ val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
carbonInputSplits.foreach(splits => {
val taskNo = splits.taskId
@@ -350,14 +331,21 @@ class CarbonMergerRDD[K, V](
logError("Exception in preparing the data file footer for compaction " + e.getMessage)
throw e
}
-
- // Calculate the Cardinality of the new segment
- calculateCardanility(cardinality,
- dataFileFooter.getSegmentInfo.getColumnCardinality,
- columnSize)
+ // add all the columns and their cardinalities to the map
+ CarbonCompactionUtil
+ .addColumnCardinalityToMap(columnToCardinalityMap,
+ dataFileFooter.getColumnInTable,
+ dataFileFooter.getSegmentInfo.getColumnCardinality)
}
)
-
+ val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ // update cardinality and column schema list according to master schema
+ val cardinality = CarbonCompactionUtil
+ .updateColumnSchemaAndGetCardinality(columnToCardinalityMap,
+ carbonTable,
+ updatedMaxSegmentColumnList)
+ carbonMergerMapping.maxSegmentColumnSchemaList = updatedMaxSegmentColumnList.asScala.toList
// Set cardinality for new segment.
carbonMergerMapping.maxSegmentColCardinality = cardinality
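
Taken together, the driver now merges the per-footer cardinalities by column unique id (keeping the maximum) and then orders the result against the master schema, so a segment written before a column was added still yields a complete array. A self-contained sketch with toy data:

    // Toy footers: (column unique ids, cardinality per column) per data file.
    val footers = Seq(
      (Seq("c1", "c2"), Array(10, 5)),
      (Seq("c1", "c2"), Array(7, 9)))
    val masterColumnIds = Seq("c1", "c2", "c3") // "c3" added after these segments

    val merged = scala.collection.mutable.Map[String, Int]()
    for ((ids, card) <- footers; (id, c) <- ids.zip(card))
      if (merged.get(id).forall(c > _)) merged(id) = c // keep the max per column

    // Columns unknown to every footer get a default; Int.MaxValue stands in for
    // the direct-dictionary case of getDimensionDefaultCardinality.
    val cardinality = masterColumnIds.map(id => merged.getOrElse(id, Int.MaxValue)).toArray
    // cardinality: Array(10, 9, Int.MaxValue)
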
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index d0b3e29..3b3bac3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -194,7 +194,7 @@ object DataManagementFunc {
)
} catch {
case e: Exception =>
- LOGGER.error(s"Exception in compaction thread ${ e.getMessage }")
+ LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
throw e
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 428df48..8580691 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -21,12 +21,14 @@ import java.io.File
import java.text.SimpleDateFormat
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.command.DataTypeInfo
import org.apache.spark.sql.types._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType}
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.core.util.CarbonProperties
object CarbonScalaUtil {
@@ -194,4 +196,102 @@ object CarbonScalaUtil {
}
}
}
+
+ /**
+ * This method will validate whether the given column's data type can be changed to the
+ * requested type and fail with a descriptive error if the change is not allowed
+ *
+ * @param dataTypeInfo requested target data type (with precision and scale for decimal)
+ * @param carbonColumn column whose data type is being changed
+ */
+ def validateColumnDataType(dataTypeInfo: DataTypeInfo, carbonColumn: CarbonColumn): Unit = {
+ carbonColumn.getDataType.getName match {
+ case "INT" =>
+ if (!dataTypeInfo.dataType.equals("bigint")) {
+ sys
+ .error(s"Given column ${ carbonColumn.getColName } with data type ${
+ carbonColumn
+ .getDataType.getName
+ } cannot be modified. Int can only be changed to bigInt")
+ }
+ case "DECIMAL" =>
+ if (!dataTypeInfo.dataType.equals("decimal")) {
+ sys
+ .error(s"Given column ${ carbonColumn.getColName } with data type ${
+ carbonColumn.getDataType.getName
+ } cannot be modified. Decimal can only be changed to Decimal of higher precision")
+ }
+ if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
+ sys
+ .error(s"Given column ${
+ carbonColumn
+ .getColName
+ } cannot be modified. Specified precision value ${
+ dataTypeInfo
+ .precision
+ } should be greater than the current precision value ${
+ carbonColumn.getColumnSchema
+ .getPrecision
+ }")
+ } else if (dataTypeInfo.scale <= carbonColumn.getColumnSchema.getScale) {
+ sys
+ .error(s"Given column ${
+ carbonColumn
+ .getColName
+ } cannot be modified. Specified scale value ${
+ dataTypeInfo
+ .scale
+ } should be greater than the current scale value ${
+ carbonColumn.getColumnSchema
+ .getScale
+ }")
+ } else {
+ // the difference between the specified precision and scale must not be less than
+ // the existing difference, else existing values could lose integer digits
+ val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
+ carbonColumn.getColumnSchema.getScale
+ val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
+ if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
+ sys
+ .error(s"Given column ${
+ carbonColumn
+ .getColName
+ } cannot be modified. Specified precision and scale values will lead to data loss")
+ }
+ }
+ case _ =>
+ sys
+ .error(s"Given column ${ carbonColumn.getColName } with data type ${
+ carbonColumn
+ .getDataType.getName
+ } cannot be modified. Only Int and Decimal data types are allowed for modification")
+ }
+ }
+
+ /**
+ * This method will create a field-by-field copy of the given thrift ColumnSchema
+ *
+ * @param thriftColumnSchema object to be cloned
+ * @return the copied ColumnSchema
+ */
+ def createColumnSchemaCopyObject(thriftColumnSchema: org.apache.carbondata.format.ColumnSchema)
+ : org.apache.carbondata.format.ColumnSchema = {
+ val columnSchema = new org.apache.carbondata.format.ColumnSchema
+ columnSchema.column_group_id = thriftColumnSchema.column_group_id
+ columnSchema.column_name = thriftColumnSchema.column_name
+ columnSchema.columnProperties = thriftColumnSchema.columnProperties
+ columnSchema.columnReferenceId = thriftColumnSchema.columnReferenceId
+ columnSchema.column_id = thriftColumnSchema.column_id
+ columnSchema.data_type = thriftColumnSchema.data_type
+ columnSchema.default_value = thriftColumnSchema.default_value
+ columnSchema.encoders = thriftColumnSchema.encoders
+ columnSchema.invisible = thriftColumnSchema.invisible
+ columnSchema.columnar = thriftColumnSchema.columnar
+ columnSchema.dimension = thriftColumnSchema.dimension
+ columnSchema.num_child = thriftColumnSchema.num_child
+ columnSchema.precision = thriftColumnSchema.precision
+ columnSchema.scale = thriftColumnSchema.scale
+ columnSchema.schemaOrdinal = thriftColumnSchema.schemaOrdinal
+ columnSchema
+ }
}
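
The decimal branch of validateColumnDataType enforces three conditions: precision must strictly increase, scale must strictly increase, and the integer-digit budget (precision minus scale) must not shrink, since shrinking it could truncate existing values. The rule as a single boolean expression, for illustration:

    // true when decimal(oldP, oldS) may be widened to decimal(newP, newS)
    def decimalChangeAllowed(oldP: Int, oldS: Int, newP: Int, newS: Int): Boolean =
      newP > oldP &&                   // precision must strictly increase
      newS > oldS &&                   // scale must strictly increase
      (newP - newS) >= (oldP - oldS)   // integer digits must not be lost

    // e.g. decimal(10,2) -> decimal(12,3) passes; decimal(10,2) -> decimal(11,4) fails
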
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7592e4e..cf88b8c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -345,4 +345,11 @@ object CommonUtil {
csvColumns
}
+
+ def validateColumnNames(columnName: String, columnNameCopy: String): Unit = {
+ if (!columnName.equalsIgnoreCase(columnNameCopy)) {
+ throw new MalformedCarbonCommandException(
+ "Column names provided are different. Both the column names should be same")
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 85bbb93..475650f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -82,4 +82,40 @@ object DataTypeConverterUtil {
case DataType.STRUCT => "struct"
}
}
+
+ /**
+ * convert a data type name to the corresponding thrift (external) data type
+ *
+ * @param dataType data type name in lower case
+ * @return the thrift data type; unrecognized names default to STRING
+ */
+ def convertToThriftDataType(dataType: String): org.apache.carbondata.format.DataType = {
+ if (null == dataType) {
+ return null
+ }
+ dataType match {
+ case "string" =>
+ org.apache.carbondata.format.DataType.STRING
+ case "int" =>
+ org.apache.carbondata.format.DataType.INT
+ case "short" =>
+ org.apache.carbondata.format.DataType.SHORT
+ case "long" | "bigint" =>
+ org.apache.carbondata.format.DataType.LONG
+ case "double" =>
+ org.apache.carbondata.format.DataType.DOUBLE
+ case "decimal" =>
+ org.apache.carbondata.format.DataType.DECIMAL
+ case "date" =>
+ org.apache.carbondata.format.DataType.DATE
+ case "timestamp" =>
+ org.apache.carbondata.format.DataType.TIMESTAMP
+ case "array" =>
+ org.apache.carbondata.format.DataType.ARRAY
+ case "struct" =>
+ org.apache.carbondata.format.DataType.STRUCT
+ case _ =>
+ org.apache.carbondata.format.DataType.STRING
+ }
+ }
}
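
Note the default arm of convertToThriftDataType: an unrecognized type name silently maps to STRING instead of failing. A caller that wants to reject unknown names could pre-validate with a check along these lines (the name set mirrors the cases matched above):

    val convertibleNames = Set("string", "int", "short", "long", "bigint", "double",
      "decimal", "date", "timestamp", "array", "struct")

    def isThriftConvertible(dataType: String): Boolean =
      dataType != null && convertibleNames.contains(dataType.toLowerCase)
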
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index ef759cf..bcb7ff7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.spark.util
-import java.io.FileNotFoundException
+import java.io.{FileNotFoundException, IOException}
import java.nio.charset.Charset
import java.util.regex.Pattern
@@ -41,15 +41,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.Dictionary
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
import org.apache.carbondata.core.reader.CarbonDictionaryReader
import org.apache.carbondata.core.service.CarbonCommonFactory
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
+import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
import org.apache.carbondata.processing.csvload.CSVInputFormat
import org.apache.carbondata.processing.csvload.StringArrayWritable
import org.apache.carbondata.processing.etl.DataLoadingException
@@ -784,4 +786,105 @@ object GlobalDictionaryUtil {
throw ex
}
}
+
+ def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
+ columnSchema: ColumnSchema,
+ tableIdentifier: CarbonTableIdentifier,
+ storePath: String,
+ defaultValue: String): Unit = {
+
+ var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null
+ var dictionary: Dictionary = null
+
+ val dictLock = CarbonLockFactory
+ .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
+ columnSchema.getColumnUniqueId + LockUsage.LOCK)
+
+ val isDictionaryLocked = dictLock.lockWithRetries()
+ try {
+ if (isDictionaryLocked) {
+ LOGGER.info(s"Successfully able to get the dictionary lock for ${
+ columnSchema.getColumnName
+ }")
+ } else {
+ sys.error(s"Dictionary file ${
+ columnSchema.getColumnName
+ } is locked for update. Please try again later")
+ }
+ val columnIdentifier = new ColumnIdentifier(columnSchema.getColumnUniqueId,
+ null,
+ columnSchema.getDataType)
+ val writer = CarbonCommonFactory.getDictionaryService
+ .getDictionaryWriter(tableIdentifier, columnIdentifier, storePath)
+
+ val distinctValues: java.util.List[String] = new java.util.ArrayList()
+ writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+ distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+
+ val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(defaultValue, columnSchema)
+ if (null != parsedValue) {
+ writer.write(parsedValue)
+ distinctValues.add(parsedValue)
+ }
+ if (null != writer) {
+ writer.close()
+ }
+
+ LOGGER.info(s"Dictionary file writing is successful for new column ${
+ columnSchema.getColumnName
+ }")
+
+ if (distinctValues.size() > 0) {
+ dictionary = CarbonLoaderUtil.getDictionary(tableIdentifier,
+ new ColumnIdentifier(columnSchema.getColumnUniqueId, null, columnSchema.getDataType),
+ storePath,
+ columnSchema.getDataType
+ )
+ val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+ val dictService = CarbonCommonFactory.getDictionaryService
+ val dictionarySortInfo: CarbonDictionarySortInfo =
+ preparator.getDictionarySortInfo(distinctValues, dictionary,
+ columnSchema.getDataType)
+ carbonDictionarySortIndexWriter =
+ dictService.getDictionarySortIndexWriter(tableIdentifier, columnIdentifier,
+ storePath)
+ carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+ carbonDictionarySortIndexWriter
+ .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
+ }
+
+ if (null != carbonDictionarySortIndexWriter) {
+ carbonDictionarySortIndexWriter.close()
+ }
+
+ LOGGER.info(s"SortIndex file writing is successful for new column ${
+ columnSchema.getColumnName
+ }")
+
+ if (null != writer) {
+ writer.commit()
+ }
+
+ LOGGER.info(s"Dictionary meta file writing is successful for new column ${
+ columnSchema.getColumnName
+ }")
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex)
+ throw ex
+ } finally {
+ CarbonUtil.clearDictionaryCache(dictionary)
+ if (dictLock != null && isDictionaryLocked) {
+ if (dictLock.unlock()) {
+ LOGGER.info(s"Dictionary ${
+ columnSchema.getColumnName
+ } Unlocked Successfully.")
+ } else {
+ LOGGER.error(s"Unable to unlock Dictionary ${
+ columnSchema.getColumnName
+ }")
+ }
+ }
+ }
+ }
}
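
loadDefaultDictionaryValueForNewColumn follows a strict ordering: acquire the column's dictionary lock, write the member-default value plus the parsed user default, write the sort index, commit the writer, and release the lock in the finally block. A minimal sketch of that locking discipline, with an illustrative DictLock trait standing in for the carbon lock object:

    trait DictLock {
      def lockWithRetries(): Boolean
      def unlock(): Boolean
    }

    // Run body only while holding the dictionary lock, always releasing it.
    def withDictionaryLock[T](lock: DictLock)(body: => T): T = {
      if (!lock.lockWithRetries()) {
        sys.error("Dictionary file is locked for update. Please try again later")
      }
      try body
      finally lock.unlock()
    }
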
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9bfa8a9..56f6e6d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -154,6 +154,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val ARRAY = carbonKeyWord("ARRAY")
protected val STRUCT = carbonKeyWord("STRUCT")
+ protected val CHANGE = carbonKeyWord("CHANGE")
+ protected val TBLPROPERTIES = carbonKeyWord("TBLPROPERTIES")
+
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
@@ -229,14 +232,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
, tableName: String, fields: Seq[Field],
partitionCols: Seq[PartitionerField],
tableProperties: mutable.Map[String, String],
- bucketFields: Option[BucketFields]): TableModel = {
+ bucketFields: Option[BucketFields], isAlterFlow: Boolean = false): TableModel = {
fields.zipWithIndex.foreach { x =>
x._1.schemaOrdinal = x._2
}
val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
fields, tableProperties)
- if (dims.isEmpty) {
+ if (dims.isEmpty && !isAlterFlow) {
throw new MalformedCarbonCommandException(s"Table ${
dbName.getOrElse(
CarbonCommonConstants.DATABASE_DEFAULT_NAME)
@@ -826,6 +829,19 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
case _ => ("", "")
}
+ protected lazy val valueOptions: Parser[(Int, Int)] =
+ (numericLit <~ ",") ~ numericLit ^^ {
+ case opt ~ optvalue => (opt.toInt, optvalue.toInt)
+ case _ => (0, 0)
+ }
+
+ protected lazy val columnOptions: Parser[(String, String)] =
+ (stringLit <~ ",") ~ stringLit ^^ {
+ case opt ~ optvalue => (opt, optvalue)
+ case _ =>
+ throw new MalformedCarbonCommandException(s"value cannot be empty")
+ }
+
protected lazy val dimCol: Parser[Field] = anyFieldDef
protected lazy val primitiveTypes =
@@ -1010,4 +1026,39 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
p.getClass.getSimpleName.equals("FloatLit") ||
p.getClass.getSimpleName.equals("DecimalLit")
}) ^^ (_.chars)
+
+ /**
+ * This method will parse the given data type and validate against the allowed data types
+ *
+ * @param dataType target data type name, e.g. "bigint" or "decimal"
+ * @param values optional (precision, scale) values supplied for decimal
+ * @return the parsed and validated DataTypeInfo
+ */
+ protected def parseDataType(dataType: String, values: Option[List[(Int, Int)]]): DataTypeInfo = {
+ dataType match {
+ case "bigint" =>
+ if (values.isDefined) {
+ throw new MalformedCarbonCommandException("Invalid data type")
+ }
+ DataTypeInfo(dataType)
+ case "decimal" =>
+ var precision: Int = 0
+ var scale: Int = 0
+ if (values.isDefined) {
+ precision = values.get(0)._1
+ scale = values.get(0)._2
+ } else {
+ throw new MalformedCarbonCommandException("Decimal format provided is invalid")
+ }
+ // precision should be > 0 and <= 38 and scale should be >= 0 and <= 38
+ if (precision < 1 || precision > 38) {
+ throw new MalformedCarbonCommandException("Invalid value for precision")
+ } else if (scale < 0 || scale > 38) {
+ throw new MalformedCarbonCommandException("Invalid value for scale")
+ }
+ DataTypeInfo("decimal", precision, scale)
+ case _ =>
+ throw new MalformedCarbonCommandException("Data type provided is invalid.")
+ }
+ }
}
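
parseDataType admits only two targets: bigint with no size arguments, and decimal with precision in [1, 38] and scale in [0, 38]. A compact restatement of the decimal bounds check (require stands in for MalformedCarbonCommandException here):

    case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)

    def parseDecimal(precision: Int, scale: Int): DataTypeInfo = {
      require(precision >= 1 && precision <= 38, "Invalid value for precision")
      require(scale >= 0 && scale <= 38, "Invalid value for scale")
      DataTypeInfo("decimal", precision, scale)
    }
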
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index eabcaed..eb43a93 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry}
@@ -34,11 +35,13 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo,
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.CarbonSparkFactory
import org.apache.carbondata.spark.load.FailureCauses
import org.apache.carbondata.spark.merger.CompactionType
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
+import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
case class TableModel(
ifNotExistsSet: Boolean,
@@ -123,6 +126,164 @@ case class CompactionCallableModel(storePath: String,
sqlContext: SQLContext,
compactionType: CompactionType)
+case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
+
+case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo,
+ databaseName: Option[String],
+ tableName: String,
+ columnName: String,
+ newColumnName: String)
+
+case class AlterTableAddColumnsModel(
+ databaseName: Option[String],
+ tableName: String,
+ tableProperties: Map[String, String],
+ dimCols: Seq[Field],
+ msrCols: Seq[Field],
+ highCardinalityDims: Seq[String])
+
+case class AlterTableDropColumnModel(databaseName: Option[String],
+ tableName: String,
+ columns: List[String])
+
+class AlterTableProcessor(
+ alterTableModel: AlterTableAddColumnsModel,
+ dbName: String,
+ tableInfo: TableInfo,
+ carbonTablePath: CarbonTablePath,
+ tableIdentifier: CarbonTableIdentifier,
+ storePath: String) {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ def process: Seq[ColumnSchema] = {
+ val tableSchema = tableInfo.getFactTable
+ val tableCols = tableSchema.getListOfColumns.asScala
+ val existingColsSize = tableCols.size
+ var allColumns = tableCols.filter(x => x.isDimensionColumn)
+ var newCols = Seq[ColumnSchema]()
+
+ alterTableModel.dimCols.foreach(field => {
+ val encoders = new java.util.ArrayList[Encoding]()
+ encoders.add(Encoding.DICTIONARY)
+ val columnSchema: ColumnSchema = getColumnSchema(
+ DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+ field.name.getOrElse(field.column),
+ isCol = true,
+ encoders,
+ isDimensionCol = true,
+ -1,
+ field.precision,
+ field.scale,
+ field.schemaOrdinal + existingColsSize)
+ allColumns ++= Seq(columnSchema)
+ newCols ++= Seq(columnSchema)
+ })
+
+ allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
+ alterTableModel.msrCols.foreach(field => {
+ val encoders = new java.util.ArrayList[Encoding]()
+ val columnSchema: ColumnSchema = getColumnSchema(
+ DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+ field.name.getOrElse(field.column),
+ isCol = true,
+ encoders,
+ isDimensionCol = false,
+ -1,
+ field.precision,
+ field.scale,
+ field.schemaOrdinal + existingColsSize)
+ allColumns ++= Seq(columnSchema)
+ newCols ++= Seq(columnSchema)
+ })
+
+ // Check whether there are any duplicate measures or dimensions,
+ // based on the dimension and measure names.
+ allColumns.filter(x => !x.isInvisible).groupBy(_.getColumnName)
+ .foreach(f => if (f._2.size > 1) {
+ val name = f._1
+ LOGGER.error(s"Duplicate column found with name: $name")
+ LOGGER.audit(
+ s"Validation failed for Create/Alter Table Operation " +
+ s"for ${ dbName }.${ alterTableModel.tableName }. " +
+ s"Duplicate column found with name: $name")
+ sys.error(s"Duplicate column found with name: $name")
+ })
+
+ val columnValidator = CarbonSparkFactory.getCarbonColumnValidator()
+ columnValidator.validateColumns(allColumns)
+
+ // populate table properties map
+ val tablePropertiesMap = tableSchema.getTableProperties
+ alterTableModel.tableProperties.foreach {
+ x => val value = tablePropertiesMap.get(x._1)
+ if (null != value) {
+ tablePropertiesMap.put(x._1, value + "," + x._2)
+ } else {
+ tablePropertiesMap.put(x._1, x._2)
+ }
+ }
+ for (elem <- alterTableModel.tableProperties) {
+ if (elem._1.toLowerCase.startsWith("default.value.")) {
+ val col = newCols.filter(p => p.getColumnName.equalsIgnoreCase(elem._1.substring(14)))
+ if (col.size == 1) {
+ val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col(0).getDataType)
+ if (null != data) {
+ col(0).setDefaultValue(data)
+ } else {
+ LOGGER
+ .error(
+ "Invalid default value for new column " + dbName + "." + alterTableModel.tableName +
+ "." + col(0).getColumnName + " : " + elem._2)
+ }
+ if (col(0).getEncodingList.contains(Encoding.DICTIONARY) &&
+ !col(0).getEncodingList.contains(Encoding.DIRECT_DICTIONARY)) {
+ GlobalDictionaryUtil
+ .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
+ col(0),
+ tableIdentifier,
+ storePath,
+ elem._2)
+ }
+ }
+ }
+ }
+ tableSchema.setListOfColumns(allColumns.asJava)
+ tableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ tableInfo.setFactTable(tableSchema)
+ newCols
+ }
+
+ private def getColumnSchema(dataType: DataType, colName: String, isCol: Boolean,
+ encoders: java.util.List[Encoding], isDimensionCol: Boolean,
+ colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
+ val columnSchema = new ColumnSchema()
+ columnSchema.setDataType(dataType)
+ columnSchema.setColumnName(colName)
+ if (alterTableModel.highCardinalityDims.contains(colName)) {
+ encoders.remove(Encoding.DICTIONARY)
+ }
+ if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
+ encoders.add(Encoding.DIRECT_DICTIONARY)
+ }
+ columnSchema.setEncodingList(encoders)
+ val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+ val columnUniqueId = colUniqueIdGenerator.generateUniqueId(
+ alterTableModel.databaseName.getOrElse(dbName),
+ columnSchema)
+ columnSchema.setColumnUniqueId(columnUniqueId)
+ columnSchema.setColumnReferenceId(columnUniqueId)
+ columnSchema.setColumnar(isCol)
+ columnSchema.setDimensionColumn(isDimensionCol)
+ columnSchema.setColumnGroup(colGroup)
+ columnSchema.setPrecision(precision)
+ columnSchema.setScale(scale)
+ columnSchema.setSchemaOrdinal(schemaOrdinal)
+ columnSchema.setUseInvertedIndex(isDimensionCol)
+ columnSchema
+ }
+}
object TableNewProcessor {
def apply(cm: TableModel): TableInfo = {
new TableNewProcessor(cm).process
@@ -170,11 +331,6 @@ class TableNewProcessor(cm: TableModel) {
if (dataType == DataType.TIMESTAMP || dataType == DataType.DATE) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
- val colPropMap = new java.util.HashMap[String, String]()
- if (cm.colProps.isDefined && null != cm.colProps.get.get(colName)) {
- val colProps = cm.colProps.get.get(colName)
- colProps.asScala.foreach { x => colPropMap.put(x.key, x.value) }
- }
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
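
AlterTableProcessor matches TBLPROPERTIES keys of the form default.value.<column> against the newly added columns, case-insensitively on the column name, before seeding the dictionary with the default. A small sketch of that matching step, with illustrative property values:

    val properties = Map("default.value.age" -> "18", "dictionary_include" -> "name")
    val newColumnNames = Seq("Age", "city")

    // Strip the "default.value." prefix (14 characters, hence substring(14) above)
    // and keep only entries that name one of the new columns.
    val defaults = properties.collect {
      case (key, value) if key.toLowerCase.startsWith("default.value.") =>
        (key.substring("default.value.".length), value)
    }.filter { case (col, _) => newColumnNames.exists(_.equalsIgnoreCase(col)) }
    // defaults: Map("age" -> "18")
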
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 32279ed..f92b1e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -122,7 +122,7 @@ case class CarbonDictionaryDecoder(
val getDictionaryColumnIds = {
val attributes = child.output
- val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
+ val dictIds: Array[(String, ColumnIdentifier, CarbonDimension)] = attributes.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
if (relation.isDefined && canBeDecoded(attr)) {
@@ -134,7 +134,7 @@ case class CarbonDictionaryDecoder(
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!carbonDimension.isComplex()) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
- carbonDimension.getDataType)
+ carbonDimension)
} else {
(null, null, null)
}
@@ -214,7 +214,7 @@ case class CarbonDictionaryDecoder(
try {
cache.get(new DictionaryColumnUniqueIdentifier(
atiMap(f._1).getCarbonTableIdentifier,
- f._2, f._3))
+ f._2, f._3.getDataType))
} catch {
case _: Throwable => null
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e28ee8f..fce9b4c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -37,11 +37,30 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
-class CarbonSource extends CreatableRelationProvider
+class CarbonSource extends CreatableRelationProvider with RelationProvider
with SchemaRelationProvider with DataSourceRegister {
override def shortName(): String = "carbondata"
+ // will be called when a Hive-supported create table command is provided
+ override def createRelation(sqlContext: SQLContext,
+ parameters: Map[String, String]): BaseRelation = {
+ CarbonEnv.init(sqlContext.sparkSession)
+ // if a path is provided we can directly create the Hadoop relation,
+ // otherwise create the datasource relation
+ parameters.get("tablePath") match {
+ case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
+ Array(path),
+ parameters,
+ None)
+ case _ =>
+ val options = new CarbonOption(parameters)
+ val storePath = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.STORE_LOCATION)
+ val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
+ CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
+ }
+ }
// called by any write operation like INSERT INTO DDL or DataFrame.write API
override def createRelation(
sqlContext: SQLContext,
@@ -108,8 +127,9 @@ class CarbonSource extends CreatableRelationProvider
private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
dataSchema: StructType): String = {
- val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
- val tableName: String = parameters.getOrElse("tableName", "default_table")
+ val dbName: String = parameters.getOrElse("dbName",
+ CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+ val tableName: String = parameters.getOrElse("tableName", "default_table").toLowerCase
if (StringUtils.isBlank(tableName)) {
throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
}
@@ -125,7 +145,8 @@ class CarbonSource extends CreatableRelationProvider
val fields = dataSchema.map { col =>
val dataType = Option(col.dataType.toString)
// This is to parse complex data types
- val f: Field = Field(col.name, dataType, Option(col.name), None, null)
+ val colName = col.name.toLowerCase
+ val f: Field = Field(colName, dataType, Option(colName), None, null)
// the data type of the decimal type will be like decimal(10,0)
// so checking the start of the string and taking the precision and scale.
// resetting the data type with decimal
@@ -139,7 +160,7 @@ class CarbonSource extends CreatableRelationProvider
f
}
val map = scala.collection.mutable.Map[String, String]()
- parameters.foreach { parameter => map.put(parameter._1, parameter._2) }
+ parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
val bucketFields = if (options.isBucketingEnabled) {
if (options.bucketNumber.toString.contains("-") ||
options.bucketNumber.toString.contains("+") ) {
@@ -147,7 +168,8 @@ class CarbonSource extends CreatableRelationProvider
options.bucketNumber.toString)
}
else {
- Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber))
+ Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
+ options.bucketNumber))
}
} else {
None
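
The new createRelation overload resolves the table location in two steps: use the explicit tablePath option when supplied, otherwise derive <storePath>/<dbName>/<tableName> from the configured store location. A condensed sketch (the storePath parameter stands in for the CarbonProperties STORE_LOCATION lookup):

    def resolveTablePath(parameters: Map[String, String], storePath: String,
        dbName: String, tableName: String): String =
      parameters.getOrElse("tablePath", s"$storePath/$dbName/$tableName")
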
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 2879130..51808bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -71,6 +71,33 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
+ case dataTypeChange@AlterTableDataTypeChange(alterTableChangeDataTypeModel) =>
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
+ .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
+ alterTableChangeDataTypeModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(dataTypeChange) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case addColumn@AlterTableAddColumns(alterTableAddColumnsModel) =>
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
+ .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
+ alterTableAddColumnsModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(addColumn) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case dropColumn@AlterTableDropColumns(alterTableDropColumnModel) =>
+ val isCarbonTable = CarbonEnv.get.carbonMetastore
+ .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
+ alterTableDropColumnModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(dropColumn) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
val resolvedTable =
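
The three new strategy cases share one guard: the command runs only when the metastore confirms the target is a carbon table; otherwise the alter operation is rejected. The repetition could be factored roughly as below (a sketch only; the real cases return ExecutedCommandExec(command) :: Nil and throw MalformedCarbonCommandException):

    def routeIfCarbonTable[T](isCarbonTable: Boolean)(plan: => T): T =
      if (isCarbonTable) plan
      else throw new IllegalArgumentException("Unsupported alter operation on hive table")
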
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d1f1771..28f4df8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import org.apache.commons.lang3.StringUtils
@@ -33,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation, HiveExternalCatalog}
import org.apache.spark.util.FileUtils
import org.codehaus.jackson.map.ObjectMapper
@@ -44,18 +46,20 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry}
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
object Checker {
def validateTableExists(
@@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
}
}
+private[sql] case class AlterTableDataTypeChange(
+ alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableDataTypeChangeModel.tableName
+ val dbName = alterTableDataTypeChangeModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+ val relation =
+ CarbonEnv.get.carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
+ // acquire the lock first
+ val table = relation.tableMeta.carbonTable
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.METADATA_LOCK)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val columnName = alterTableDataTypeChangeModel.columnName
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ // maintain the added column for schema evolution history
+ var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
+ var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+ columnSchemaList.foreach { columnSchema =>
+ if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+ deletedColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
+ columnSchema.setData_type(DataTypeConverterUtil
+ .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+ columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+ columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+ addColumnSchema = columnSchema
+ }
+ }
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+ schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ CarbonEnv.get.carbonMetastore
+ .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ carbonTable.getStorePath)(sparkSession)
+
+ val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+ val schema = CarbonEnv.get.carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession).schema.json
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+ s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table change datatype failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table change data type lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table change data type operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
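
For reference, a minimal usage sketch of the DDL this command serves; the table and column names ("t", "c1") are illustrative, not part of this patch:

    // Hedged sketch: widen an INT column to BIGINT. Both occurrences of the
    // column name must be identical, as validated by
    // CommonUtil.validateColumnNames in the parser further below.
    sparkSession.sql("ALTER TABLE t CHANGE c1 c1 BIGINT")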
+
+private[sql] case class AlterTableAddColumns(
+ alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableAddColumnsModel.tableName
+ val dbName = alterTableAddColumnsModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+ val relation =
+ CarbonEnv.get.carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Alter table add columns request has failed. " +
+ s"Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
+ // acquire the lock first
+ val table = relation.tableMeta.carbonTable
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.METADATA_LOCK)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val thriftTableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getStorePath)
+ val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+ dbName,
+ wrapperTableInfo,
+ carbonTablePath,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).process
+ val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
+ .schema.SchemaEvolutionEntry()
+ schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
+ schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+
+ val thriftTable = schemaConverter
+ .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ CarbonEnv.get.carbonMetastore
+ .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable,
+ schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+ carbonTable.getStorePath)(sparkSession)
+
+ val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+ val schema = CarbonEnv.get.carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession).schema.json
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+ s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table add columns failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table add columns lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table add columns operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
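
A comparable sketch for the add-columns path (names and property values are illustrative); note that, per the parser change further below, keys beginning with default.value. keep the original case of their value:

    // Hedged sketch: add a string and an int column; exclude one from
    // dictionary encoding and give the other a default value.
    sparkSession.sql(
      "ALTER TABLE t ADD COLUMNS (c3 STRING, c4 INT) " +
      "TBLPROPERTIES('DICTIONARY_EXCLUDE'='c3', 'DEFAULT.VALUE.c4'='0')")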
+
+private[sql] case class AlterTableDropColumns(
+ alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableDropColumnModel.tableName
+ val dbName = alterTableDropColumnModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+ val relation =
+ CarbonEnv.get.carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Alter table drop columns request has failed. " +
+ s"Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
+ // acquire the lock first
+ val table = relation.tableMeta.carbonTable
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.METADATA_LOCK)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ // check each column existence in the table
+ val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+ var dictionaryColumns = ListBuffer[CarbonColumn]()
+ var keyColumnCountToBeDeleted = 0
+ // TODO: if the deleted column list includes a shared dictionary or bucketed column, throw an error
+ alterTableDropColumnModel.columns.foreach { column =>
+ var columnExist = false
+ tableColumns.foreach { tableColumn =>
+ // column should not be already deleted and should exist in the table
+ if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+ if (tableColumn.isDimesion) {
+ keyColumnCountToBeDeleted += 1
+ if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+ dictionaryColumns += tableColumn
+ }
+ }
+ columnExist = true
+ }
+ }
+ if (!columnExist) {
+ sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+ }
+ }
+ // take the total key column count. key column to be deleted should not
+ // be >= key columns in schema
+ var totalKeyColumnInSchema = 0
+ tableColumns.foreach { tableColumn =>
+ // column should not be already deleted and should exist in the table
+ if (!tableColumn.isInvisible && tableColumn.isDimesion) {
+ totalKeyColumnInSchema += 1
+ }
+ }
+ if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+ sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ // maintain the deleted columns for schema evolution history
+ var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+ alterTableDropColumnModel.columns.foreach { column =>
+ columnSchemaList.foreach { columnSchema =>
+ if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+ deletedColumnSchema += CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
+ columnSchema.invisible = true
+ }
+ }
+ }
+ // add deleted columns to schema evolution history and update the schema
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+ CarbonEnv.get.carbonMetastore
+ .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ carbonTable.getStorePath)(sparkSession)
+
+ val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+ val schema = CarbonEnv.get.carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession).schema.json
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
+ s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ // TODO: 1. add check for deletion of index tables
+ // delete dictionary files for dictionary column and clear dictionary cache from memory
+ CarbonUtil.deleteDictionaryFileAndCache(dictionaryColumns.toList.asJava, carbonTable)
+ LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table drop columns failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table drop columns lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table drop columns operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
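
And for the drop-columns path (illustrative names); as enforced above, at least one key (dimension) column must remain after the drop, and dictionary files of dropped dictionary columns are deleted and evicted from the cache:

    // Hedged sketch: drop two previously added columns.
    sparkSession.sql("ALTER TABLE t DROP COLUMNS (c3, c4)")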
+
case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -783,13 +1079,15 @@ private[sql] case class DescribeCommandFormatted(
.lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
val mapper = new ObjectMapper()
val colProps = StringBuilder.newBuilder
+ val dims = relation.metaData.dims.map(x => x.toLowerCase)
var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
- val comment = if (relation.metaData.dims.contains(field.name)) {
+ val fieldName = field.name.toLowerCase
+ val comment = if (dims.contains(fieldName)) {
val dimension = relation.metaData.carbonTable.getDimensionByName(
relation.tableMeta.carbonTableIdentifier.getTableName,
- field.name)
+ fieldName)
if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
- colProps.append(field.name).append(".")
+ colProps.append(fieldName).append(".")
.append(mapper.writeValueAsString(dimension.getColumnProperties))
.append(",")
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 34bd803..2e047e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
+import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.locks.ZookeeperInit
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -251,18 +252,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
val tableName = tableFolder.getName
val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
-
-
- val createTBase = new ThriftReader.TBaseCreator() {
- override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
- new TableInfo()
- }
- }
- val thriftReader = new ThriftReader(tableMetadataFile, createTBase)
- thriftReader.open()
- val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
- thriftReader.close()
-
+ val tableInfo: TableInfo = readSchemaFile(tableMetadataFile)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
@@ -292,9 +282,56 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
}
/**
+ * This method will read the schema file from a given path
+ *
+ * @param schemaFilePath
+ * @return
+ */
+ def readSchemaFile(schemaFilePath: String): TableInfo = {
+ val createTBase = new ThriftReader.TBaseCreator() {
+ override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = {
+ new TableInfo()
+ }
+ }
+ val thriftReader = new ThriftReader(schemaFilePath, createTBase)
+ thriftReader.open()
+ val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo]
+ thriftReader.close()
+ tableInfo
+ }
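
A short sketch of how the alter commands above consume this helper (assuming an already-resolved CarbonTable plus the CarbonStorePath and JavaConverters imports those commands use):

    // Hedged sketch: read the thrift schema and list raw column metadata.
    val carbonTablePath = CarbonStorePath.getCarbonTablePath(
      carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier)
    val tableInfo = CarbonEnv.get.carbonMetastore
      .readSchemaFile(carbonTablePath.getSchemaFilePath)
    tableInfo.fact_table.table_columns.asScala
      .foreach(col => println(s"${col.column_name} -> ${col.data_type}"))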
+
+ /**
+ * This method will overwrite the existing schema and update it with the given details
+ *
+ * @param carbonTableIdentifier
+ * @param thriftTableInfo
+ * @param schemaEvolutionEntry
+ * @param carbonStorePath
+ * @param sparkSession
+ */
+ def updateTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ schemaEvolutionEntry: SchemaEvolutionEntry,
+ carbonStorePath: String)
+ (sparkSession: SparkSession): Unit = {
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName,
+ carbonStorePath)
+ thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
+ createSchemaThriftFile(wrapperTableInfo,
+ thriftTableInfo,
+ carbonTableIdentifier.getDatabaseName,
+ carbonTableIdentifier.getTableName)(sparkSession)
+ // TODO: add a log after completion stating that the schema update succeeded for the given db and table
+ }
+
+ /**
*
* Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
- * Load CarbonTable from wrapper tableinfo
+ * Load CarbonTable from wrapper tableInfo
*
*/
def createTableFromThrift(
@@ -304,13 +341,36 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
if (tableExists(tableName, Some(dbName))(sparkSession)) {
sys.error(s"Table [$tableName] already exists under Database [$dbName]")
}
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
val thriftTableInfo = schemaConverter
.fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
.add(schemaEvolutionEntry)
+ val carbonTablePath = createSchemaThriftFile(tableInfo,
+ thriftTableInfo,
+ dbName,
+ tableName)(sparkSession)
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+ LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+ carbonTablePath
+ }
+ /**
+ * This method will write the schema thrift file in carbon store and load table metadata
+ *
+ * @param tableInfo
+ * @param thriftTableInfo
+ * @param dbName
+ * @param tableName
+ * @param sparkSession
+ * @return
+ */
+ private def createSchemaThriftFile(
+ tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ dbName: String, tableName: String)
+ (sparkSession: SparkSession): String = {
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
tableInfo.getFactTable.getTableId)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
@@ -318,24 +378,39 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
tableInfo.setMetaDataFilepath(schemaMetadataPath)
tableInfo.setStorePath(storePath)
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
- val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
- CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName))
-
val fileType = FileFactory.getFileType(schemaMetadataPath)
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
}
val thriftWriter = new ThriftWriter(schemaFilePath, false)
- thriftWriter.open()
+ thriftWriter.open(FileWriteOperation.OVERWRITE)
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
+ removeTableFromMetadata(dbName, tableName)
+ CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+ CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
metadata.tablesMeta += tableMeta
- LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
carbonTablePath.getPath
}
+ /**
+ * This method will remove the table meta from catalog metadata array
+ *
+ * @param dbName
+ * @param tableName
+ */
+ private def removeTableFromMetadata(dbName: String, tableName: String) = {
+ val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+ metadataToBeRemoved match {
+ case Some(tableMeta) =>
+ metadata.tablesMeta -= tableMeta
+ CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+ case None =>
+ LOGGER.debug(s"No entry for table $tableName in database $dbName")
+ }
+ }
+
private def updateMetadataByWrapperTable(
wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
@@ -612,7 +687,7 @@ object CarbonMetastoreTypes extends RegexParsers {
"binary" ^^^ BinaryType |
"boolean" ^^^ BooleanType |
fixedDecimalType |
- "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+ "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) |
"varchar\\((\\d+)\\)".r ^^^ StringType |
"date" ^^^ DateType |
"timestamp" ^^^ TimestampType
@@ -798,8 +873,6 @@ case class CarbonRelation(
val output = CarbonMetastoreTypes.toDataType {
column.getDataType.toString
.toLowerCase match {
- case "int" => "long"
- case "short" => "long"
case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
.getColumnSchema.getScale + ")"
case others => others
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/44bb6f1e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 6ad404a..4960783 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -24,6 +24,9 @@ import org.apache.spark.sql.catalyst.CarbonDDLSqlParser
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
/**
* TODO remove the duplicate code and add the common methods to common class.
* Parser for All Carbon DDL, DML cases in Unified context
@@ -51,11 +54,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
protected lazy val startCommand: Parser[LogicalPlan] =
- loadManagement| showLoads | alterTable
+ loadManagement| showLoads | alterTable | restructure
protected lazy val loadManagement: Parser[LogicalPlan] =
deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+ protected lazy val restructure: Parser[LogicalPlan] =
+ alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
@@ -129,4 +134,80 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
case databaseName ~ tableName ~ limit =>
ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit)
}
+
+ protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
+ ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
+ case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
+ // both the column names should be same
+ CommonUtil.validateColumnNames(columnName, columnNameCopy)
+ val alterTableChangeDataTypeModel =
+ AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase, values),
+ convertDbNameToLowerCase(dbName),
+ table.toLowerCase,
+ columnName.toLowerCase,
+ columnNameCopy.toLowerCase)
+ AlterTableDataTypeChange(alterTableChangeDataTypeModel)
+ }
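
The optional parenthesized value list supplies precision and scale for decimal targets; for instance, this rule should accept (hypothetical names):

    // Hedged sketch: the "(12,2)" suffix is matched by the opt(...) branch and
    // carried into the model via parseDataType as precision and scale.
    sparkSession.sql("ALTER TABLE t CHANGE c2 c2 DECIMAL(12,2)")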
+
+ protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+ (ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+ (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+ case dbName ~ table ~ fields ~ tblProp =>
+ fields.foreach{ f =>
+ if (isComplexDimDictionaryExclude(f.dataType.get)) {
+ throw new MalformedCarbonCommandException(
+ s"Add column is unsupported for complex datatype column: ${f.column}")
+ }
+ }
+ val tableProps = if (tblProp.isDefined) {
+ // default value should not be converted to lower case
+ val tblProps = tblProp.get.map(f => if (f._1.toLowerCase.startsWith("default.value.")) {
+ f._1 -> f._2
+ } else {
+ f._1 -> f._2.toLowerCase
+ })
+ scala.collection.mutable.Map(tblProps: _*)
+ } else {
+ scala.collection.mutable.Map.empty[String, String]
+ }
+
+ val tableModel = prepareTableModel (false,
+ convertDbNameToLowerCase(dbName),
+ table.toLowerCase,
+ fields.map(convertFieldNamesToLowercase),
+ Seq.empty,
+ tableProps,
+ None,
+ true)
+
+ val alterTableAddColumnsModel = AlterTableAddColumnsModel(convertDbNameToLowerCase(dbName),
+ table,
+ tableProps,
+ tableModel.dimCols,
+ tableModel.msrCols,
+ tableModel.highcardinalitydims.getOrElse(Seq.empty))
+ AlterTableAddColumns(alterTableAddColumnsModel)
+ }
+
+ private def convertFieldNamesToLowercase(field: Field): Field = {
+ val name = field.column.toLowerCase
+ field.copy(column = name, name = Some(name))
+ }
+ protected lazy val alterTableDropColumn: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ DROP ~ COLUMNS ~
+ ("(" ~> rep1sep(ident, ",") <~ ")") <~ opt(";") ^^ {
+ case dbName ~ table ~ drop ~ columns ~ values =>
+ // validate that same column name is not repeated
+ values.groupBy(identity).collect {
+ case (x, ys) if ys.lengthCompare(1) > 0 =>
+ throw new MalformedCarbonCommandException(s"$x is duplicate. Duplicate columns not " +
+ s"allowed")
+ }
+ val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
+ table.toLowerCase,
+ values)
+ AlterTableDropColumns(alterTableDropColumnModel)
+ }
}
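
Finally, a sketch of the duplicate-column guard in alterTableDropColumn above; a repeated name should fail during parsing, before any metadata is touched (hypothetical names, and the exact exception surfaced may depend on the session's parser wrapping):

    // Hedged sketch: expect a MalformedCarbonCommandException for duplicates.
    try {
      sparkSession.sql("ALTER TABLE t DROP COLUMNS (c1, c1)")
    } catch {
      case e: MalformedCarbonCommandException =>
        println(e.getMessage) // "c1 is duplicate. Duplicate columns are not allowed"
    }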