Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/11 06:34:58 UTC

[1/2] incubator-carbondata git commit: Do not use internal interfaces of Spark

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 56aa1f8c0 -> 2a6d097d1


Do not use internal interfaces of Spark

style

add r

style

comment fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/06d44608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/06d44608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/06d44608

Branch: refs/heads/master
Commit: 06d44608acaebe99a5a99e754a27e92262242004
Parents: 56aa1f8
Author: wangfei <wa...@126.com>
Authored: Sun Dec 11 07:04:37 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:34:15 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     | 13 ++---
 .../spark/sql/SparkUnknownExpression.scala      |  4 +-
 .../execution/CarbonLateDecodeStrategy.scala    | 56 +++++++++++---------
 3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index db864c7..940c6d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -181,7 +181,7 @@ case class CarbonDictionaryDecoder(
                     getDictionaryColumnIds(index)._3)
                 }
               }
-              val result = unsafeProjection(new GenericMutableRow(data))
+              val result = unsafeProjection(new GenericInternalRow(data))
               total += System.currentTimeMillis() - startTime
               result
             }
@@ -223,11 +223,12 @@ case class CarbonDictionaryDecoder(
 
 
 
-class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
-                profile: CarbonProfile,
-                aliasMap: CarbonAliasDecoderRelation,
-                prev: RDD[Row],
-                       output: Seq[Attribute])
+class CarbonDecoderRDD(
+    relations: Seq[CarbonDecoderRelation],
+    profile: CarbonProfile,
+    aliasMap: CarbonAliasDecoderRelation,
+    prev: RDD[Row],
+    output: Seq[Attribute])
     extends RDD[Row](prev) {
 
   def canBeDecoded(attr: Attribute): Boolean = {
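
The hunk above replaces GenericMutableRow, a Spark-internal mutable row class, with GenericInternalRow before handing the decoded data to the UnsafeProjection. Below is a minimal, self-contained sketch of that pattern, assuming only the Spark 2.x catalyst classes this module already depends on; the schema, column names and values are illustrative and are not taken from CarbonDictionaryDecoder. Note that string columns of an InternalRow must hold UTF8String, not java.lang.String.

  import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.unsafe.types.UTF8String

  object DecodeRowSketch {
    def main(args: Array[String]): Unit = {
      // Illustrative schema of a decoded row (not the real Carbon schema).
      val schema = StructType(Seq(
        StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = true)))

      // Decoded values; InternalRow stores strings as UTF8String.
      val data: Array[Any] = Array(1, UTF8String.fromString("carbon"))

      // GenericInternalRow plays the role of the removed GenericMutableRow here.
      val row = new GenericInternalRow(data)

      // Convert to an UnsafeRow, as the decoder does via unsafeProjection(...).
      val unsafeProjection = UnsafeProjection.create(schema)
      val unsafeRow = unsafeProjection(row)
      println(s"${unsafeRow.getInt(0)}, ${unsafeRow.getUTF8String(1)}")
    }
  }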

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 1a310c7..b4b0f3c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -22,7 +22,7 @@ import java.util.{ArrayList, List}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericInternalRow}
 
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression}
@@ -48,7 +48,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
     }
     try {
       val result = evaluateExpression(
-        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+        new GenericInternalRow(values.map(a => a.asInstanceOf[Any]).toArray))
       val sparkRes = if (isExecutor) {
         result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
       } else {
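
The same substitution appears in SparkUnknownExpression: the collected filter values are wrapped in a GenericInternalRow and passed to the underlying Spark expression for evaluation. A rough sketch of evaluating a Catalyst expression against such a row follows; the expression and input value are invented for illustration and are not Carbon's actual filter path.

  import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, GenericInternalRow, Literal}
  import org.apache.spark.sql.types.IntegerType

  object EvalSketch {
    def main(args: Array[String]): Unit = {
      // BoundReference(ordinal, dataType, nullable) reads column 0 of the input row.
      val col0 = BoundReference(0, IntegerType, nullable = false)
      val expr = Add(col0, Literal(10))

      // Input values wrapped in a GenericInternalRow, mirroring the change above.
      val row = new GenericInternalRow(Array[Any](32))

      // Expression.eval takes an InternalRow, which GenericInternalRow implements.
      println(expr.eval(row)) // 42
    }
  }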

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 57b2139..7a8920f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -63,10 +64,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   }
 
 
-  def getDecoderRDD(logicalRelation: LogicalRelation,
-                    projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
-                    rdd: RDD[Row],
-                    output: Seq[Attribute]): RDD[Row] = {
+  def getDecoderRDD(
+      logicalRelation: LogicalRelation,
+      projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+      rdd: RDD[Row],
+      output: Seq[Attribute]): RDD[Row] = {
     val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
       logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
@@ -83,10 +85,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   }
 
   private[this] def toCatalystRDD(
-                                   relation: LogicalRelation,
-                                   output: Seq[Attribute],
-                                   rdd: RDD[Row],
-                                   needDecode: ArrayBuffer[AttributeReference]):
+      relation: LogicalRelation,
+      output: Seq[Attribute],
+      rdd: RDD[Row],
+      needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
     val newRdd = if (needDecode.size > 0) {
       getDecoderRDD(relation, needDecode, rdd, output)
@@ -101,12 +103,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   }
 
   protected def pruneFilterProject(
-                                    relation: LogicalRelation,
-                                    projects: Seq[NamedExpression],
-                                    filterPredicates: Seq[Expression],
-                                    scanBuilder: (Seq[Attribute], Array[Filter],
-                                        ArrayBuffer[AttributeReference]) =>
-                                        RDD[InternalRow]) = {
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Array[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
     pruneFilterProjectRaw(
       relation,
       projects,
@@ -117,12 +118,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   }
 
   protected def pruneFilterProjectRaw(
-                                       relation: LogicalRelation,
-                                       projects: Seq[NamedExpression],
-                                       filterPredicates: Seq[Expression],
-                                       scanBuilder: (Seq[Attribute], Seq[Expression],
-                                           Seq[Filter], ArrayBuffer[AttributeReference]) =>
-                                           RDD[InternalRow]) = {
+      relation: LogicalRelation,
+      projects: Seq[NamedExpression],
+      filterPredicates: Seq[Expression],
+      scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+        ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
 
     val projectSet = AttributeSet(projects.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
@@ -212,20 +212,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         }
         attr
       }
-      val scan = execution.DataSourceScanExec.create(
+      val scan = new execution.RowDataSourceScanExec(
         updateProject,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        relation.relation, metadata, relation.metastoreTableIdentifier)
+        // Carbon does not support partitioning yet, so use UnknownPartitioning here;
+        // if bucketing is added in future, this partitioning should be changed accordingly
+        relation.relation, UnknownPartitioning(0), metadata, None)
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
       (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
-      val scan = execution.DataSourceScanExec.create(
+      val scan = new execution.RowDataSourceScanExec(
         updateRequestedColumns,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
-        relation.relation, metadata, relation.metastoreTableIdentifier)
+        // Carbon does not support partitioning yet, so use UnknownPartitioning here;
+        // if bucketing is added in future, this partitioning should be changed accordingly
+        relation.relation, UnknownPartitioning(0), metadata, None)
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
@@ -254,8 +258,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
 
   protected[sql] def selectFilters(
-                                    relation: BaseRelation,
-                                    predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+      relation: BaseRelation,
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
 
     // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
     // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
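
Besides switching to RowDataSourceScanExec, the strategy keeps the usual column-pruning logic: pruneFilterProjectRaw unions the attributes referenced by the projections with those referenced by the filter predicates to decide what the scan must output. A small self-contained sketch of that attribute-set arithmetic, using made-up column names, is below.

  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, EqualTo, Literal}
  import org.apache.spark.sql.types.{IntegerType, StringType}

  object PruneSketch {
    def main(args: Array[String]): Unit = {
      // Two hypothetical table columns.
      val id = AttributeReference("id", IntegerType)()
      val name = AttributeReference("name", StringType)()

      // Roughly: SELECT name FROM t WHERE id = 1
      val projects = Seq(name)
      val filterPredicates = Seq(EqualTo(id, Literal(1)))

      // Mirrors the projectSet/filterSet computation shown in the hunk above.
      val projectSet = AttributeSet(projects.flatMap(_.references))
      val filterSet = AttributeSet(filterPredicates.flatMap(_.references))

      // The scan must output every column referenced by a projection or a filter.
      val requestedColumns = (projectSet ++ filterSet).toSeq
      println(requestedColumns.map(_.name).mkString(", ")) // e.g. name, id (ordering not guaranteed)
    }
  }

In the real strategy the requested columns are then handed to the new RowDataSourceScanExec together with UnknownPartitioning(0), since Carbon does not expose an output partitioning yet, as the code comment in the hunk above notes.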


[2/2] incubator-carbondata git commit: [CARBONDATA-521] Depend on more stable classes of Spark in spark2. This closes #415

Posted by ja...@apache.org.
[CARBONDATA-521] Depend on more stable classes of Spark in spark2. This closes #415


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2a6d097d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2a6d097d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2a6d097d

Branch: refs/heads/master
Commit: 2a6d097d19bfd91a0f25de882125fa26ad4a9756
Parents: 56aa1f8 06d4460
Author: jackylk <ja...@huawei.com>
Authored: Sun Dec 11 14:34:42 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:34:42 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     | 13 ++---
 .../spark/sql/SparkUnknownExpression.scala      |  4 +-
 .../execution/CarbonLateDecodeStrategy.scala    | 56 +++++++++++---------
 3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------