Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/16 02:38:08 UTC
[3/4] incubator-carbondata git commit: fixUnionIssue and add test case
fixUnionIssue and add test case
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/462f6422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/462f6422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/462f6422
Branch: refs/heads/master
Commit: 462f64226428fc255938d8752226cda262ad0ae4
Parents: 526243b
Author: QiangCai <qi...@qq.com>
Authored: Thu Dec 8 19:06:33 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 16 10:13:39 2016 +0800
----------------------------------------------------------------------
.../impl/DictionaryBasedResultCollector.java | 29 +-
.../DictionaryBasedResultCollectorTest.java | 9 +-
.../carbondata/examples/CarbonExample.scala | 12 +
.../apache/carbondata/spark/CarbonFilters.scala | 7 +
.../CarbonDecoderOptimizerHelper.scala | 24 +-
.../readsupport/SparkRowReadSupportImpl.java | 5 +-
.../apache/carbondata/spark/CarbonFilters.scala | 6 +
.../spark/sql/CarbonDataFrameWriter.scala | 3 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 4 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 60 +-
.../scala/org/apache/spark/sql/CarbonScan.scala | 44 +-
.../org/apache/spark/sql/CarbonSource.scala | 3 +-
.../sql/optimizer/CarbonLateDecodeRule.scala | 124 +-
integration/spark2/src/test/resources/data.csv | 11 +
.../AllDataTypesTestCaseAggregate.scala | 1161 ++++++++++++++++++
.../spark/sql/common/util/CarbonFunSuite.scala | 49 +
.../sql/common/util/CarbonSessionTest.scala | 74 ++
.../apache/spark/sql/common/util/PlanTest.scala | 59 +
.../spark/sql/common/util/QueryTest.scala | 149 +++
19 files changed, 1654 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
index 108677f..2462caa 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.scan.collector.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -35,6 +37,7 @@ import org.apache.carbondata.scan.model.QueryDimension;
import org.apache.carbondata.scan.model.QueryMeasure;
import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.commons.lang3.ArrayUtils;
/**
* It is not a collector it is just a scanned result holder.
*/
@@ -52,9 +55,31 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
* it will keep track of how many record is processed, to handle limit scenario
*/
@Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+
List<Object[]> listBasedResult = new ArrayList<>(batchSize);
boolean isMsrsPresent = measureDatatypes.length > 0;
+
QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+ List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+ for (int i = 0; i < queryDimensions.length; i++) {
+ if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+ queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+ dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+ }
+ }
+ int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
+ new Integer[dictionaryIndexes.size()]));
+ Arrays.sort(primitive);
+ int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+ int index = 0;
+ for (int i = 0; i < queryDimensions.length; i++) {
+ if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+ queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+ actualIndexInSurrogateKey[index++] = Arrays.binarySearch(primitive,
+ queryDimensions[i].getDimension().getOrdinal());
+ }
+ }
+
QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
Map<Integer, GenericQueryType> comlexDimensionInfoMap =
tableBlockExecutionInfos.getComlexDimensionInfoMap();
@@ -99,7 +124,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
.getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
if (directDictionaryGenerator != null) {
row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
- surrogateResult[dictionaryColumnIndex++]);
+ surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
}
} else if (complexDataTypeArray[i]) {
row[order[i]] = comlexDimensionInfoMap
@@ -107,7 +132,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
.getDataBasedOnDataTypeFromSurrogates(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
} else {
- row[order[i]] = surrogateResult[dictionaryColumnIndex++];
+ row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
}
}
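
The mapping built above works like this: collect the ordinals of every dictionary and direct-dictionary dimension, sort them (surrogate keys are laid out in ascending ordinal order), and record each dimension's rank in that sorted array; that rank, not the loop counter alone, addresses the surrogate key. A minimal Scala sketch of the idea, with illustrative names rather than the Carbon API:

    import java.util.Arrays

    // ordinals: table ordinal of each query dimension, in query order
    // isDict:   whether that dimension is (direct-)dictionary encoded
    def surrogateSlots(ordinals: Array[Int], isDict: Array[Boolean]): Array[Int] = {
      val dictOrdinals = ordinals.zip(isDict).collect { case (o, true) => o }
      Arrays.sort(dictOrdinals)
      // a dictionary dimension's slot in the surrogate key array is its
      // rank among the sorted dictionary ordinals
      ordinals.zip(isDict).collect { case (o, true) => Arrays.binarySearch(dictOrdinals, o) }
    }

    // e.g. ordinals (5, 2, 7) with dims 5 and 7 dictionary encoded -> slots (0, 1)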
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
index 7f21d90..48eab1e 100644
--- a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
+++ b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
@@ -19,14 +19,12 @@
package org.apache.carbondata.scan.collector.impl;
import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -74,6 +72,9 @@ public class DictionaryBasedResultCollectorTest {
QueryDimension queryDimension1 = new QueryDimension("QDCol1");
queryDimension1.setQueryOrder(1);
ColumnSchema columnSchema = new ColumnSchema();
+ List encodeList= new ArrayList<Encoding>();
+ encodeList.add(Encoding.DICTIONARY);
+ columnSchema.setEncodingList(encodeList);
queryDimension1.setDimension(new CarbonDimension(columnSchema, 0, 0, 0, 0));
QueryDimension queryDimension2 = new QueryDimension("QDCol2");
queryDimension2.setQueryOrder(2);
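
The schema built here now carries Encoding.DICTIONARY because the reworked collector derives surrogate-key indexing from each dimension's encodings; without it, QDCol1 would be treated as a no-dictionary column and the new index mapping would never be exercised by this test.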
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 4aff45a..c2e135a 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -153,6 +153,18 @@ object CarbonExample {
|where t1.stringField = t2.stringField
""".stripMargin).show
+ spark.sql(
+ """
+ |with t1 as (
+ |select * from carbon_table
+ |union all
+ |select * from carbon_table
+ |)
+ |select t1.*, t2.*
+ |from t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
+
// Drop table
spark.sql("DROP TABLE IF EXISTS carbon_table")
spark.sql("DROP TABLE IF EXISTS csv_table")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2a580dc..5e58235 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,13 @@ object CarbonFilters {
new ListExpression(
convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+
case sources.And(lhs, rhs) =>
(createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
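
With this mapping, IS NULL / IS NOT NULL predicates arriving through Spark's data source filter API are translated into Carbon expressions (EqualTo/NotEquals against a null literal, with the isNull flag set) instead of being evaluated only after the scan. A hypothetical usage sketch, reusing the spark session and carbon_table from the example above:

    // both predicates now reach CarbonFilters.createFilter as
    // sources.IsNull / sources.IsNotNull and push down to the scan
    spark.sql("SELECT * FROM carbon_table WHERE stringField IS NULL").show()
    spark.sql("SELECT * FROM carbon_table WHERE stringField IS NOT NULL").show()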
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 7909a13..6e84e7e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -29,7 +29,7 @@ abstract class AbstractNode
case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
-case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+case class ArrayCarbonNode(children: Seq[util.List[AbstractNode]])
extends AbstractNode
case class CarbonDictionaryTempDecoder(
@@ -70,9 +70,16 @@ class CarbonDecoderProcessor {
case j: BinaryNode =>
val leftList = new util.ArrayList[AbstractNode]
val rightList = new util.ArrayList[AbstractNode]
- nodeList.add(BinaryCarbonNode(leftList, rightList))
+ nodeList.add(ArrayCarbonNode(Seq(leftList, rightList)))
process(j.left, leftList)
process(j.right, rightList)
+ case u: Union =>
+ val nodeListSeq = u.children.map { child =>
+ val list = new util.ArrayList[AbstractNode]
+ process(child, list)
+ list
+ }
+ nodeList.add(ArrayCarbonNode(nodeListSeq))
case e: UnaryNode => process(e.child, nodeList)
case _ =>
}
@@ -91,13 +98,12 @@ class CarbonDecoderProcessor {
decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
decoderNotDecode.asScala.foreach(cd.attrList.remove)
decoderNotDecode.addAll(cd.attrList)
- case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
- val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
- val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
- updateDecoderInternal(left.asScala, leftNotDecode)
- updateDecoderInternal(right.asScala, rightNotDecode)
- decoderNotDecode.addAll(leftNotDecode)
- decoderNotDecode.addAll(rightNotDecode)
+ case ArrayCarbonNode(children) =>
+ children.foreach { child =>
+ val notDecode = new util.HashSet[AttributeReferenceWrapper]
+ updateDecoderInternal(child.asScala, notDecode)
+ decoderNotDecode.addAll(notDecode)
+ }
}
}
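
The generalization is needed because Union in Spark 2 is n-ary (its children are a Seq[LogicalPlan], not a left/right pair), so the decoder tree needs a node that carries one child list per union branch; the BinaryNode case simply wraps its two lists in a Seq.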
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 1bfcdea..499ef0c 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -39,7 +39,10 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
@Override public Row readRow(Object[] data) {
for (int i = 0; i < dictionaries.length; i++) {
- if (dictionaries[i] == null && data[i] != null) {
+ if (data[i] == null) {
+ continue;
+ }
+ if (dictionaries[i] == null) {
if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
//convert the long to timestamp in case of direct dictionary column
if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
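
Hoisting the null check makes null handling uniform: a null cell is now skipped before any decoding, instead of only when the column had no dictionary, so direct-dictionary and dictionary columns no longer attempt to convert a null value.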
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2cd4eb7..6d9fb24 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,12 @@ object CarbonFilters {
new ListExpression(
convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+ case sources.IsNull(name) =>
+ Some(new EqualToExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
+ case sources.IsNotNull(name) =>
+ Some(new NotEqualsExpression(getCarbonExpression(name),
+ getCarbonLiteralExpression(name, null), true))
case sources.And(lhs, rhs) =>
(createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 5db5d14..3057bee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -146,7 +146,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
s"""
CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
(${ carbonSchema.mkString(", ") })
- using 'org.apache.spark.sql.CarbonRelationProvider'
+ using org.apache.spark.sql.CarbonSource
+ OPTIONS('dbName'='${options.dbName}', 'tableName'='${options.tableName}')
"""
}
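
The old template quoted the provider name and pointed at a class that does not exist in this module; the rewrite names CarbonSource and forwards dbName/tableName so the relation can resolve the table path. A sketch of the DDL this template now generates (table and columns are hypothetical):

    spark.sql(
      """
        | CREATE TABLE IF NOT EXISTS default.sample (name STRING, age INT)
        | using org.apache.spark.sql.CarbonSource
        | OPTIONS('dbName'='default', 'tableName'='sample')
      """.stripMargin)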
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 997106c..09a58ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,9 +21,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.{ExecutedCommandExec, LoadTableByInsert}
+import org.apache.spark.sql.execution.command.LoadTableByInsert
import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 3b63021..9a625ee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -303,41 +303,41 @@ class CarbonDecoderRDD(
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
}.toMap
- val cacheProvider: CacheProvider = CacheProvider.getInstance
- val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
- cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
- val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
- forwardDictionaryCache)
- val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
- // add a task completion listener to clear dictionary that is a decisive factor for
- // LRU eviction policy
- val dictionaryTaskCleaner = TaskContext.get
- dictionaryTaskCleaner.addTaskCompletionListener(context =>
- dicts.foreach { dictionary =>
- if (null != dictionary) {
- dictionary.clear
- }
+ val cacheProvider: CacheProvider = CacheProvider.getInstance
+ val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+ val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+ forwardDictionaryCache)
+ val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+ // add a task completion listener to clear dictionary that is a decisive factor for
+ // LRU eviction policy
+ val dictionaryTaskCleaner = TaskContext.get
+ dictionaryTaskCleaner.addTaskCompletionListener(context =>
+ dicts.foreach { dictionary =>
+ if (null != dictionary) {
+ dictionary.clear
}
- )
- val iter = firstParent[Row].iterator(split, context)
- new Iterator[Row] {
- var flag = true
- var total = 0L
- override final def hasNext: Boolean = iter.hasNext
+ }
+ )
+ val iter = firstParent[Row].iterator(split, context)
+ new Iterator[Row] {
+ var flag = true
+ var total = 0L
+ override final def hasNext: Boolean = iter.hasNext
- override final def next(): Row = {
- val startTime = System.currentTimeMillis()
- val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
- dictIndex.foreach { index =>
- if ( data(index) != null) {
- data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
- .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
- getDictionaryColumnIds(index)._3)
- }
+ override final def next(): Row = {
+ val startTime = System.currentTimeMillis()
+ val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+ dictIndex.foreach { index =>
+ if ( data(index) != null) {
+ data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+ .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+ getDictionaryColumnIds(index)._3)
}
- new GenericRow(data)
}
+ new GenericRow(data)
}
+ }
}
private def isRequiredToDecode = {
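
Note that this hunk only re-indents the block; the behavior is unchanged. The pattern it preserves is still worth calling out: dictionary handles obtained inside the task are registered with a completion listener so the LRU cache can evict them once the task finishes. A minimal sketch of that pattern (Spark 2.x API, to be run inside a task):

    import org.apache.spark.TaskContext

    TaskContext.get().addTaskCompletionListener { _ =>
      // release per-task dictionary references, e.g.
      // dicts.foreach(d => if (d != null) d.clear())
    }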
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 9e42b44..19c3c9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -78,39 +78,27 @@ case class CarbonScan(
attributesRaw = attributeOut
}
- val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- val dimAttr = new Array[Attribute](dimensions.size())
- val msrAttr = new Array[Attribute](measures.size())
+ val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+ val colAttr = new Array[Attribute](columns.size())
attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if(carbonDimension != null) {
- dimAttr(dimensions.indexOf(carbonDimension)) = attr
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if(carbonMeasure != null) {
- msrAttr(measures.indexOf(carbonMeasure)) = attr
- }
- }
+ val column =
+ carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+ if(column != null) {
+ colAttr(columns.indexOf(column)) = attr
+ }
}
-
- attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+ attributesRaw = colAttr.filter(f => f != null)
var queryOrder: Integer = 0
attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedDims += dim
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if (carbonMeasure != null) {
+ val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+ if (carbonColumn != null) {
+ if (carbonColumn.isDimesion()) {
+ val dim = new QueryDimension(attr.name)
+ dim.setQueryOrder(queryOrder)
+ queryOrder = queryOrder + 1
+ selectedDims += dim
+ } else {
val m1 = new QueryMeasure(attr.name)
m1.setQueryOrder(queryOrder)
queryOrder = queryOrder + 1
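
Resolving attributes through getCreateOrderColumn/getColumnByName replaces the parallel dimension and measure lookups, so the scan's output follows the table's column creation order, a deterministic ordering that positional operations such as union rely on to align columns across branches.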
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 2ba8a03..b639ea8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
/**
@@ -54,7 +55,7 @@ class CarbonSource extends CreatableRelationProvider
"the path to store carbon file is the 'storePath' specified when creating CarbonContext")
val options = new CarbonOption(parameters)
- val storePath = sqlContext.sparkSession.conf.get(CarbonCommonConstants.STORE_LOCATION)
+ val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
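
CarbonProperties is a JVM-wide singleton, so the store location no longer depends on the Spark session carrying CarbonCommonConstants.STORE_LOCATION in its conf. A minimal sketch (the fallback path is hypothetical):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    val storePath = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.STORE_LOCATION, "/tmp/carbon.store")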
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index fb9df70..b3a7d5a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -165,34 +165,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
case union: Union
if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
- val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
- union.children(0).output.foreach(attr =>
- leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
- union.children(1).output.foreach(attr =>
- rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
- var leftPlan = union.children(0)
- var rightPlan = union.children(1)
- if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
- !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- union.children(0))
- }
- if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
- !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
- rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[AttributeReferenceWrapper](),
- union.children(1))
+ val children = union.children.map { child =>
+ val condAttrs = new util.HashSet[AttributeReferenceWrapper]
+ child.output.foreach(attr =>
+ condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+ if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
+ !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ CarbonDictionaryTempDecoder(condAttrs,
+ new util.HashSet[AttributeReferenceWrapper](),
+ union.children(0))
+ } else {
+ child
+ }
}
if (!decoder) {
decoder = true
CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
new util.HashSet[AttributeReferenceWrapper](),
- Union(leftPlan, rightPlan),
+ Union(children),
isOuter = true)
} else {
- Union(leftPlan, rightPlan)
+ Union(children)
}
case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
@@ -487,68 +480,32 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
case cd: CarbonDictionaryCatalystDecoder =>
cd
case sort: Sort =>
- val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
- if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
- val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
- tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
- }
val sortExprs = sort.order.map { s =>
s.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}.asInstanceOf[SortOrder]
}
Sort(sortExprs, sort.global, sort.child)
case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
- val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
- if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
- val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
- tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
- }
-
val aggExps = agg.aggregateExpressions.map { aggExp =>
aggExp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}.asInstanceOf[Seq[NamedExpression]]
val grpExps = agg.groupingExpressions.map { gexp =>
gexp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}
Aggregate(grpExps, aggExps, agg.child)
case expand: Expand =>
- val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
- if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
- val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
- tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
- }
expand.transformExpressions {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
case filter: Filter =>
filter
@@ -559,71 +516,36 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
marker.pushBinaryMarker(allAttrsNotDecode)
u
case p: Project if relations.nonEmpty =>
- val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
- if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
- val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
- tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
- }
val prExps = p.projectList.map { prExp =>
prExp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}.asInstanceOf[Seq[NamedExpression]]
Project(prExps, p.child)
case wd: Window if relations.nonEmpty =>
- val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
- if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
- val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
- tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
- }
val prExps = wd.output.map { prExp =>
prExp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}.asInstanceOf[Seq[Attribute]]
val wdExps = wd.windowExpressions.map { gexp =>
gexp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}.asInstanceOf[Seq[NamedExpression]]
val partitionSpec = wd.partitionSpec.map{ exp =>
exp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}
val orderSpec = wd.orderSpec.map { exp =>
exp.transform {
case attr: AttributeReference =>
- val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
- if(tempAttr.isDefined) {
- tempAttr.get
- } else {
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
}
}.asInstanceOf[Seq[SortOrder]]
Window(wdExps, partitionSpec, orderSpec, wd.child)
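
Taken together, the union case now wraps each child that reads a Carbon relation in its own CarbonDictionaryTempDecoder and rebuilds Union(children), so unions with any number of branches flow through the same decoder placement; the tmpAttrMap indirection previously special-casing Sort, Aggregate, Expand, Project and Window collapses to a single updateDataType call per attribute.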
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/resources/data.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/data.csv b/integration/spark2/src/test/resources/data.csv
new file mode 100644
index 0000000..4ff67da
--- /dev/null
+++ b/integration/spark2/src/test/resources/data.csv
@@ -0,0 +1,11 @@
+empno,empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary
+11,arvind,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96.2,5040.56
+12,krithin,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95.1,7124.21
+13,madhan,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054.235
+14,anandh,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92.2,11248.25
+15,ayushi,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91.5,13245.48
+16,pramod,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040.56
+17,gawrav,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97.45,9574.24
+18,sibi,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98.23,7245.25
+19,shivani,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91.678,11254.24
+20,bill,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94.22,13547.25