You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:23 UTC

[21/45] carbondata git commit: [CARBONDATA-2989] Upgrade spark integration version to 2.3.2

[CARBONDATA-2989] Upgrade spark integration version to 2.3.2

1. According to SPARK-PR#22346, change the parameter type from 'outputColumns: Seq[Attribute]' to 'outputColumnNames: Seq[String]' when calling the 'writeAndRead' method. 2. According to SPARK-PR#21815, some members were made 'lazy', so move the original class 'CarbonDataSourceScan' to the source path 'commonTo2.1And2.2' and add a new class 'CarbonDataSourceScan' in the source path 'spark2.3' that overrides those members as lazy vals. 3. Upgrade the Spark integration version to 2.3.2.

This closes #2779


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2081bc87
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2081bc87
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2081bc87

Branch: refs/heads/branch-1.5
Commit: 2081bc87a5846055c861b28dfc1e3383c53e7ee0
Parents: 1c1ced3
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Sep 28 01:30:34 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Sun Sep 30 16:59:47 2018 +0800

----------------------------------------------------------------------
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  2 +-
 .../spark/util/CarbonReflectionUtils.scala      |  6 +-
 integration/spark-datasource/pom.xml            |  2 +-
 integration/spark2/pom.xml                      |  5 +-
 .../strategy/CarbonDataSourceScan.scala         | 53 ++++++++++++++++++
 .../strategy/CarbonDataSourceScan.scala         | 53 ------------------
 .../strategy/CarbonDataSourceScan.scala         | 58 ++++++++++++++++++++
 pom.xml                                         |  4 +-
 8 files changed, 124 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 1f7aafe..551b00b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -45,7 +45,7 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("create table if not exists hiveTable(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))row format delimited fields terminated by ','")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table carbonTable")
     sql(s"LOAD DATA local inpath '$resourcesPath/decimalDataWithoutHeader.csv' INTO table hiveTable")
-    sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
+    sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10))row format delimited fields terminated by ','")
     sql(s"LOAD DATA local inpath '$resourcesPath/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
     sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 9955286..0055e87 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -294,9 +294,11 @@ object CarbonReflectionUtils {
         .getMethod("writeAndRead",
           classOf[SaveMode],
           classOf[LogicalPlan],
-          classOf[Seq[Attribute]],
+          classOf[Seq[String]],
           classOf[SparkPlan])
-      method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
+      // Since Spark 2.3.2 (SPARK-PR#22346), 'writeAndRead' takes the output column names
+      // (Seq[String]) instead of attributes, so pass 'query.output.map(_.name)'.
+      method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
         .asInstanceOf[BaseRelation]
     } else {
       throw new UnsupportedOperationException("Spark version not supported")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 9f0d3ff..002b9f3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -278,7 +278,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 1eba780..b0d8bbe 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -283,6 +283,7 @@
                 <configuration>
                   <sources>
                     <source>src/main/spark2.1</source>
+                    <source>src/main/commonTo2.1And2.2</source>
                   </sources>
                 </configuration>
               </execution>
@@ -328,6 +329,7 @@
                   <sources>
                     <source>src/main/spark2.2</source>
                     <source>src/main/commonTo2.2And2.3</source>
+                    <source>src/main/commonTo2.1And2.2</source>
                   </sources>
                 </configuration>
               </execution>
@@ -339,7 +341,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -352,6 +354,7 @@
               <excludes>
                 <exclude>src/main/spark2.1</exclude>
                 <exclude>src/main/spark2.2</exclude>
+                <exclude>src/main/commonTo2.1And2.2</exclude>
               </excludes>
             </configuration>
           </plugin>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..7605574
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+    override val output: Seq[Attribute],
+    val rdd: RDD[InternalRow],
+    @transient override val relation: HadoopFsRelation,
+    val partitioning: Partitioning,
+    override val metadata: Map[String, String],
+    identifier: Option[TableIdentifier],
+    @transient private val logicalRelation: LogicalRelation)
+  extends FileSourceScanExec(
+    relation,
+    output,
+    relation.dataSchema,
+    Seq.empty,
+    Seq.empty,
+    identifier) {
+
+  override val supportsBatch: Boolean = true
+
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+    (partitioning, Nil)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
deleted file mode 100644
index 7605574..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.strategy
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-
-/**
- *  Physical plan node for scanning data. It is applied for both tables
- *  USING carbondata and STORED AS CARBONDATA.
- */
-class CarbonDataSourceScan(
-    override val output: Seq[Attribute],
-    val rdd: RDD[InternalRow],
-    @transient override val relation: HadoopFsRelation,
-    val partitioning: Partitioning,
-    override val metadata: Map[String, String],
-    identifier: Option[TableIdentifier],
-    @transient private val logicalRelation: LogicalRelation)
-  extends FileSourceScanExec(
-    relation,
-    output,
-    relation.dataSchema,
-    Seq.empty,
-    Seq.empty,
-    identifier) {
-
-  override val supportsBatch: Boolean = true
-
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
-    (partitioning, Nil)
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..5435f04
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ *  Physical plan node for scanning data. It is applied for both tables
+ *  USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+    override val output: Seq[Attribute],
+    val rdd: RDD[InternalRow],
+    @transient override val relation: HadoopFsRelation,
+    val partitioning: Partitioning,
+    val md: Map[String, String],
+    identifier: Option[TableIdentifier],
+    @transient private val logicalRelation: LogicalRelation)
+  extends FileSourceScanExec(
+    relation,
+    output,
+    relation.dataSchema,
+    Seq.empty,
+    Seq.empty,
+    identifier) {
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val supportsBatch: Boolean = true
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+    (partitioning, Nil)
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val metadata: Map[String, String] = md
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 00a5287..7b1d487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,6 +522,7 @@
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
@@ -582,6 +583,7 @@
                 <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+                <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
                 <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
@@ -608,7 +610,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>