Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/17 14:14:51 UTC
[1/2] incubator-carbondata git commit: Modified optimizer to place decoder on top of limit in case of sort and limit.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 1dc3111e3 -> acd5f1b48
Modified optimizer to place decoder on top of limit in case of sort and limit.
Fixed comments.
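
To make the intent concrete, here is a small illustrative sketch (not code from this commit) of the two plan shapes. The node types are hypothetical stand-ins for Spark's Sort/Limit operators and Carbon's dictionary decoder:

// Toy Catalyst-like nodes; names are illustrative only.
sealed trait Plan
case class Scan(table: String) extends Plan
case class SortNode(child: Plan) extends Plan
case class LimitNode(n: Int, child: Plan) extends Plan
case class DecoderNode(child: Plan) extends Plan

// Before: the decoder sat below the limit, so every sorted row was
// dictionary-decoded before the limit discarded most of them:
val before: Plan = LimitNode(10, DecoderNode(SortNode(Scan("t"))))

// After: the decoder wraps the whole limit+sort subtree, so the sort runs
// on compact dictionary (Int) values and only the surviving rows are decoded:
val after: Plan = DecoderNode(LimitNode(10, SortNode(Scan("t"))))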
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b282e50d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b282e50d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b282e50d
Branch: refs/heads/master
Commit: b282e50d95bcf0249b29eb3dd4327f3227dad411
Parents: 1dc3111
Author: ravipesala <ra...@gmail.com>
Authored: Fri Mar 17 00:48:26 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 17 21:53:32 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 5 +
.../processor/AbstractDataBlockIterator.java | 4 +-
.../spark/sql/optimizer/CarbonOptimizer.scala | 10 ++
.../spark/sql/CarbonCatalystOperators.scala | 4 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 155 +++++++++++--------
.../sql/optimizer/CarbonLateDecodeRule.scala | 10 ++
6 files changed, 119 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 853d3b1..6e2be5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -174,6 +174,11 @@ public final class CarbonCommonConstants {
* MEMBER_DEFAULT_VAL_ARRAY
*/
public static final byte[] MEMBER_DEFAULT_VAL_ARRAY = MEMBER_DEFAULT_VAL.getBytes();
+
+ /**
+ * Bytes for the string "0"; used in codegen when the value is null.
+ */
+ public static final byte[] ZERO_BYTE_ARRAY = "0".getBytes();
/**
* FILE STATUS IN-PROGRESS
*/
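
This constant pairs with the codegen change in CarbonDictionaryDecoder.scala below: when a dictionary value matches the null sentinel (MEMBER_DEFAULT_VAL), the row is flagged null and the bytes are swapped for ZERO_BYTE_ARRAY so the generated code can still parse them safely. A minimal sketch of that substitution (the sentinel string shown here is an assumption):

import java.util.Arrays

// Assumed value of Carbon's null-member sentinel.
val MEMBER_DEFAULT_VAL_ARRAY: Array[Byte] = "@NU#LL$!".getBytes
val ZERO_BYTE_ARRAY: Array[Byte] = "0".getBytes

// Mirrors the generated code: flag the null, hand back parseable bytes.
def substituteNull(value: Array[Byte]): (Boolean, Array[Byte]) =
  if (Arrays.equals(MEMBER_DEFAULT_VAL_ARRAY, value)) (true, ZERO_BYTE_ARRAY)
  else (false, value)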
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index 4d01a05..aec4933 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -222,7 +222,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
if (null != future) {
try {
AbstractScannedResult abstractScannedResult = future.get();
- abstractScannedResult.freeMemory();
+ if (abstractScannedResult != null) {
+ abstractScannedResult.freeMemory();
+ }
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
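
A self-contained sketch of the guard applied above: the scan task may legitimately complete with a null result, so the result must be checked before freeing its memory (ScannedResult is a stand-in for AbstractScannedResult):

import java.util.concurrent.{Callable, ExecutionException, Executors}

class ScannedResult { def freeMemory(): Unit = () } // stand-in type

val pool = Executors.newSingleThreadExecutor()
// The task may return null when there was nothing left to scan.
val future = pool.submit(new Callable[ScannedResult] {
  override def call(): ScannedResult = null
})
try {
  val result = future.get()
  if (result != null) { // the fix: guard before dereferencing
    result.freeMemory()
  }
} catch {
  case e @ (_: InterruptedException | _: ExecutionException) =>
    throw new RuntimeException(e)
} finally {
  pool.shutdown()
}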
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 0e5b70c..11f5ea2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -207,6 +207,16 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
currentPlan match {
+ case limit@Limit(_, child: Sort) =>
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
+ limit,
+ isOuter = true)
+ } else {
+ limit
+ }
case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
sort.order.map { s =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 2943a52..d94489b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -32,7 +32,9 @@ case class CarbonDictionaryCatalystDecoder(
aliasMap: CarbonAliasDecoderRelation,
isOuter: Boolean,
child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
+ // The output should be updated with the converted datatype; this is needed for the limit+sort plan.
+ override val output: Seq[Attribute] =
+ CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
}
abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 865f867..3916b1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -49,83 +49,19 @@ case class CarbonDictionaryDecoder(
child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
- override val output: Seq[Attribute] = {
- child.output.map { a =>
- val attr = aliasMap.getOrElse(a, a)
- val relation = relations.find(p => p.contains(attr))
- if (relation.isDefined && canBeDecoded(attr)) {
- val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
- val carbonDimension = carbonTable
- .getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null &&
- carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !carbonDimension.isComplex()) {
- val newAttr = AttributeReference(a.name,
- convertCarbonToSparkDataType(carbonDimension,
- relation.get.carbonRelation.carbonRelation),
- a.nullable,
- a.metadata)(a.exprId).asInstanceOf[Attribute]
- newAttr
- } else {
- a
- }
- } else {
- a
- }
- }
- }
+ override val output: Seq[Attribute] =
+ CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap)
override def outputPartitioning: Partitioning = {
child.outputPartitioning
}
- def canBeDecoded(attr: Attribute): Boolean = {
- profile match {
- case ip: IncludeProfile if ip.attributes.nonEmpty =>
- ip.attributes
- .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
- case ep: ExcludeProfile =>
- !ep.attributes
- .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
- case _ => true
- }
- }
-
- def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
- relation: CarbonRelation): types.DataType = {
- carbonDimension.getDataType match {
- case DataType.STRING => StringType
- case DataType.SHORT => ShortType
- case DataType.INT => IntegerType
- case DataType.LONG => LongType
- case DataType.DOUBLE => DoubleType
- case DataType.BOOLEAN => BooleanType
- case DataType.DECIMAL =>
- val scale: Int = carbonDimension.getColumnSchema.getScale
- val precision: Int = carbonDimension.getColumnSchema.getPrecision
- if (scale == 0 && precision == 0) {
- DecimalType(18, 2)
- } else {
- DecimalType(precision, scale)
- }
- case DataType.TIMESTAMP => TimestampType
- case DataType.DATE => DateType
- case DataType.STRUCT =>
- CarbonMetastoreTypes
- .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
- case DataType.ARRAY =>
- CarbonMetastoreTypes
- .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
- }
- }
-
val getDictionaryColumnIds: Array[(String, ColumnIdentifier, CarbonDimension)] = {
child.output.map { attribute =>
val attr = aliasMap.getOrElse(attribute, attribute)
val relation = relations.find(p => p.contains(attr))
- if (relation.isDefined && canBeDecoded(attr)) {
+ if (relation.isDefined && CarbonDictionaryDecoder.canBeDecoded(attr, profile)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
@@ -235,6 +171,8 @@ case class CarbonDictionaryDecoder(
|if (java.util.Arrays.equals(org.apache.carbondata.core.constants
|.CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, $valueIntern)) {
| $isNull = true;
+ | $valueIntern = org.apache.carbondata.core.constants
+ | .CarbonCommonConstants.ZERO_BYTE_ARRAY;
|}
""".stripMargin
@@ -371,6 +309,89 @@ case class CarbonDictionaryDecoder(
}
+object CarbonDictionaryDecoder {
+
+ /**
+ * Converts the datatypes of attributes as per the decoder plan. If a column is to be decoded
+ * here, its datatype is updated from Int back to its original datatype.
+ */
+ def convertOutput(output: Seq[Attribute],
+ relations: Seq[CarbonDecoderRelation],
+ profile: CarbonProfile,
+ aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = {
+ output.map { a =>
+ val attr = aliasMap.getOrElse(a, a)
+ val relation = relations.find(p => p.contains(attr))
+ if (relation.isDefined && canBeDecoded(attr, profile)) {
+ val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+ val carbonDimension = carbonTable
+ .getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null &&
+ carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+ !carbonDimension.isComplex()) {
+ val newAttr = AttributeReference(a.name,
+ convertCarbonToSparkDataType(carbonDimension,
+ relation.get.carbonRelation.carbonRelation),
+ a.nullable,
+ a.metadata)(a.exprId).asInstanceOf[Attribute]
+ newAttr
+ } else {
+ a
+ }
+ } else {
+ a
+ }
+ }
+ }
+
+ /**
+ * Whether the attribute needs to be decoded, based on the profile.
+ */
+ def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = {
+ profile match {
+ case ip: IncludeProfile if ip.attributes.nonEmpty =>
+ ip.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case ep: ExcludeProfile =>
+ !ep.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case _ => true
+ }
+ }
+
+ /**
+ * Converts a Carbon datatype to the corresponding Spark datatype.
+ */
+ def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+ relation: CarbonRelation): types.DataType = {
+ carbonDimension.getDataType match {
+ case DataType.STRING => StringType
+ case DataType.SHORT => ShortType
+ case DataType.INT => IntegerType
+ case DataType.LONG => LongType
+ case DataType.DOUBLE => DoubleType
+ case DataType.BOOLEAN => BooleanType
+ case DataType.DECIMAL =>
+ val scale: Int = carbonDimension.getColumnSchema.getScale
+ val precision: Int = carbonDimension.getColumnSchema.getPrecision
+ if (scale == 0 && precision == 0) {
+ DecimalType(18, 2)
+ } else {
+ DecimalType(precision, scale)
+ }
+ case DataType.TIMESTAMP => TimestampType
+ case DataType.DATE => DateType
+ case DataType.STRUCT =>
+ CarbonMetastoreTypes
+ .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+ case DataType.ARRAY =>
+ CarbonMetastoreTypes
+ .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ }
+ }
+}
+
class CarbonDecoderRDD(
relations: Seq[CarbonDecoderRelation],
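
For reference, the include/exclude decision that canBeDecoded encodes, reduced to a standalone sketch (profiles and attributes simplified to plain names):

// Simplified model of the profile check in canBeDecoded.
sealed trait Profile
case class Include(names: Set[String]) extends Profile
case class Exclude(names: Set[String]) extends Profile
case object NoProfile extends Profile

def canBeDecoded(attr: String, profile: Profile): Boolean = profile match {
  case Include(names) if names.nonEmpty => names.contains(attr)  // decode only listed attrs
  case Exclude(names) => !names.contains(attr)                   // decode all but listed
  case _ => true                                                 // no profile: decode all
}

// canBeDecoded("city", Include(Set("city"))) == true
// canBeDecoded("city", Exclude(Set("city"))) == false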
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b282e50d/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index a20326f..8c38ec0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -145,6 +145,16 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
currentPlan match {
+ case limit@GlobalLimit(_, LocalLimit(_, child: Sort)) =>
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
+ limit,
+ isOuter = true)
+ } else {
+ limit
+ }
case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
sort.order.map { s =>
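
This is the Spark 2.x counterpart of the CarbonOptimizer.scala rule above: Spark 2.x plans a LIMIT as a per-partition LocalLimit under a final GlobalLimit, so the rule matches that nested pair instead of a single Limit node. A rough sketch of the matched shape (toy node types, not Catalyst itself):

sealed trait LogicalPlan
case class Relation(name: String) extends LogicalPlan
case class Sort(child: LogicalPlan) extends LogicalPlan
case class LocalLimit(n: Int, child: LogicalPlan) extends LogicalPlan
case class GlobalLimit(n: Int, child: LogicalPlan) extends LogicalPlan
case class TempDecoder(child: LogicalPlan) extends LogicalPlan

// SELECT ... ORDER BY ... LIMIT 10 plans roughly as:
val plan: LogicalPlan = GlobalLimit(10, LocalLimit(10, Sort(Relation("t"))))

// The rule wraps the whole subtree, keeping the decoder above the limit:
val rewritten: LogicalPlan = plan match {
  case limit @ GlobalLimit(_, LocalLimit(_, _: Sort)) => TempDecoder(limit)
  case other => other
}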
[2/2] incubator-carbondata git commit: [CARBONDATA-530] Keep the CarbonDecoder on top of limit in case of limit+sort plan. This closes #666
Posted by ja...@apache.org.
[CARBONDATA-530] Keep the CarbonDecoder on top of limit in case of limit+sort plan. This closes #666
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/acd5f1b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/acd5f1b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/acd5f1b4
Branch: refs/heads/master
Commit: acd5f1b48520d7e4f9edd661750e6305bf301cf7
Parents: 1dc3111 b282e50
Author: jackylk <ja...@huawei.com>
Authored: Fri Mar 17 22:14:34 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Mar 17 22:14:34 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 5 +
.../processor/AbstractDataBlockIterator.java | 4 +-
.../spark/sql/optimizer/CarbonOptimizer.scala | 10 ++
.../spark/sql/CarbonCatalystOperators.scala | 4 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 155 +++++++++++--------
.../sql/optimizer/CarbonLateDecodeRule.scala | 10 ++
6 files changed, 119 insertions(+), 69 deletions(-)
----------------------------------------------------------------------