You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/17 14:14:51 UTC

[1/2] incubator-carbondata git commit: Modified optimizer to place decoder on top of limit in case of sort and limit.

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 1dc3111e3 -> acd5f1b48


Modified optimizer to place decoder on top of limit in case of sort and limit.

Fixed comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b282e50d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b282e50d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b282e50d

Branch: refs/heads/master
Commit: b282e50d95bcf0249b29eb3dd4327f3227dad411
Parents: 1dc3111
Author: ravipesala <ra...@gmail.com>
Authored: Fri Mar 17 00:48:26 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 17 21:53:32 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../processor/AbstractDataBlockIterator.java    |   4 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  10 ++
 .../spark/sql/CarbonCatalystOperators.scala     |   4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 155 +++++++++++--------
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  10 ++
 6 files changed, 119 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 853d3b1..6e2be5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -174,6 +174,11 @@ public final class CarbonCommonConstants {
    * MEMBER_DEFAULT_VAL_ARRAY
    */
   public static final byte[] MEMBER_DEFAULT_VAL_ARRAY = MEMBER_DEFAULT_VAL.getBytes();
+
+  /**
+   * Bytes of the string "0"; used in codegen in case of null values.
+   */
+  public static final byte[] ZERO_BYTE_ARRAY = "0".getBytes();
   /**
    * FILE STATUS IN-PROGRESS
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index 4d01a05..aec4933 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -222,7 +222,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
     if (null != future) {
       try {
         AbstractScannedResult abstractScannedResult = future.get();
-        abstractScannedResult.freeMemory();
+        if (abstractScannedResult != null) {
+          abstractScannedResult.freeMemory();
+        }
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 0e5b70c..11f5ea2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -207,6 +207,16 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
       currentPlan match {
+        case limit@Limit(_, child: Sort) =>
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
+              limit,
+              isOuter = true)
+          } else {
+            limit
+          }
         case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
           sort.order.map { s =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 2943a52..d94489b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -32,7 +32,9 @@ case class CarbonDictionaryCatalystDecoder(
     aliasMap: CarbonAliasDecoderRelation,
     isOuter: Boolean,
     child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
+  // The output must be updated with the converted datatype; it is needed for the limit+sort plan.
+  override val output: Seq[Attribute] =
+    CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
 }
 
 abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 865f867..3916b1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -49,83 +49,19 @@ case class CarbonDictionaryDecoder(
     child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
-  override val output: Seq[Attribute] = {
-    child.output.map { a =>
-      val attr = aliasMap.getOrElse(a, a)
-      val relation = relations.find(p => p.contains(attr))
-      if (relation.isDefined && canBeDecoded(attr)) {
-        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
-        val carbonDimension = carbonTable
-          .getDimensionByName(carbonTable.getFactTableName, attr.name)
-        if (carbonDimension != null &&
-            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            !carbonDimension.isComplex()) {
-          val newAttr = AttributeReference(a.name,
-            convertCarbonToSparkDataType(carbonDimension,
-              relation.get.carbonRelation.carbonRelation),
-            a.nullable,
-            a.metadata)(a.exprId).asInstanceOf[Attribute]
-          newAttr
-        } else {
-          a
-        }
-      } else {
-        a
-      }
-    }
-  }
+  override val output: Seq[Attribute] =
+    CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
 
 
   override def outputPartitioning: Partitioning = {
     child.outputPartitioning
   }
 
-  def canBeDecoded(attr: Attribute): Boolean = {
-    profile match {
-      case ip: IncludeProfile if ip.attributes.nonEmpty =>
-        ip.attributes
-          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
-      case ep: ExcludeProfile =>
-        !ep.attributes
-          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
-      case _ => true
-    }
-  }
-
-  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
-      relation: CarbonRelation): types.DataType = {
-    carbonDimension.getDataType match {
-      case DataType.STRING => StringType
-      case DataType.SHORT => ShortType
-      case DataType.INT => IntegerType
-      case DataType.LONG => LongType
-      case DataType.DOUBLE => DoubleType
-      case DataType.BOOLEAN => BooleanType
-      case DataType.DECIMAL =>
-        val scale: Int = carbonDimension.getColumnSchema.getScale
-        val precision: Int = carbonDimension.getColumnSchema.getPrecision
-        if (scale == 0 && precision == 0) {
-          DecimalType(18, 2)
-        } else {
-          DecimalType(precision, scale)
-        }
-      case DataType.TIMESTAMP => TimestampType
-      case DataType.DATE => DateType
-      case DataType.STRUCT =>
-        CarbonMetastoreTypes
-          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case DataType.ARRAY =>
-        CarbonMetastoreTypes
-          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
-    }
-  }
-
   val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] = {
     child.output.map { attribute =>
       val attr = aliasMap.getOrElse(attribute, attribute)
       val relation = relations.find(p => p.contains(attr))
-      if (relation.isDefined && canBeDecoded(attr)) {
+      if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
@@ -235,6 +171,8 @@ case class CarbonDictionaryDecoder(
              |if (java.util.Arrays.equals(org.apache.carbondata.core.constants
              |.CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, $valueIntern)) {
              |  $isNull = true;
+             |  $valueIntern = org.apache.carbondata.core.constants
+             |  .CarbonCommonConstants.ZERO_BYTE_ARRAY;
              |}
              """.stripMargin
 
@@ -371,6 +309,89 @@ case class CarbonDictionaryDecoder(
 
 }
 
+object CarbonDictionaryDecoder {
+
+  /**
+   * Converts the datatypes of attributes as per the decoder plan. If a column needs to be decoded
+   * here then that datatype is updated to its original datatype from Int type.
+   */
+  def convertOutput(output: Seq[Attribute],
+      relations: Seq[CarbonDecoderRelation],
+      profile: CarbonProfile,
+      aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
+    output.map { a =>
+      val attr = aliasMap.getOrElse(a, a)
+      val relation = relations.find(p => p.contains(attr))
+      if (relation.isDefined && canBeDecoded(attr, profile)) {
+        val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+        val carbonDimension = carbonTable
+          .getDimensionByName(carbonTable.getFactTableName, attr.name)
+        if (carbonDimension != null &&
+            carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+            !carbonDimension.isComplex()) {
+          val newAttr = AttributeReference(a.name,
+            convertCarbonToSparkDataType(carbonDimension,
+              relation.get.carbonRelation.carbonRelation),
+            a.nullable,
+            a.metadata)(a.exprId).asInstanceOf[Attribute]
+          newAttr
+        } else {
+          a
+        }
+      } else {
+        a
+      }
+    }
+  }
+
+  /**
+   * Whether the attribute needs to be decoded or not, based on the profile.
+   */
+  def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = {
+    profile match {
+      case ip: IncludeProfile if ip.attributes.nonEmpty =>
+        ip.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case ep: ExcludeProfile =>
+        !ep.attributes
+          .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+      case _ => true
+    }
+  }
+
+  /**
+   * Converts from carbon datatype to corresponding spark datatype.
+   */
+  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+      relation: CarbonRelation): types.DataType = {
+    carbonDimension.getDataType match {
+      case DataType.STRING => StringType
+      case DataType.SHORT => ShortType
+      case DataType.INT => IntegerType
+      case DataType.LONG => LongType
+      case DataType.DOUBLE => DoubleType
+      case DataType.BOOLEAN => BooleanType
+      case DataType.DECIMAL =>
+        val scale: Int = carbonDimension.getColumnSchema.getScale
+        val precision: Int = carbonDimension.getColumnSchema.getPrecision
+        if (scale == 0 && precision == 0) {
+          DecimalType(18, 2)
+        } else {
+          DecimalType(precision, scale)
+        }
+      case DataType.TIMESTAMP => TimestampType
+      case DataType.DATE => DateType
+      case DataType.STRUCT =>
+        CarbonMetastoreTypes
+          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+      case DataType.ARRAY =>
+        CarbonMetastoreTypes
+          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+    }
+  }
+}
+
 
 class CarbonDecoderRDD(
     relations: Seq[CarbonDecoderRelation],

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index a20326f..8c38ec0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -145,6 +145,16 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
       currentPlan match {
+        case limit@GlobalLimit(_, LocalLimit(_, child: Sort)) =>
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
+              limit,
+              isOuter = true)
+          } else {
+            limit
+          }
         case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
           sort.order.map { s =>


[2/2] incubator-carbondata git commit: [CARBONDATA-530] Keep the CarbonDecoder on top of limit in case of limit+sort plan This closes #666

Posted by ja...@apache.org.
[CARBONDATA-530] Keep the CarbonDecoder on top of limit in case of limit+sort plan. This closes #666


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/acd5f1b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/acd5f1b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/acd5f1b4

Branch: refs/heads/master
Commit: acd5f1b48520d7e4f9edd661750e6305bf301cf7
Parents: 1dc3111 b282e50
Author: jackylk <ja...@huawei.com>
Authored: Fri Mar 17 22:14:34 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 17 22:14:34 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../processor/AbstractDataBlockIterator.java    |   4 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  10 ++
 .../spark/sql/CarbonCatalystOperators.scala     |   4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 155 +++++++++++--------
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  10 ++
 6 files changed, 119 insertions(+), 69 deletions(-)
----------------------------------------------------------------------