You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:44:24 UTC
[carbondata] branch master updated: [CARBONDATA-3442]Fix creating
mv datamap with column name having length more than 128
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a269bde [CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128
a269bde is described below
commit a269bdec9d15d62b3f52516f2ca8eec0cbead0e2
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Tue Jun 18 20:31:45 2019 +0530
[CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128
Problem:
Creating an MV datamap fails when a column name in the query is longer than 128 characters (the Hive column-name limit).
Solution:
If a column name exceeds 128 characters, truncate it to a fixed-length substring and append a counter so the generated name stays unique.
This closes #3290
---
.../core/constants/CarbonCommonConstants.java | 5 +++++
.../org/apache/carbondata/mv/datamap/MVHelper.scala | 20 +++++++++++++-------
.../org/apache/carbondata/mv/datamap/MVUtil.scala | 11 +++++++----
.../org/apache/carbondata/mv/rewrite/Navigator.scala | 7 ++++---
.../carbondata/mv/rewrite/MVCreateTestCase.scala | 16 ++++++++++++++++
5 files changed, 45 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6888f8e..9b5260f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2239,4 +2239,9 @@ public final class CarbonCommonConstants {
* index server temp file name
*/
public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";
+
+ /**
+ * hive column-name maximum length
+ */
+ public static final int MAXIMUM_CHAR_LENGTH = 128;
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 22ec9e3..f554ef3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -61,7 +61,8 @@ object MVHelper {
s"MV datamap does not support streaming"
)
}
- MVUtil.validateDMProperty(dmProperties)
+ val mvUtil = new MVUtil
+ mvUtil.validateDMProperty(dmProperties)
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
@@ -79,6 +80,7 @@ object MVHelper {
}
val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
+ var counter = 0
// the ctas query can have duplicate columns, so we should take distinct and create fields,
// so that it won't fail during create mv table
val fields = logicalPlan.output.map { attr =>
@@ -87,7 +89,8 @@ object MVHelper {
throw new UnsupportedOperationException(
s"MV datamap is unsupported for ComplexData type column: " + attr.name)
}
- val name = updateColumnName(attr)
+ val name = updateColumnName(attr, counter)
+ counter += 1
val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
if (attr.dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
@@ -137,7 +140,7 @@ object MVHelper {
tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
- val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+ val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
logicalPlan, queryString, sparkSession)
// If dataMap is mapped to single main table, then inherit table properties from main table,
// else, will use default table properties. If DMProperties contains table properties, then
@@ -330,19 +333,22 @@ object MVHelper {
modularPlan.asCompactSQL
}
- def getUpdatedName(name: String): String = {
- val updatedName = name.replace("(", "_")
+ def getUpdatedName(name: String, counter: Int): String = {
+ var updatedName = name.replace("(", "_")
.replace(")", "")
.replace(" ", "_")
.replace("=", "")
.replace(",", "")
.replace(".", "_")
.replace("`", "")
+ if (updatedName.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
+ updatedName = updatedName.substring(0, 110) + CarbonCommonConstants.UNDERSCORE + counter
+ }
updatedName
}
- def updateColumnName(attr: Attribute): String = {
- val name = getUpdatedName(attr.name)
+ def updateColumnName(attr: Attribute, counter: Int): String = {
+ val name = getUpdatedName(attr.name, counter)
attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
}
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 4dff5b8..048e22d 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -35,7 +35,9 @@ import org.apache.carbondata.spark.util.CommonUtil
/**
* Utility class for keeping all the utility method for mv datamap
*/
-object MVUtil {
+class MVUtil {
+
+ var counter = 0
/**
* Below method will be used to validate and get the required fields from select plan
@@ -127,9 +129,9 @@ object MVUtil {
aggExp.map { agg =>
var aggregateType: String = ""
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+ var isLiteralPresent = false
agg.collect {
case Alias(attr: AggregateExpression, name) =>
- var isLiteralPresent = false
attr.aggregateFunction.collect {
case l@Literal(_, _) =>
isLiteralPresent = true
@@ -175,7 +177,7 @@ object MVUtil {
}
}
}
- if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+ if (!aggregateType.isEmpty && arrayBuffer.nonEmpty && !isLiteralPresent) {
fieldToDataMapFieldMap +=
getFieldToDataMapFields(agg.name,
agg.dataType,
@@ -260,7 +262,8 @@ object MVUtil {
aggregateType: String,
columnTableRelationList: ArrayBuffer[ColumnTableRelation],
parenTableName: String) = {
- var actualColumnName = MVHelper.getUpdatedName(name)
+ var actualColumnName = MVHelper.getUpdatedName(name, counter)
+ counter += 1
if (qualifier.isDefined) {
actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
.getOrElse(actualColumnName)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index b6cbc24..54a684e 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -121,17 +121,18 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
subsumee: ModularPlan,
dataMapRelation: ModularPlan): ModularPlan = {
// Update datamap table relation to the subsumer modular plan
+ val mVUtil = new MVUtil
val updatedSubsumer = subsumer match {
// In case of order by it adds extra select but that can be ignored while doing selection.
case s@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))),
- outputList = MVUtil.updateDuplicateColumns(s.outputList))
+ outputList = mVUtil.updateDuplicateColumns(s.outputList))
case s: Select => s
.copy(dataMapTableRelation = Some(dataMapRelation),
- outputList = MVUtil.updateDuplicateColumns(s.outputList))
+ outputList = mVUtil.updateDuplicateColumns(s.outputList))
case g: GroupBy => g
.copy(dataMapTableRelation = Some(dataMapRelation),
- outputList = MVUtil.updateDuplicateColumns(g.outputList))
+ outputList = mVUtil.updateDuplicateColumns(g.outputList))
case other => other
}
(updatedSubsumer, subsumee) match {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index e7da16a..535ddef 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1136,6 +1136,22 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
assert(TestUtil.verifyMVDataMap(analyzed1, "same_mv"))
}
+ test("test datamap column having more than 128 characters") {
+ sql("drop table IF EXISTS maintable")
+ sql("create table maintable (m_month smallint, c_code string, " +
+ "c_country smallint, d_dollar_value double, q_quantity double, u_unit smallint, b_country smallint, i_id int, y_year smallint) stored by 'carbondata'")
+ sql("insert into maintable select 10, 'xxx', 123, 456, 45, 5, 23, 1, 2000")
+ sql("drop datamap if exists da_agg")
+ sql("create datamap da_agg using 'mv' as select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10)" +
+ "then d_dollar_value else 0 end), sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and " +
+ "(m_month>=7 and m_month <=12)) then q_quantity else 0 end) from maintable group by u_unit, y_year, m_month, c_country, b_country")
+ val df = sql("select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10) then d_dollar_value else 0 end), " +
+ "sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and (m_month>=7 and m_month " +
+ "<=12)) then q_quantity else 0 end) from maintable group by u_unit,y_year, m_month, c_country, b_country")
+ val analyzed = df.queryExecution.analyzed
+ assert(TestUtil.verifyMVDataMap(analyzed, "da_agg"))
+ sql("drop table IF EXISTS maintable")
+ }
def drop(): Unit = {
sql("drop table IF EXISTS fact_table1")