You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/11 06:34:58 UTC
[1/2] incubator-carbondata git commit: do not use inner interface of spark
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 56aa1f8c0 -> 2a6d097d1
do not use inner interface of spark
style
add r
style
comment fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/06d44608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/06d44608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/06d44608
Branch: refs/heads/master
Commit: 06d44608acaebe99a5a99e754a27e92262242004
Parents: 56aa1f8
Author: wangfei <wa...@126.com>
Authored: Sun Dec 11 07:04:37 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:34:15 2016 +0800
----------------------------------------------------------------------
.../spark/sql/CarbonDictionaryDecoder.scala | 13 ++---
.../spark/sql/SparkUnknownExpression.scala | 4 +-
.../execution/CarbonLateDecodeStrategy.scala | 56 +++++++++++---------
3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index db864c7..940c6d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -181,7 +181,7 @@ case class CarbonDictionaryDecoder(
getDictionaryColumnIds(index)._3)
}
}
- val result = unsafeProjection(new GenericMutableRow(data))
+ val result = unsafeProjection(new GenericInternalRow(data))
total += System.currentTimeMillis() - startTime
result
}
@@ -223,11 +223,12 @@ case class CarbonDictionaryDecoder(
-class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
- profile: CarbonProfile,
- aliasMap: CarbonAliasDecoderRelation,
- prev: RDD[Row],
- output: Seq[Attribute])
+class CarbonDecoderRDD(
+ relations: Seq[CarbonDecoderRelation],
+ profile: CarbonProfile,
+ aliasMap: CarbonAliasDecoderRelation,
+ prev: RDD[Row],
+ output: Seq[Attribute])
extends RDD[Row](prev) {
def canBeDecoded(attr: Attribute): Boolean = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 1a310c7..b4b0f3c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -22,7 +22,7 @@ import java.util.{ArrayList, List}
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericInternalRow}
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
@@ -48,7 +48,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
}
try {
val result = evaluateExpression(
- new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+ new GenericInternalRow(values.map(a => a.asInstanceOf[Any]).toArray))
val sparkRes = if (isExecutor) {
result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
} else {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 57b2139..7a8920f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -63,10 +64,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
- def getDecoderRDD(logicalRelation: LogicalRelation,
- projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
- rdd: RDD[Row],
- output: Seq[Attribute]): RDD[Row] = {
+ def getDecoderRDD(
+ logicalRelation: LogicalRelation,
+ projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+ rdd: RDD[Row],
+ output: Seq[Attribute]): RDD[Row] = {
val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
@@ -83,10 +85,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
private[this] def toCatalystRDD(
- relation: LogicalRelation,
- output: Seq[Attribute],
- rdd: RDD[Row],
- needDecode: ArrayBuffer[AttributeReference]):
+ relation: LogicalRelation,
+ output: Seq[Attribute],
+ rdd: RDD[Row],
+ needDecode: ArrayBuffer[AttributeReference]):
RDD[InternalRow] = {
val newRdd = if (needDecode.size > 0) {
getDecoderRDD(relation, needDecode, rdd, output)
@@ -101,12 +103,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
protected def pruneFilterProject(
- relation: LogicalRelation,
- projects: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- scanBuilder: (Seq[Attribute], Array[Filter],
- ArrayBuffer[AttributeReference]) =>
- RDD[InternalRow]) = {
+ relation: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Array[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
pruneFilterProjectRaw(
relation,
projects,
@@ -117,12 +118,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
protected def pruneFilterProjectRaw(
- relation: LogicalRelation,
- projects: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- scanBuilder: (Seq[Attribute], Seq[Expression],
- Seq[Filter], ArrayBuffer[AttributeReference]) =>
- RDD[InternalRow]) = {
+ relation: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
val projectSet = AttributeSet(projects.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
@@ -212,20 +212,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
attr
}
- val scan = execution.DataSourceScanExec.create(
+ val scan = new execution.RowDataSourceScanExec(
updateProject,
scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
- relation.relation, metadata, relation.metastoreTableIdentifier)
+ // now carbon do not support partitioning, use UnknownPartitioning here, in future if
+ // we add bucket, we should change the partitioning
+ relation.relation, UnknownPartitioning(0), metadata, None)
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
- val scan = execution.DataSourceScanExec.create(
+ val scan = new execution.RowDataSourceScanExec(
updateRequestedColumns,
scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
- relation.relation, metadata, relation.metastoreTableIdentifier)
+ // now carbon do not support partitioning, use UnknownPartitioning here, in future if
+ // we add bucket, we should change the partitioning
+ relation.relation, UnknownPartitioning(0), metadata, None)
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
@@ -254,8 +258,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
protected[sql] def selectFilters(
- relation: BaseRelation,
- predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+ relation: BaseRelation,
+ predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
// For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
// called `predicate`s, while all data source filters of type `sources.Filter` are simply called
[2/2] incubator-carbondata git commit: [CARBONDATA-521]Depends on more stable class of spark in spark2 This closes #415
Posted by ja...@apache.org.
[CARBONDATA-521]Depends on more stable class of spark in spark2 This closes #415
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2a6d097d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2a6d097d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2a6d097d
Branch: refs/heads/master
Commit: 2a6d097d19bfd91a0f25de882125fa26ad4a9756
Parents: 56aa1f8 06d4460
Author: jackylk <ja...@huawei.com>
Authored: Sun Dec 11 14:34:42 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:34:42 2016 +0800
----------------------------------------------------------------------
.../spark/sql/CarbonDictionaryDecoder.scala | 13 ++---
.../spark/sql/SparkUnknownExpression.scala | 4 +-
.../execution/CarbonLateDecodeStrategy.scala | 56 +++++++++++---------
3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------