You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/07 06:13:11 UTC

carbondata git commit: [CARBONDATA-1864] Using org.apache.spark.SPARK_VERSION instead of sparkSession.version

Repository: carbondata
Updated Branches:
  refs/heads/master 49763b72b -> d19f01855


[CARBONDATA-1864] Using org.apache.spark.SPARK_VERSION instead of sparkSession.version

Using org.apache.spark.SPARK_VERSION instead of sparkSession.version

This closes #1620


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d19f0185
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d19f0185
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d19f0185

Branch: refs/heads/master
Commit: d19f018555208300f67369610acd53851da4d00a
Parents: 49763b7
Author: QiangCai <qi...@qq.com>
Authored: Wed Dec 6 10:23:15 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Dec 7 14:12:59 2017 +0800

----------------------------------------------------------------------
 .../optimizer/CarbonDecoderOptimizerHelper.scala  |  4 ++--
 .../apache/spark/util/CarbonReflectionUtils.scala | 18 +++++++++---------
 .../spark/sql/hive/CarbonAnalysisRules.scala      |  8 ++------
 .../spark/sql/hive/CarbonFileMetastore.scala      |  7 +++----
 .../spark/sql/hive/CarbonPreAggregateRules.scala  |  3 ++-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala  | 15 +++------------
 6 files changed, 21 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 886f27c..fee4b66 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -22,7 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.util.CarbonReflectionUtils
@@ -87,7 +87,7 @@ class CarbonDecoderProcessor {
         nodeList.add(ArrayCarbonNode(nodeListSeq))
       case e: UnaryNode => process(e.child, nodeList)
       case i: InsertIntoTable =>
-        val version = SparkSession.getActiveSession.get.version
+        val version = SPARK_VERSION
 
         val child: LogicalPlan = if (version.startsWith("2.1")) {
           CarbonReflectionUtils.getField("child", i).asInstanceOf[LogicalPlan]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 6864495..9a3f28e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -56,20 +57,19 @@ object CarbonReflectionUtils {
 
   def getUnresolvedRelation(
       tableIdentifier: TableIdentifier,
-      version: String,
       tableAlias: Option[String] = None): UnresolvedRelation = {
     val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
-    if (version.startsWith("2.1")) {
+    if (SPARK_VERSION.startsWith("2.1")) {
       createObject(
         className,
         tableIdentifier,
         tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (version.startsWith("2.2")) {
+    } else if (SPARK_VERSION.startsWith("2.2")) {
       createObject(
         className,
         tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
     } else {
-      throw new UnsupportedOperationException(s"Unsupported Spark version $version")
+      throw new UnsupportedOperationException(s"Unsupported Spark version $SPARK_VERSION")
     }
   }
 
@@ -77,13 +77,13 @@ object CarbonReflectionUtils {
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
     val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
-    if (sparkSession.version.startsWith("2.1")) {
+    if (SPARK_VERSION.startsWith("2.1")) {
       createObject(
         className,
         alias.getOrElse(""),
         relation,
         Option(view))._1.asInstanceOf[SubqueryAlias]
-    } else if (sparkSession.version.startsWith("2.2")) {
+    } else if (SPARK_VERSION.startsWith("2.2")) {
       createObject(
         className,
         alias.getOrElse(""),
@@ -130,7 +130,7 @@ object CarbonReflectionUtils {
   def getAstBuilder(conf: Object,
       sqlParser: Object,
       sparkSession: SparkSession): AstBuilder = {
-    if (sparkSession.version.startsWith("2.1") || sparkSession.version.startsWith("2.2")) {
+    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
       createObject(
         "org.apache.spark.sql.hive.CarbonSqlAstBuilder",
         conf,
@@ -141,12 +141,12 @@ object CarbonReflectionUtils {
   }
 
   def getSessionState(sparkContext: SparkContext, carbonSession: Object): Any = {
-    if (sparkContext.version.startsWith("2.1")) {
+    if (SPARK_VERSION.startsWith("2.1")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
         "org.apache.spark.sql.hive.CarbonSessionState")
       createObject(className, carbonSession)._1
-    } else if (sparkContext.version.startsWith("2.2")) {
+    } else if (SPARK_VERSION.startsWith("2.2")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
         "org.apache.spark.sql.hive.CarbonSessionStateBuilder")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 0b3a2b3..b595896 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.Inner
@@ -127,11 +127,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     } else {
       updatedSelectPlan
     }
-    val destinationTable =
-      CarbonReflectionUtils.getUnresolvedRelation(
-        table.tableIdentifier,
-        sparkSession.version,
-        alias)
+    val destinationTable = CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias)
 
     ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 5078259..cdbdb10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -23,15 +23,14 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonSource, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
-import org.apache.spark.sql.CarbonSource
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -147,7 +146,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       case LogicalRelation(
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
         carbonDatasourceHadoopRelation.carbonRelation
-      case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.startsWith("2.2") =>
+      case SubqueryAlias(_, c: CatalogRelation) if SPARK_VERSION.startsWith("2.2") =>
         val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
           "tableMeta",
           c).asInstanceOf[CatalogTable]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 2b74ed7..426048f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -1201,7 +1202,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           case attr => attr
         }
       }
-      val version = sparkSession.version
+      val version = SPARK_VERSION
       val newChild: LogicalPlan = if (newChildOutput == child.output) {
         if (version.startsWith("2.1")) {
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d19f0185/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index b01c6d9..343db49 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{DeleteRecords, SparkSession, UpdateTable}
+import org.apache.spark.sql.{DeleteRecords, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -244,12 +244,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
         // Use Reflection to choose between Spark2.1 and Spark2.2
         // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
-        val unresolvedrelation =
-        CarbonReflectionUtils.getUnresolvedRelation(
-          tableIdentifier,
-          SparkSession.getActiveSession.get.version,
-          tableAlias)
-        unresolvedrelation
+        CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, tableAlias)
     }
   }
 
@@ -273,11 +268,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
         // Use Reflection to choose between Spark2.1 and Spark2.2
         // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
-        val unresolvedRelation =
-        CarbonReflectionUtils.getUnresolvedRelation(
-          tableIdentifier,
-          SparkSession.getActiveSession.get.version,
-          alias)
+        val unresolvedRelation = CarbonReflectionUtils.getUnresolvedRelation(tableIdentifier, alias)
 
         (unresolvedRelation, tableIdent, alias, tableIdentifier)
     }