You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/22 08:44:24 UTC

[carbondata] branch master updated: [CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a269bde  [CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128
a269bde is described below

commit a269bdec9d15d62b3f52516f2ca8eec0cbead0e2
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Tue Jun 18 20:31:45 2019 +0530

    [CARBONDATA-3442]Fix creating mv datamap with column name having length more than 128
    
    Problem:
    Creating an MV datamap fails when a column name is longer than 128 characters.
    Solution:
    If the updated column name is 128 characters or longer, truncate it to its first
    110 characters and append an underscore followed by a counter.
    
    This closes #3290
---
 .../core/constants/CarbonCommonConstants.java        |  5 +++++
 .../org/apache/carbondata/mv/datamap/MVHelper.scala  | 20 +++++++++++++-------
 .../org/apache/carbondata/mv/datamap/MVUtil.scala    | 11 +++++++----
 .../org/apache/carbondata/mv/rewrite/Navigator.scala |  7 ++++---
 .../carbondata/mv/rewrite/MVCreateTestCase.scala     | 16 ++++++++++++++++
 5 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 6888f8e..9b5260f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2239,4 +2239,9 @@ public final class CarbonCommonConstants {
    * index server temp file name
    */
   public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";
+
+  /**
+   * hive column-name maximum length
+   */
+  public static final int MAXIMUM_CHAR_LENGTH = 128;
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 22ec9e3..f554ef3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -61,7 +61,8 @@ object MVHelper {
         s"MV datamap does not support streaming"
       )
     }
-    MVUtil.validateDMProperty(dmProperties)
+    val mvUtil = new MVUtil
+    mvUtil.validateDMProperty(dmProperties)
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
@@ -79,6 +80,7 @@ object MVHelper {
     }
     val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
+    var counter = 0
     // the ctas query can have duplicate columns, so we should take distinct and create fields,
     // so that it won't fail during create mv table
     val fields = logicalPlan.output.map { attr =>
@@ -87,7 +89,8 @@ object MVHelper {
         throw new UnsupportedOperationException(
           s"MV datamap is unsupported for ComplexData type column: " + attr.name)
       }
-      val name = updateColumnName(attr)
+      val name = updateColumnName(attr, counter)
+      counter += 1
       val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
       if (attr.dataType.typeName.startsWith("decimal")) {
         val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
@@ -137,7 +140,7 @@ object MVHelper {
     tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
-    val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
       logicalPlan, queryString, sparkSession)
     // If dataMap is mapped to single main table, then inherit table properties from main table,
     // else, will use default table properties. If DMProperties contains table properties, then
@@ -330,19 +333,22 @@ object MVHelper {
     modularPlan.asCompactSQL
   }
 
-  def getUpdatedName(name: String): String = {
-    val updatedName = name.replace("(", "_")
+  def getUpdatedName(name: String, counter: Int): String = {
+    var updatedName = name.replace("(", "_")
       .replace(")", "")
       .replace(" ", "_")
       .replace("=", "")
       .replace(",", "")
       .replace(".", "_")
       .replace("`", "")
+    if (updatedName.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
+      updatedName = updatedName.substring(0, 110) + CarbonCommonConstants.UNDERSCORE + counter
+    }
     updatedName
   }
 
-  def updateColumnName(attr: Attribute): String = {
-    val name = getUpdatedName(attr.name)
+  def updateColumnName(attr: Attribute, counter: Int): String = {
+    val name = getUpdatedName(attr.name, counter)
     attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
   }
 
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 4dff5b8..048e22d 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -35,7 +35,9 @@ import org.apache.carbondata.spark.util.CommonUtil
 /**
  * Utility class for keeping all the utility method for mv datamap
  */
-object MVUtil {
+class MVUtil {
+
+  var counter = 0
 
   /**
    * Below method will be used to validate and get the required fields from select plan
@@ -127,9 +129,9 @@ object MVUtil {
     aggExp.map { agg =>
       var aggregateType: String = ""
       val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+      var isLiteralPresent = false
       agg.collect {
         case Alias(attr: AggregateExpression, name) =>
-          var isLiteralPresent = false
           attr.aggregateFunction.collect {
             case l@Literal(_, _) =>
               isLiteralPresent = true
@@ -175,7 +177,7 @@ object MVUtil {
             }
           }
       }
-      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty && !isLiteralPresent) {
         fieldToDataMapFieldMap +=
         getFieldToDataMapFields(agg.name,
           agg.dataType,
@@ -260,7 +262,8 @@ object MVUtil {
       aggregateType: String,
       columnTableRelationList: ArrayBuffer[ColumnTableRelation],
       parenTableName: String) = {
-    var actualColumnName = MVHelper.getUpdatedName(name)
+    var actualColumnName = MVHelper.getUpdatedName(name, counter)
+    counter += 1
     if (qualifier.isDefined) {
       actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
         .getOrElse(actualColumnName)
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index b6cbc24..54a684e 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -121,17 +121,18 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
       subsumee: ModularPlan,
       dataMapRelation: ModularPlan): ModularPlan = {
     // Update datamap table relation to the subsumer modular plan
+    val mVUtil = new MVUtil
     val updatedSubsumer = subsumer match {
       // In case of order by it adds extra select but that can be ignored while doing selection.
       case s@Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) =>
         s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation))),
-            outputList = MVUtil.updateDuplicateColumns(s.outputList))
+          outputList = mVUtil.updateDuplicateColumns(s.outputList))
       case s: Select => s
         .copy(dataMapTableRelation = Some(dataMapRelation),
-          outputList = MVUtil.updateDuplicateColumns(s.outputList))
+          outputList = mVUtil.updateDuplicateColumns(s.outputList))
       case g: GroupBy => g
         .copy(dataMapTableRelation = Some(dataMapRelation),
-          outputList = MVUtil.updateDuplicateColumns(g.outputList))
+          outputList = mVUtil.updateDuplicateColumns(g.outputList))
       case other => other
     }
     (updatedSubsumer, subsumee) match {
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index e7da16a..535ddef 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1136,6 +1136,22 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(analyzed1, "same_mv"))
   }
 
+  test("test datamap column having more than 128 characters") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable (m_month smallint, c_code string, " +
+        "c_country smallint, d_dollar_value double, q_quantity double, u_unit smallint, b_country smallint, i_id int, y_year smallint) stored by 'carbondata'")
+    sql("insert into maintable select 10, 'xxx', 123, 456, 45, 5, 23, 1, 2000")
+    sql("drop datamap if exists da_agg")
+    sql("create datamap da_agg using 'mv' as select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10)" +
+        "then d_dollar_value else 0 end), sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and " +
+        "(m_month>=7 and m_month <=12)) then q_quantity else 0 end) from maintable group by u_unit, y_year, m_month, c_country, b_country")
+    val df = sql("select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10) then d_dollar_value else 0 end), " +
+                 "sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and (m_month>=7 and m_month " +
+                 "<=12)) then q_quantity else 0 end) from maintable group by u_unit,y_year, m_month, c_country, b_country")
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "da_agg"))
+    sql("drop table IF EXISTS maintable")
+  }
 
   def drop(): Unit = {
     sql("drop table IF EXISTS fact_table1")