You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/27 16:35:29 UTC
carbondata git commit: [CARBONDATA-1517] - Pre Aggregate Create Table Support
Repository: carbondata
Updated Branches:
refs/heads/pre-aggregate 334aa1ccd -> 33d11997a
[CARBONDATA-1517]- Pre Aggregate Create Table Support
Support CTAS (CREATE TABLE ... AS SELECT) in CarbonData, support creating aggregation tables using CTAS, and record the aggregation table information in the main table's schema.
This closes #1433
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/33d11997
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/33d11997
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/33d11997
Branch: refs/heads/pre-aggregate
Commit: 33d11997abe1250b55c7e23e632ea44d31636956
Parents: 334aa1c
Author: kumarvishal <ku...@gmail.com>
Authored: Sun Oct 15 18:05:55 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Oct 27 22:04:54 2017 +0530
----------------------------------------------------------------------
.../table/column/ParentColumnTableRelation.java | 71 +++
.../ThriftWrapperSchemaConverterImplTest.java | 28 +-
.../preaggregate/TestPreAggCreateCommand.scala | 148 +++++++
.../carbondata/spark/util/CommonUtil.scala | 9 +
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 12 +-
.../command/carbonTableSchemaCommon.scala | 122 ++++--
.../command/management/LoadTableCommand.scala | 2 +-
.../CreatePreAggregateTableCommand.scala | 136 ++++++
.../preaaggregate/PreAggregateUtil.scala | 431 +++++++++++++++++++
.../schema/AlterTableRenameTableCommand.scala | 2 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 41 +-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 72 +++-
.../apache/spark/sql/hive/CarbonMetaStore.scala | 21 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 2 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 37 +-
.../org/apache/spark/util/AlterTableUtil.scala | 10 +-
16 files changed, 1066 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
new file mode 100644
index 0000000..425d0f2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Maintains the relation of a child table column (for example a column of a pre-aggregate
+ * table) to the parent table column it was derived from.
+ */
+public class ParentColumnTableRelation implements Serializable, Writable {
+
+  private static final long serialVersionUID = 1321746085997166646L;
+
+  /**
+   * identifier of the parent table
+   */
+  private RelationIdentifier relationIdentifier;
+
+  /**
+   * parent column id
+   */
+  private String columnId;
+
+  /**
+   * parent column name
+   */
+  private String columnName;
+
+  /**
+   * @param relationIdentifier identifier of the parent table
+   * @param columnId           id of the parent column
+   * @param columnName         name of the parent column
+   */
+  public ParentColumnTableRelation(RelationIdentifier relationIdentifier, String columnId,
+      String columnName) {
+    this.relationIdentifier = relationIdentifier;
+    this.columnId = columnId;
+    this.columnName = columnName;
+  }
+
+  /**
+   * @return identifier of the parent table
+   */
+  public RelationIdentifier getRelationIdentifier() {
+    return relationIdentifier;
+  }
+
+  /**
+   * @return id of the parent column
+   */
+  public String getColumnId() {
+    return columnId;
+  }
+
+  /**
+   * @return name of the parent column
+   */
+  public String getColumnName() {
+    return columnName;
+  }
+
+  /**
+   * Serializes the parent table identifier, then the column id, then the column name.
+   * The relation identifier must be non-null, and because the column id and name are written
+   * with {@link DataOutput#writeUTF(String)}, neither of them may be null either.
+   */
+  @Override public void write(DataOutput out) throws IOException {
+    relationIdentifier.write(out);
+    out.writeUTF(columnId);
+    out.writeUTF(columnName);
+  }
+
+  /**
+   * Reads back the state produced by {@link #write(DataOutput)}, in the same order, replacing
+   * the current values of this instance.
+   */
+  @Override public void readFields(DataInput in) throws IOException {
+    // placeholder instance whose fields are populated by readFields below
+    this.relationIdentifier = new RelationIdentifier(null, null, null);
+    relationIdentifier.readFields(in);
+    this.columnId = in.readUTF();
+    this.columnName = in.readUTF();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 4a3ef32..f05e4ac 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.format.DataMapSchema;
import mockit.Mock;
import mockit.MockUp;
@@ -82,6 +83,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
thriftColumnSchemas = new ArrayList<org.apache.carbondata.format.ColumnSchema>();
thriftColumnSchemas.add(thriftColumnSchema);
thriftSchemaEvolutionEntries = new ArrayList<>();
@@ -419,6 +421,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
ColumnSchema wrapperColumnSchema = new ColumnSchema();
new MockUp<ColumnSchema>() {
@@ -481,6 +484,8 @@ public class ThriftWrapperSchemaConverterImplTest {
@Mock public String getColumnReferenceId() {
return "1";
}
+
+ @Mock public String getAggFunction() {return "" ;}
};
org.apache.carbondata.format.ColumnSchema actualResult =
@@ -494,7 +499,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
-
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -569,7 +574,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.INT,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
-
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -643,6 +648,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.SHORT,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -716,7 +722,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.LONG,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
-
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -790,6 +796,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DOUBLE,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
@@ -864,6 +871,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DECIMAL,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -924,6 +932,10 @@ public class ThriftWrapperSchemaConverterImplTest {
@Mock public String getColumnReferenceId() {
return "1";
}
+
+ @Mock public String getAggFunction() {
+ return "";
+ }
};
ColumnSchema wrapperColumnSchema = new ColumnSchema();
@@ -938,6 +950,7 @@ public class ThriftWrapperSchemaConverterImplTest {
org.apache.carbondata.format.DataType.TIMESTAMP, "columnName", "1", true, encoders,
true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
@@ -1012,7 +1025,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.ARRAY,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
-
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
return encodings;
@@ -1086,6 +1099,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRUCT,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
@@ -1161,6 +1175,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(null, "columnName", "1", true, encoders,
true);
thriftColumnSchema.setSchemaOrdinal(1);
+ thriftColumnSchema.setAggregate_function("");
new MockUp<ColumnSchema>() {
@Mock public List<Encoding> getEncodingList() {
@@ -1311,7 +1326,7 @@ public class ThriftWrapperSchemaConverterImplTest {
new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
"columnName", "1", true, encoders, true);
thriftColumnSchema.setSchemaOrdinal(1);
-
+ thriftColumnSchema.setAggregate_function("");
ColumnSchema wrapperColumnSchema = new ColumnSchema();
org.apache.carbondata.format.ColumnSchema actualResult =
thriftWrapperSchemaConverter.fromWrapperToExternalColumnSchema(wrapperColumnSchema);
@@ -1499,6 +1514,8 @@ public class ThriftWrapperSchemaConverterImplTest {
@Mock public String getColumnReferenceId() {
return "1";
}
+
+ @Mock public String getAggFunction() { return "";}
};
new MockUp<TableInfo>() {
@@ -1535,6 +1552,7 @@ public class ThriftWrapperSchemaConverterImplTest {
org.apache.carbondata.format.TableInfo expectedResult =
new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
.carbondata.format.TableSchema>());
+ expectedResult.setDataMapSchemas(new ArrayList<DataMapSchema>());
assertEquals(expectedResult, actualResult);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
new file mode 100644
index 0000000..6120e88
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Tests creation of pre-aggregate tables through CTAS with the 'parent' table property.
+ * Positive cases check the generated child column names in DESCRIBE FORMATTED output;
+ * negative cases check that unsupported queries make table creation fail.
+ */
+class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+    sql("create table preaggMain (a string, b string, c string) stored by 'carbondata'")
+    sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
+    sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'")
+  }
+
+
+  test("test pre agg create table One") {
+    sql("create table preagg1 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_b_sum")
+    sql("drop table preagg1")
+  }
+
+  test("test pre agg create table Two") {
+    sql("create table preagg2 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_b_sum")
+    sql("drop table preagg2")
+  }
+
+  test("test pre agg create table Three") {
+    sql("create table preagg3 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) as sum from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_b_sum")
+    sql("drop table preagg3")
+  }
+
+  test("test pre agg create table four") {
+    sql("create table preagg4 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) as sum from PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_b_sum")
+    sql("drop table preagg4")
+  }
+
+
+  test("test pre agg create table five") {
+    sql("create table preagg11 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "DICTIONARY")
+    sql("drop table preagg11")
+  }
+
+  test("test pre agg create table six") {
+    sql("create table preagg12 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "DICTIONARY")
+    sql("drop table preagg12")
+  }
+
+  test("test pre agg create table seven") {
+    sql("create table preagg13 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) as sum from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "DICTIONARY")
+    sql("drop table preagg13")
+  }
+
+  test("test pre agg create table eight") {
+    sql("create table preagg14 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) as sum from PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "DICTIONARY")
+    sql("drop table preagg14")
+  }
+
+
+  test("test pre agg create table nine") {
+    sql("create table preagg15 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,avg(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_count")
+    sql("drop table preagg15")
+  }
+
+  test("test pre agg create table ten") {
+    sql("create table preagg16 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,max(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_b_max")
+    sql("drop table preagg16")
+  }
+
+  test("test pre agg create table eleven") {
+    sql("create table preagg17 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,min(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_b_min")
+    sql("drop table preagg17")
+  }
+
+  test("test pre agg create table twelve") {
+    sql("create table preagg18 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(b) from PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_b_count")
+    sql("drop table preagg18")
+  }
+
+  // The negative cases below use intercept rather than try { ...; assert(false) } catch
+  // { case _: Exception => }: ScalaTest's assert throws a TestFailedException, which is itself
+  // an Exception, so that pattern would swallow the failure and the test could never fail.
+
+  test("test pre agg create table thirteen") {
+    // creation is expected to fail for count(distinct ...)
+    intercept[Exception] {
+      sql(
+        "create table preagg19 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(distinct b) from PreAggMain2 group by a")
+    }
+  }
+
+  test("test pre agg create table fourteen") {
+    // creation is expected to fail for sum(distinct ...)
+    intercept[Exception] {
+      sql(
+        "create table preagg20 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(distinct b) from PreAggMain2 group by a")
+    }
+  }
+
+  test("test pre agg create table fifteen") {
+    // creation is expected to fail when the query has a where filter
+    intercept[Exception] {
+      sql(
+        "create table preagg21 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(b) from PreAggMain2 where a='vishal' group by a")
+    }
+  }
+
+
+  override def afterAll {
+    // defensive cleanup in case a negative case unexpectedly created its table
+    sql("drop table if exists preagg19")
+    sql("drop table if exists preagg20")
+    sql("drop table if exists preagg21")
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index bc24c12..3d06a05 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util
import java.text.SimpleDateFormat
import java.util
+import java.util.regex.{Matcher, Pattern}
import scala.collection.JavaConverters._
import scala.collection.mutable.Map
@@ -831,4 +832,12 @@ object CommonUtil {
LOGGER.error(s)
}
}
+
+  /** Matches a decimal type definition and captures the text between the parentheses. */
+  private val DecimalTypePattern: Pattern = Pattern.compile("^decimal\\(([^)]+)\\)")
+
+  /**
+   * Extracts the precision and scale from a decimal type definition such as "decimal(10,2)".
+   *
+   * Despite the method name, the result is ordered (precision, scale), which is how callers
+   * destructure it. When only a precision is given ("decimal(10)") the scale defaults to 0,
+   * matching the DECIMAL(p) semantics of Spark SQL.
+   *
+   * @param dataType lower-case decimal type definition, e.g. "decimal(10,2)"
+   * @return (precision, scale)
+   * @throws IllegalArgumentException if dataType is not of the form decimal(p) or decimal(p,s),
+   *                                  or if p / s are not integers
+   */
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = DecimalTypePattern.matcher(dataType)
+    // without this check a non-matching input fails inside m.group with an opaque
+    // IllegalStateException("No match found")
+    if (!m.find()) {
+      throw new IllegalArgumentException(s"Invalid decimal data type: $dataType")
+    }
+    m.group(1).split(",").map(_.trim) match {
+      case Array(precision) =>
+        (Integer.parseInt(precision), 0)
+      case Array(precision, scale) =>
+        (Integer.parseInt(precision), Integer.parseInt(scale))
+      case _ =>
+        throw new IllegalArgumentException(s"Invalid decimal data type: $dataType")
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4649082..5d389e8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -222,16 +222,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
dimensions ++ complexDimensions
}
-
-
- def getScaleAndPrecision(dataType: String): (Int, Int) = {
- val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
- m.find()
- val matchedString: String = m.group(1)
- val scaleAndPrecision = matchedString.split(",")
- (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
- }
-
/**
* This will prepate the Model from the Tree details.
*
@@ -1069,7 +1059,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
// if it is present then extracting the precision and scale. resetting the data type
// with Decimal.
case _ if dataType.startsWith("decimal") =>
- val (precision, scale) = getScaleAndPrecision(dataType)
+ val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType)
Field(field.column,
Some("Decimal"),
field.name,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e5cfc84..c985017 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -33,8 +33,8 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.DataTypeUtil
@@ -60,13 +60,30 @@ case class TableModel(
colProps: Option[util.Map[String,
util.List[ColumnProperty]]] = None,
bucketFields: Option[BucketFields],
- partitionInfo: Option[PartitionInfo])
+ partitionInfo: Option[PartitionInfo],
+ var parentTable: Option[CarbonTable] = None,
+ var dataMapRelation: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]] = None)
case class Field(column: String, var dataType: Option[String], name: Option[String],
children: Option[List[Field]], parent: String = null,
storeType: Option[String] = Some("columnar"),
var schemaOrdinal: Int = -1,
- var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") {
+
+  /**
+   * Two fields are equal when their column names match case-insensitively; every other
+   * attribute is ignored. Field is used as the key of TableModel.dataMapRelation.
+   */
+  override def equals(o: Any): Boolean = o match {
+    case that: Field => that.column.equalsIgnoreCase(this.column)
+    case _ => false
+  }
+
+  /**
+   * Must agree with equals: fields whose names differ only in case have to hash identically.
+   * Each char is folded with toLowerCase(toUpperCase(c)), the same per-character comparison
+   * String.equalsIgnoreCase performs, so equal fields always produce equal hash codes.
+   */
+  override def hashCode: Int =
+    column.map(c => Character.toLowerCase(Character.toUpperCase(c))).hashCode
+}
+
+/**
+ * Describes how a column of a child table relates to the parent table.
+ *
+ * @param aggregateFunction   aggregate function applied to the parent column; defaults to the
+ *                            empty string
+ * @param columnTableRelation the parent column this field is derived from, if any
+ */
+case class DataMapField(
+    aggregateFunction: String = "",
+    columnTableRelation: Option[ColumnTableRelation] = None)
+
+/**
+ * Identifies a parent table column by name and id, together with the name, database and id
+ * of the table it belongs to.
+ */
+case class ColumnTableRelation(
+    parentColumnName: String,
+    parentColumnId: String,
+    parentTableName: String,
+    parentDatabaseName: String,
+    parentTableId: String)
case class ColumnProperty(key: String, value: String)
@@ -351,11 +368,10 @@ class TableNewProcessor(cm: TableModel) {
fields.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
+ val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column), index,
- isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale,
- field.schemaOrdinal)
+ isCol = true, encoders, isDimensionCol = true, rowGroup, field, cm.dataMapRelation)
allColumns ++= Seq(columnSchema)
index = index + 1
rowGroup = rowGroup + 1
@@ -370,19 +386,24 @@ class TableNewProcessor(cm: TableModel) {
def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
encoders: java.util.List[Encoding], isDimensionCol: Boolean,
- colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
+ colGroup: Integer, field: Field,
+ map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]]
+ ) : ColumnSchema = {
val columnSchema = new ColumnSchema()
columnSchema.setDataType(dataType)
columnSchema.setColumnName(colName)
- val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
- if (highCardinalityDims.contains(colName)) {
- encoders.remove(Encoding.DICTIONARY)
- }
+ val isParentColumnRelation = map.isDefined && map.get.get(field).isDefined
+ if(!isParentColumnRelation) {
+ val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+ if (highCardinalityDims.contains(colName)) {
+ encoders.remove(Encoding.DICTIONARY)
+ }
if (dataType == DataTypes.DATE) {
- encoders.add(Encoding.DIRECT_DICTIONARY)
- }
+ encoders.add(Encoding.DIRECT_DICTIONARY)
+ }
if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
- encoders.add(Encoding.DIRECT_DICTIONARY)
+ encoders.add(Encoding.DIRECT_DICTIONARY)
+ }
}
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
@@ -393,10 +414,22 @@ class TableNewProcessor(cm: TableModel) {
columnSchema.setColumnar(isCol)
columnSchema.setDimensionColumn(isDimensionCol)
columnSchema.setColumnGroup(colGroup)
- columnSchema.setPrecision(precision)
- columnSchema.setScale(scale)
- columnSchema.setSchemaOrdinal(schemaOrdinal)
+ columnSchema.setPrecision(field.precision)
+ columnSchema.setScale(field.scale)
+ columnSchema.setSchemaOrdinal(field.schemaOrdinal)
columnSchema.setSortColumn(false)
+ if(isParentColumnRelation) {
+ val dataMapField = map.get.get(field).get
+ columnSchema.setAggFunction(dataMapField.aggregateFunction);
+ val relation = dataMapField.columnTableRelation.get
+ val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
+ val relationIdentifier = new RelationIdentifier(
+ relation.parentDatabaseName, relation.parentTableName, relation.parentTableId)
+ val parentColumnTableRelation = new ParentColumnTableRelation(
+ relationIdentifier, relation.parentColumnId, relation.parentColumnName)
+ parentColumnTableRelationList.add(parentColumnTableRelation)
+ columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
+ }
// TODO: Need to fill RowGroupID, converted type
// & Number of Children after DDL finalization
columnSchema
@@ -412,9 +445,16 @@ class TableNewProcessor(cm: TableModel) {
// Sort columns should be at the begin of all columns
cm.sortKeyDims.get.foreach { keyDim =>
val field = cm.dimCols.find(keyDim equals _.column).get
- val encoders = new java.util.ArrayList[Encoding]()
- encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
+ val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+ cm.parentTable.get.getColumnByName(
+ cm.parentTable.get.getFactTableName,
+ cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
+ } else {
+ val encoders = new java.util.ArrayList[Encoding]()
+ encoders.add(Encoding.DICTIONARY)
+ encoders
+ }
+ val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
index,
@@ -422,9 +462,8 @@ class TableNewProcessor(cm: TableModel) {
encoders,
isDimensionCol = true,
-1,
- field.precision,
- field.scale,
- field.schemaOrdinal)
+ field,
+ cm.dataMapRelation)
columnSchema.setSortColumn(true)
allColumns :+= columnSchema
index = index + 1
@@ -433,9 +472,18 @@ class TableNewProcessor(cm: TableModel) {
cm.dimCols.foreach(field => {
val sortField = cm.sortKeyDims.get.find(field.column equals _)
if (sortField.isEmpty) {
- val encoders = new java.util.ArrayList[Encoding]()
- encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
+ val encoders = if (cm.parentTable.isDefined &&
+ cm.dataMapRelation.get.get(field).isDefined) {
+ cm.parentTable.get.getColumnByName(
+ cm.parentTable.get.getFactTableName,
+ cm.dataMapRelation.get.get(field).get.
+ columnTableRelation.get.parentColumnName).getEncoder
+ } else {
+ val encoders = new java.util.ArrayList[Encoding]()
+ encoders.add(Encoding.DICTIONARY)
+ encoders
+ }
+ val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
index,
@@ -443,9 +491,8 @@ class TableNewProcessor(cm: TableModel) {
encoders,
isDimensionCol = true,
-1,
- field.precision,
- field.scale,
- field.schemaOrdinal)
+ field,
+ cm.dataMapRelation)
allColumns :+= columnSchema
index = index + 1
if (field.children.isDefined && field.children.get != null) {
@@ -457,7 +504,7 @@ class TableNewProcessor(cm: TableModel) {
cm.msrCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(
+ val columnSchema = getColumnSchema(
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
field.name.getOrElse(field.column),
index,
@@ -465,9 +512,8 @@ class TableNewProcessor(cm: TableModel) {
encoders,
isDimensionCol = false,
-1,
- field.precision,
- field.scale,
- field.schemaOrdinal)
+ field,
+ cm.dataMapRelation)
allColumns :+= columnSchema
index = index + 1
measureCount += 1
@@ -508,13 +554,16 @@ class TableNewProcessor(cm: TableModel) {
// Adding dummy measure if no measure is provided
if (measureCount == 0) {
val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(DataTypes.DOUBLE,
+ val field = Field(column = CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
+ dataType = None, name = None, children = None)
+ val columnSchema: ColumnSchema =
+ getColumnSchema(DataTypes.DOUBLE,
CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
index,
true,
encoders,
false,
- -1, 0, 0, schemaOrdinal = -1)
+ -1, field, cm.dataMapRelation)
columnSchema.setInvisible(true)
allColumns :+= columnSchema
}
@@ -523,6 +572,7 @@ class TableNewProcessor(cm: TableModel) {
val tableInfo = new TableInfo()
val tableSchema = new TableSchema()
+
val schemaEvol = new SchemaEvolution()
schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]())
tableSchema.setTableId(UUID.randomUUID().toString)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 9018f7b..35e747b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -504,7 +504,7 @@ case class LoadTableCommand(
entry.setTime_stamp(System.currentTimeMillis())
// write TableInfo
- metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+ metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
carbonTablePath.getCarbonTableIdentifier,
tableInfo, entry, carbonTablePath.getPath)(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
new file mode 100644
index 0000000..ca384f9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, TableInfo}
+import org.apache.carbondata.core.util.CarbonUtil
+
+/**
+ * Below command class will be used to create pre-aggregate table
+ * and updating the parent table about the child table information
+ * Failure case:
+ * 1. failed to create pre aggregate table.
+ * 2. failed to update main table
+ * In either case the child table is dropped again and the original exception is rethrown;
+ * a failure of that clean-up drop is logged and attached as a suppressed exception so it
+ * never hides the root cause.
+ *
+ * @param cm               table model of the pre-aggregate (child) table
+ * @param dataFrame        data frame of the select query; its logical plan identifies the
+ *                         parent table
+ * @param createDSTable    when true the child table is created through a spark CREATE TABLE
+ * @param queryString      select query recorded in the child schema stored on the parent
+ * @param fieldRelationMap mapping of every child field to its parent column / aggregation
+ */
+case class CreatePreAggregateTableCommand(
+    cm: TableModel,
+    dataFrame: DataFrame,
+    createDSTable: Boolean = true,
+    queryString: String,
+    fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val tbName = cm.tableName
+    val dbName = cm.databaseName
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+    // getting the parent table
+    val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+    // getting the table name
+    val parentTableName = parentTable.getFactTableName
+    // getting the db name of parent table
+    val parentDbName = parentTable.getDatabaseName
+    // updating the relation identifier, this will be stored in child table
+    // which can be used during dropping of pre-aggregate table as parent table will
+    // also get updated
+    cm.parentTable = Some(parentTable)
+    cm.dataMapRelation = Some(fieldRelationMap)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(
+        s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
+        s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
+    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
+      sys.error("No Dimensions found. Table should have at least one dimesnion !")
+    }
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
+      if (createDSTable) {
+        try {
+          // order the columns by their schema ordinal for the CREATE TABLE statement
+          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+               |(${ fields.map(f => f.rawSchema).mkString(",") })
+               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
+            s""""$tablePath"$carbonSchemaString) """)
+          // child schema object which will be registered on the parent table
+          val childSchema = tableInfo.getFactTable
+            .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+          // updating the parent table about child table
+          PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
+            // call the drop table to delete the created table; a failure of this clean-up
+            // must not replace the exception that caused the rollback
+            try {
+              CarbonEnv.getInstance(sparkSession).carbonMetastore
+                .dropTable(tablePath, identifier)(sparkSession)
+            } catch {
+              case dropException: Exception =>
+                LOGGER.error(dropException,
+                  s"Failed to drop table [$dbName.$tbName] while rolling back its creation")
+                e.addSuppressed(dropException)
+            }
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
+    }
+    Seq.empty
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
new file mode 100644
index 0000000..c4b6783
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility class for keeping all the utility method for pre-aggregate
+ */
+object PreAggregateUtil {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Returns the carbon table that the pre-aggregate select query reads from.
+   *
+   * @param plan logical plan of the select query
+   * @return the parent carbon table
+   * @throws MalformedCarbonCommandException when the plan is not an Aggregate directly over a
+   *                                         carbon relation
+   */
+  def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
+    plan match {
+      case Aggregate(_, _, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+      case _ => throw new MalformedCarbonCommandException("table does not exist")
+    }
+  }
+
+  /**
+   * Below method will be used to validate the select plan
+   * and get the required fields from select plan
+   * Currently only aggregate query is support any other type of query will
+   * fail
+   * @param plan       logical plan of the select query
+   * @param selectStmt select query text, used in error messages
+   * @return child field to data map field mapping, in projection order
+   * @throws MalformedCarbonCommandException for unsupported queries, distinct aggregates or
+   *                                         when the parent is itself a pre-aggregate table
+   */
+  def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+      selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+    val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    plan match {
+      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+          .metaData.carbonTable
+        val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+        val parentTableName = carbonTableIdentifier.getTableName
+        val parentDatabaseName = carbonTableIdentifier.getDatabaseName
+        val parentTableId = carbonTableIdentifier.getTableId
+        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            "Pre Aggregation is not supported on Pre-Aggregated Table")
+        }
+        // a projected (possibly aliased) plain column becomes a non-aggregated child column
+        def addPlainColumn(attr: AttributeReference): Unit = {
+          fieldToDataMapFieldMap += getField(attr.name,
+            attr.dataType,
+            parentColumnId = getParentColumnId(carbonTable, parentTableName, attr.name),
+            parentTableName = parentTableName,
+            parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+        }
+        aExp.foreach {
+          case Alias(attr: AggregateExpression, _) =>
+            if (attr.isDistinct) {
+              throw new MalformedCarbonCommandException(
+                "Distinct is not supported On Pre Aggregation")
+            }
+            fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+              attr.aggregateFunction,
+              parentTableName,
+              parentDatabaseName,
+              parentTableId)
+          case attr: AttributeReference =>
+            addPlainColumn(attr)
+          case Alias(attr: AttributeReference, _) =>
+            addPlainColumn(attr)
+          case _ =>
+            throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+              selectStmt } ")
+        }
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+    }
+    fieldToDataMapFieldMap
+  }
+
+  /**
+   * Below method will be used to validate about the aggregate function
+   * which is applied on select query.
+   * Currently sum, max, min, count, avg is supported
+   * in case of any other aggregate function it will throw error
+   * In case of avg it will return two fields one for count
+   * and other of sum of that column to support rollup
+   * @param carbonTable        parent carbon table
+   * @param aggFunctions       aggregate function of the projection
+   * @param parentTableName    parent table name
+   * @param parentDatabaseName parent database name
+   * @param parentTableId      parent table id
+   * @return list of fields
+   * @throws MalformedCarbonCommandException for unsupported aggregate functions
+   */
+  def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
+      aggFunctions: AggregateFunction,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
+    val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
+    // appends the child field for `attr` stored with `dataType` and aggregated by `aggregateType`
+    def addField(attr: AttributeReference, dataType: DataType, aggregateType: String): Unit = {
+      list += getField(attr.name,
+        dataType,
+        aggregateType,
+        getParentColumnId(carbonTable, parentTableName, attr.name),
+        parentTableName,
+        parentDatabaseName, parentTableId = parentTableId)
+    }
+    aggFunctions match {
+      case sum@Sum(attr: AttributeReference) =>
+        addField(attr, attr.dataType, sum.prettyName)
+      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        addField(attr, changeDataType, sum.prettyName)
+      case count@Count(Seq(attr: AttributeReference)) =>
+        addField(attr, attr.dataType, count.prettyName)
+      case min@Min(attr: AttributeReference) =>
+        addField(attr, attr.dataType, min.prettyName)
+      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        addField(attr, changeDataType, min.prettyName)
+      case max@Max(attr: AttributeReference) =>
+        addField(attr, attr.dataType, max.prettyName)
+      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        addField(attr, changeDataType, max.prettyName)
+      // avg is stored as a sum column and a count column; both must be added
+      case Average(attr: AttributeReference) =>
+        addField(attr, attr.dataType, "sum")
+        addField(attr, attr.dataType, "count")
+      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        addField(attr, changeDataType, "sum")
+        addField(attr, changeDataType, "count")
+      case _ =>
+        throw new MalformedCarbonCommandException("Un-Supported Aggregation Type")
+    }
+    list
+  }
+
+  /**
+   * Returns the column id of `columnName` in the parent table.
+   *
+   * @throws MalformedCarbonCommandException when the parent table lookup yields no column
+   */
+  private def getParentColumnId(carbonTable: CarbonTable,
+      parentTableName: String,
+      columnName: String): String = {
+    val column = carbonTable.getColumnByName(parentTableName, columnName)
+    if (column == null) {
+      throw new MalformedCarbonCommandException(
+        s"Column $columnName does not exist in table $parentTableName")
+    }
+    column.getColumnId
+  }
+
+  /**
+   * Below method will be used to get the fields object for pre aggregate table.
+   * The child column is named `<parentTable>_<column>` or `<parentTable>_<column>_<aggType>`.
+   * @param columnName         parent column name
+   * @param dataType           data type of the child column
+   * @param aggregateType      aggregate name, empty for a non-aggregated column
+   * @param parentColumnId     parent column id
+   * @param parentTableName    parent table name
+   * @param parentDatabaseName parent database name
+   * @param parentTableId      parent table id
+   * @return fields object and its relation to the parent column
+   */
+  def getField(columnName: String,
+      dataType: DataType,
+      aggregateType: String = "",
+      parentColumnId: String,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String): (Field, DataMapField) = {
+    val actualColumnName = if (aggregateType.equals("")) {
+      parentTableName + '_' + columnName
+    } else {
+      parentTableName + '_' + columnName + '_' + aggregateType
+    }
+    val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+    val columnTableRelation = ColumnTableRelation(parentColumnName = columnName,
+      parentColumnId = parentColumnId,
+      parentTableName = parentTableName,
+      parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+    val dataMapField = DataMapField(aggregateType, Some(columnTableRelation))
+    if (dataType.typeName.startsWith("decimal")) {
+      val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        precision = precision,
+        scale = scale,
+        rawSchema = rawSchema), dataMapField)
+    } else {
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        rawSchema = rawSchema), dataMapField)
+    }
+  }
+
+  /**
+   * Below method will be used to update the main table about the pre aggregate table information
+   * in case of any exception it will throw error so pre aggregate table creation will fail.
+   * A revert is attempted only when the current child count was read, so a failure before
+   * that point cannot remove an existing child schema of the parent.
+   * @param dbName       parent database name
+   * @param tableName    parent table name
+   * @param childSchema  schema of the child table to register on the parent
+   * @param sparkSession active spark session
+   */
+  def updateMainTable(dbName: String, tableName: String,
+      childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    var locks = List.empty[ICarbonLock]
+    // number of child schemas before this update; None until the schema file has been read
+    var numberOfCurrentChild: Option[Int] = None
+    try {
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
+      // get the latest carbon table and check for column existence
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      numberOfCurrentChild = Some(wrapperTableInfo.getDataMapSchemaList.size)
+      wrapperTableInfo.getDataMapSchemaList.add(childSchema)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      updateSchemaInfo(carbonTable,
+        thriftTable)(sparkSession,
+        sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
+        numberOfCurrentChild.foreach { childCount =>
+          try {
+            revertMainTableChanges(dbName, tableName, childCount)(sparkSession)
+          } catch {
+            // keep the original failure as the primary exception
+            case revertException: Exception =>
+              LOGGER.error(revertException,
+                s"Failed to revert pre aggregate changes on parent table $dbName.$tableName")
+              e.addSuppressed(revertException)
+          }
+        }
+        throw e
+    } finally {
+      // release lock after command execution completion
+      releaseLocks(locks)
+    }
+  }
+
+  /**
+   * Below method will be used to update the main table schema and refresh the spark catalog
+   * entry of the table.
+   * @param carbonTable  parent carbon table
+   * @param thriftTable  updated thrift schema to persist
+   * @param sparkSession active spark session
+   * @param sessionState carbon session state (not used by this method)
+   */
+  def updateSchemaInfo(carbonTable: CarbonTable,
+      thriftTable: TableInfo)(sparkSession: SparkSession,
+      sessionState: CarbonSessionState): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getFactTableName
+    CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
+        thriftTable,
+        carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+  }
+
+  /**
+   * This method will split schema string into multiple parts of configured size and
+   * registers the parts as keys in tableProperties which will be read by spark to prepare
+   * Carbon Table fields
+   *
+   * @param sparkConf        spark configuration holding the split threshold
+   * @param schemaJsonString schema json to split
+   * @return comma separated `'key'='value'` table properties
+   */
+  private def prepareSchemaJson(sparkConf: SparkConf,
+      schemaJsonString: String): String = {
+    val threshold = sparkConf
+      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
+        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    var schemaParts: Seq[String] = Seq.empty
+    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
+    parts.zipWithIndex.foreach { case (part, index) =>
+      schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
+    }
+    schemaParts.mkString(",")
+  }
+
+  /**
+   * Obtains a lock object for each requested lock type on the table. If obtaining any of
+   * them throws, the locks collected so far are released and the exception is rethrown.
+   * NOTE(review): dbName and tableName are not used by the body.
+   *
+   * @param dbName            database name
+   * @param tableName         table name
+   * @param locksToBeAcquired lock types to obtain
+   * @param table             table to lock
+   * @return the obtained locks
+   */
+  def acquireLock(dbName: String,
+      tableName: String,
+      locksToBeAcquired: List[String],
+      table: CarbonTable): List[ICarbonLock] = {
+    // acquire the lock first
+    val acquiredLocks = ListBuffer[ICarbonLock]()
+    try {
+      locksToBeAcquired.foreach { lock =>
+        acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+      }
+      acquiredLocks.toList
+    } catch {
+      case e: Exception =>
+        releaseLocks(acquiredLocks.toList)
+        throw e
+    }
+  }
+
+  /**
+   * This method will release the locks acquired for an operation, logging the outcome of
+   * each unlock.
+   *
+   * @param locks locks to release
+   */
+  def releaseLocks(locks: List[ICarbonLock]): Unit = {
+    locks.foreach { carbonLock =>
+      if (carbonLock.unlock()) {
+        LOGGER.info("Pre agg table lock released successfully")
+      } else {
+        LOGGER.error("Unable to release lock during Pre agg table cretion")
+      }
+    }
+  }
+
+  /**
+   * This method reverts the parent table schema if pre-aggregate table creation fails: when
+   * the persisted schema holds more child schemas than `numberOfChildSchema`, the metastore
+   * is asked to drop the last one.
+   *
+   * @param dbName              parent database name
+   * @param tableName           parent table name
+   * @param numberOfChildSchema child schema count before the failed update
+   * @param sparkSession        active spark session
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    // guard against an unset (null) thrift child list
+    val persistedChildCount = Option(thriftTable.dataMapSchemas).map(_.size).getOrElse(0)
+    if (persistedChildCount > numberOfChildSchema) {
+      metastore
+        .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
+          thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
index af361d5..2be3c7c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
@@ -105,7 +105,7 @@ private[sql] case class AlterTableRenameTableCommand(
}
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+ val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
carbonTable.getCarbonTableIdentifier,
tableInfo,
schemaEvolutionEntry,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 16724fc..6c29f83 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -110,6 +111,22 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
}
+  /**
+   * Overwrites the stored schema of a table with `thriftTableInfo` after a data map change.
+   * Delegates to updateTableSchemaForAlter without a schema evolution entry (null is passed).
+   * NOTE(review): confirm updateTableSchemaForAlter in this class tolerates a null entry.
+   *
+   * @param newTableIdentifier identifier of the table after the update
+   * @param oldTableIdentifier identifier of the table before the update
+   * @param thriftTableInfo    thrift schema to persist
+   * @param carbonStorePath    forwarded unchanged as the last argument of
+   *                           updateTableSchemaForAlter (presumably the table path)
+   * @param sparkSession       active spark session
+   * @return whatever updateTableSchemaForAlter returns
+   */
+  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    // no schema evolution history is recorded for a data map update
+    val noEvolutionEntry: SchemaEvolutionEntry = null
+    updateTableSchemaForAlter(
+      newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      noEvolutionEntry,
+      carbonStorePath)(sparkSession)
+  }
+
def lookupRelation(dbName: Option[String], tableName: String)
(sparkSession: SparkSession): LogicalPlan = {
lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
@@ -205,7 +222,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tablePath
* @param sparkSession
*/
- def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+ def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -242,7 +259,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @param tablePath
* @param sparkSession
*/
- def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+ def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
tablePath: String)(sparkSession: SparkSession): String = {
val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
@@ -262,7 +279,27 @@ class CarbonFileMetastore extends CarbonMetaStore {
path
}
+  /**
+   * Reverts the most recently registered data map (child) schema of a parent table; used when
+   * pre-aggregate table creation fails after the parent schema was updated.
+   *
+   * @param carbonTableIdentifier identifier of the parent table
+   * @param thriftTableInfo       current thrift schema of the parent table; its last data map
+   *                              schema is removed in place
+   * @param tablePath             path of the parent table
+   * @param sparkSession          active spark session
+   * @return the path returned by createSchemaThriftFile
+   */
+  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
+      CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    // Remove the child from the thrift object itself: it is what gets handed to
+    // createSchemaThriftFile (presumably the object written to the schema file), so removing
+    // it only from the wrapper would keep the failed child persisted. Converting afterwards
+    // gives a wrapper without the child as well. An absent/empty list is left untouched.
+    val thriftChildSchemas = thriftTableInfo.dataMapSchemas
+    if (thriftChildSchemas != null && !thriftChildSchemas.isEmpty) {
+      thriftChildSchemas.remove(thriftChildSchemas.size() - 1)
+    }
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        tableIdentifier.getStorePath)
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
+  }
/**
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 76241a6..c64b7bb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -115,7 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
* @param schemaEvolutionEntry
* @param sparkSession
*/
- override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+ override def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -126,7 +126,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
if (schemaEvolutionEntry != null) {
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
}
- updateHiveMetaStore(newTableIdentifier,
+ updateHiveMetaStoreForAlter(newTableIdentifier,
oldTableIdentifier,
thriftTableInfo,
identifier.getStorePath,
@@ -134,7 +134,29 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
schemaConverter)
}
- private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier,
+  /**
+   * Reloads the schema of a table from the given thrift table info after its datamap list
+   * changed, delegating to updateHiveMetaStoreForDataMap with the store path derived from
+   * `carbonStorePath`.
+   *
+   * @param newTableIdentifier identifier used to build the reloaded wrapper table info
+   * @param oldTableIdentifier identifier of the table refreshed in the spark catalog and passed to
+   *                           removeTableFromMetadata
+   * @param thriftTableInfo    thrift schema to convert and load
+   * @param carbonStorePath    NOTE(review): despite its name this is passed to
+   *                           AbsoluteTableIdentifier.fromTablePath, i.e. treated as a table path
+   * @param sparkSession       current spark session
+   * @return the carbon table path computed by updateHiveMetaStoreForDataMap
+   */
+  override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    val converter = new ThriftWrapperSchemaConverterImpl
+    val storePath = AbsoluteTableIdentifier.fromTablePath(carbonStorePath).getStorePath
+    updateHiveMetaStoreForDataMap(newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      storePath,
+      sparkSession,
+      converter)
+  }
+
+ private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
carbonStorePath: String,
@@ -161,6 +183,30 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
}
+  /**
+   * Converts the thrift schema into a wrapper TableInfo for `newTableIdentifier`, points it at
+   * the store path and schema folder, refreshes the old table in the spark catalog, calls
+   * removeTableFromMetadata for it and loads the new wrapper into CarbonMetadata.
+   * NOTE(review): no schema-file or hive property write is visible in this method; confirm the
+   * updated schema is persisted by the caller.
+   *
+   * @return the carbon table path of `newTableIdentifier` under `carbonStorePath`
+   */
+  private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String,
+      sparkSession: SparkSession,
+      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      carbonStorePath)
+    tableInfo.setStorePath(carbonStorePath)
+    val tablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier)
+    tableInfo.setMetaDataFilepath(
+      CarbonTablePath.getFolderContainingFile(tablePath.getSchemaFilePath))
+    // refresh/evict under the OLD identity before loading the rebuilt schema
+    val oldDb = oldTableIdentifier.getDatabaseName
+    val oldTable = oldTableIdentifier.getTableName
+    sparkSession.catalog.refreshTable(TableIdentifier(oldTable, Some(oldDb)).quotedString)
+    removeTableFromMetadata(oldDb, oldTable)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    tablePath.getPath
+  }
+
/**
* This method is used to remove the evolution entry in case of failure.
*
@@ -168,7 +214,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
* @param thriftTableInfo
* @param sparkSession
*/
- override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+ override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: format.TableInfo,
tablePath: String)
(sparkSession: SparkSession): String = {
@@ -176,7 +222,23 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
evolutionEntries.remove(evolutionEntries.size() - 1)
- updateHiveMetaStore(carbonTableIdentifier,
+ updateHiveMetaStoreForAlter(carbonTableIdentifier,
+ carbonTableIdentifier,
+ thriftTableInfo,
+ identifier.getStorePath,
+ sparkSession,
+ schemaConverter)
+ }
+
+  /**
+   * Reverts the parent table schema after a failed pre-aggregate table creation by dropping the
+   * last datamap (child) schema from the thrift table info and re-applying it through
+   * updateHiveMetaStoreForAlter.
+   * NOTE(review): assumes the last datamap schema is the one added by the failed creation.
+   */
+  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
+      CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val childSchemas = thriftTableInfo.dataMapSchemas
+    // The last valid index is size() - 1; remove(size()) always throws IndexOutOfBoundsException.
+    // Guard null/empty so a revert never masks the original failure with a new exception.
+    if (childSchemas != null && !childSchemas.isEmpty) {
+      childSchemas.remove(childSchemas.size() - 1)
+    }
+    updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
       identifier.getStorePath,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index dcb43d1..24996ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -66,13 +66,26 @@ trait CarbonMetaStore {
* @param carbonStorePath
* @param sparkSession
*/
- def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+ def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
oldTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
schemaEvolutionEntry: SchemaEvolutionEntry,
carbonStorePath: String)(sparkSession: SparkSession): String
/**
+ * Replaces the registered schema of a table with the given thrift table info, used when the
+ * table's datamap (child schema) list has changed.
+ *
+ * @param newTableIdentifier identifier under which the updated schema is registered
+ * @param oldTableIdentifier identifier under which the table is currently registered
+ * @param thriftTableInfo thrift schema to apply
+ * @param carbonStorePath store/table path of the table (implementations decide how it is
+ *                        interpreted; verify with the implementation)
+ * @param sparkSession current spark session
+ * @return the table path reported by the implementation
+ */
+ def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+ oldTableIdentifier: CarbonTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ carbonStorePath: String)(sparkSession: SparkSession): String
+
+ /**
* This method is used to remove the evolution entry in case of failure.
*
* @param carbonTableIdentifier
@@ -80,11 +93,15 @@ trait CarbonMetaStore {
* @param tablePath
* @param sparkSession
*/
- def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+ def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier,
thriftTableInfo: org.apache.carbondata.format.TableInfo,
tablePath: String)
(sparkSession: SparkSession): String
+
+ /**
+ * Reverts the schema of a parent table after creating a pre-aggregate (datamap) table on it
+ * failed.
+ * NOTE(review): the implementations in this change drop the last datamap schema entry;
+ * callers must ensure that entry is the one added by the failed creation.
+ *
+ * @param carbonTableIdentifier identifier of the parent table
+ * @param thriftTableInfo thrift schema of the parent table
+ * @param tablePath path of the parent table
+ * @param sparkSession current spark session
+ * @return the path reported by the implementation
+ */
+ def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: CarbonTableIdentifier,
+ thriftTableInfo: org.apache.carbondata.format.TableInfo,
+ tablePath: String)(sparkSession: SparkSession): String
/**
* Prepare Thrift Schema from wrapper TableInfo and write to disk
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9f4a8ce..e015c4f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -440,7 +440,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
// so checking the start of the string and taking the precision and scale.
// resetting the data type with decimal
if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = getScaleAndPrecision(col.dataType.catalogString)
+ val (precision, scale) = CommonUtil.getScaleAndPrecision(col.dataType.catalogString)
f.precision = precision
f.scale = scale
f.dataType = Some("decimal")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 52008f2..0ce2155 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,16 +18,17 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, Field, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
-import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -38,7 +39,7 @@ import org.apache.carbondata.spark.util.CommonUtil
*/
class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
- val astBuilder = new CarbonSqlAstBuilder(conf)
+ val astBuilder = new CarbonSqlAstBuilder(conf, sparkSession: SparkSession)
private val substitutor = new VariableSubstitution(conf)
@@ -69,7 +70,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
}
}
-class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
val parser = new CarbonSpark2SqlParser
@@ -119,7 +121,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
val tableProperties = mutable.Map[String, String]()
properties.foreach{property => tableProperties.put(property._1, property._2)}
-
+ val isAggTable = tableProperties.get("parent").isDefined
// validate partition clause
if (partitionerFields.nonEmpty) {
if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
@@ -129,10 +131,20 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
if (badPartCols.nonEmpty) {
operationNotAllowed(s"Partition columns should not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
}
}
- val fields = parser.getFields(cols ++ partitionByStructFields)
+ var fields = parser.getFields(cols ++ partitionByStructFields)
+ val dfAndFieldRelationTuple = if (isAggTable) {
+ val selectQuery = Option(ctx.query).map(plan).get
+ val df = Dataset.ofRows(sparkSession, selectQuery)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, source(ctx.query()))
+ fields = fieldRelationMap.keySet.toSeq
+ Some(df, fieldRelationMap)
+ } else {
+ None
+ }
val options = new CarbonOption(properties)
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
@@ -146,7 +158,14 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
tableProperties,
bucketFields)
- CarbonCreateTableCommand(tableModel)
+ if(!isAggTable) {
+ CarbonCreateTableCommand(tableModel)
+ } else {
+ CreatePreAggregateTableCommand(tableModel,
+ dfAndFieldRelationTuple.get._1,
+ queryString = source(ctx.query).toString,
+ fieldRelationMap = dfAndFieldRelationTuple.get._2)
+ }
} else {
super.visitCreateTable(ctx)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/33d11997/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 74f4dd0..a82823d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -132,7 +132,7 @@ object AlterTableUtil {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getFactTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
- .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+ .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
carbonTable.getCarbonTableIdentifier,
thriftTable,
schemaEvolutionEntry,
@@ -207,7 +207,7 @@ object AlterTableUtil {
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
oldTableIdentifier.table)
val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)
- metastore.revertTableSchema(tableIdentifier,
+ metastore.revertTableSchemaInAlterFailure(tableIdentifier,
tableInfo, carbonTablePath.getPath)(sparkSession)
metastore.removeTableFromMetadata(database, newTableName)
}
@@ -239,7 +239,7 @@ object AlterTableUtil {
val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
metastore
- .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
@@ -274,7 +274,7 @@ object AlterTableUtil {
}
}
metastore
- .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}
@@ -312,7 +312,7 @@ object AlterTableUtil {
}
}
metastore
- .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
}
}