Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/09/30 09:00:16 UTC
carbondata git commit: [CARBONDATA-2989] Upgrade spark integration version to 2.3.2
Repository: carbondata
Updated Branches:
refs/heads/master 1c1ced32d -> 2081bc87a
[CARBONDATA-2989] Upgrade spark integration version to 2.3.2
1. Per SPARK-PR#22346, the 'writeAndRead' method now takes 'outputColumnNames: Seq[String]' instead of 'outputColumns: Seq[Attribute]', so callers must pass column names.
2. Per SPARK-PR#21815, several members of FileSourceScanExec became lazy, so the original 'CarbonDataSourceScan' class moves to the 'commonTo2.1And2.2' source path, and a new 'CarbonDataSourceScan' with lazy overrides is added under the 'spark2.3' source path.
3. Upgrade the spark integration version to 2.3.2.
This closes #2779
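For reference, a hedged sketch of the 'writeAndRead' signature change (per SPARK-PR#22346; parameter names other than the two quoted in the message are illustrative, not copied from the Spark sources):

    // Spark 2.3.1 and earlier:
    //   def writeAndRead(mode: SaveMode, data: LogicalPlan,
    //       outputColumns: Seq[Attribute], physicalPlan: SparkPlan): BaseRelation
    // Spark 2.3.2 onward:
    //   def writeAndRead(mode: SaveMode, data: LogicalPlan,
    //       outputColumnNames: Seq[String], physicalPlan: SparkPlan): BaseRelation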
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2081bc87
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2081bc87
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2081bc87
Branch: refs/heads/master
Commit: 2081bc87a5846055c861b28dfc1e3383c53e7ee0
Parents: 1c1ced3
Author: Zhang Zhichao <44...@qq.com>
Authored: Fri Sep 28 01:30:34 2018 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Sun Sep 30 16:59:47 2018 +0800
----------------------------------------------------------------------
.../testsuite/bigdecimal/TestBigDecimal.scala | 2 +-
.../spark/util/CarbonReflectionUtils.scala | 6 +-
integration/spark-datasource/pom.xml | 2 +-
integration/spark2/pom.xml | 5 +-
.../strategy/CarbonDataSourceScan.scala | 53 ++++++++++++++++++
.../strategy/CarbonDataSourceScan.scala | 53 ------------------
.../strategy/CarbonDataSourceScan.scala | 58 ++++++++++++++++++++
pom.xml | 4 +-
8 files changed, 124 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 1f7aafe..551b00b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -45,7 +45,7 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
sql("create table if not exists hiveTable(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))row format delimited fields terminated by ','")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table carbonTable")
sql(s"LOAD DATA local inpath '$resourcesPath/decimalDataWithoutHeader.csv' INTO table hiveTable")
- sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
+ sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10))row format delimited fields terminated by ','")
sql(s"LOAD DATA local inpath '$resourcesPath/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 9955286..0055e87 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -294,9 +294,11 @@ object CarbonReflectionUtils {
.getMethod("writeAndRead",
classOf[SaveMode],
classOf[LogicalPlan],
- classOf[Seq[Attribute]],
+ classOf[Seq[String]],
classOf[SparkPlan])
- method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
+ // since Spark 2.3.2 (SPARK-PR#22346), 'writeAndRead' takes column names,
+ // so pass 'query.output.map(_.name)' instead of 'query.output'
+ method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
.asInstanceOf[BaseRelation]
} else {
throw new UnsupportedOperationException("Spark version not supported")
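Worth noting: because of JVM type erasure, classOf[Seq[Attribute]] and classOf[Seq[String]] resolve to the same runtime class, so the getMethod change above is largely cosmetic; the behavioral change is the column names passed to invoke. A minimal sketch:

    import org.apache.spark.sql.catalyst.expressions.Attribute
    // both parameterizations erase to scala.collection.Seq at runtime
    println(classOf[Seq[Attribute]] == classOf[Seq[String]])  // true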
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 9f0d3ff..002b9f3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -278,7 +278,7 @@
<profile>
<id>spark-2.3</id>
<properties>
- <spark.version>2.3.1</spark.version>
+ <spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 1eba780..b0d8bbe 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -283,6 +283,7 @@
<configuration>
<sources>
<source>src/main/spark2.1</source>
+ <source>src/main/commonTo2.1And2.2</source>
</sources>
</configuration>
</execution>
@@ -328,6 +329,7 @@
<sources>
<source>src/main/spark2.2</source>
<source>src/main/commonTo2.2And2.3</source>
+ <source>src/main/commonTo2.1And2.2</source>
</sources>
</configuration>
</execution>
@@ -339,7 +341,7 @@
<profile>
<id>spark-2.3</id>
<properties>
- <spark.version>2.3.1</spark.version>
+ <spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
@@ -352,6 +354,7 @@
<excludes>
<exclude>src/main/spark2.1</exclude>
<exclude>src/main/spark2.2</exclude>
+ <exclude>src/main/commonTo2.1And2.2</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..7605574
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ * Physical plan node for scanning data. It is applied for both tables
+ * USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+ override val output: Seq[Attribute],
+ val rdd: RDD[InternalRow],
+ @transient override val relation: HadoopFsRelation,
+ val partitioning: Partitioning,
+ override val metadata: Map[String, String],
+ identifier: Option[TableIdentifier],
+ @transient private val logicalRelation: LogicalRelation)
+ extends FileSourceScanExec(
+ relation,
+ output,
+ relation.dataSchema,
+ Seq.empty,
+ Seq.empty,
+ identifier) {
+
+ override val supportsBatch: Boolean = true
+
+ override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+ (partitioning, Nil)
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
deleted file mode 100644
index 7605574..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.strategy
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-
-/**
- * Physical plan node for scanning data. It is applied for both tables
- * USING carbondata and STORED AS CARBONDATA.
- */
-class CarbonDataSourceScan(
- override val output: Seq[Attribute],
- val rdd: RDD[InternalRow],
- @transient override val relation: HadoopFsRelation,
- val partitioning: Partitioning,
- override val metadata: Map[String, String],
- identifier: Option[TableIdentifier],
- @transient private val logicalRelation: LogicalRelation)
- extends FileSourceScanExec(
- relation,
- output,
- relation.dataSchema,
- Seq.empty,
- Seq.empty,
- identifier) {
-
- override val supportsBatch: Boolean = true
-
- override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
- (partitioning, Nil)
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..5435f04
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ * Physical plan node for scanning data. It is applied for both tables
+ * USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+ override val output: Seq[Attribute],
+ val rdd: RDD[InternalRow],
+ @transient override val relation: HadoopFsRelation,
+ val partitioning: Partitioning,
+ val md: Map[String, String],
+ identifier: Option[TableIdentifier],
+ @transient private val logicalRelation: LogicalRelation)
+ extends FileSourceScanExec(
+ relation,
+ output,
+ relation.dataSchema,
+ Seq.empty,
+ Seq.empty,
+ identifier) {
+
+ // lazy override required since Spark 2.3.2 (SPARK-PR#21815)
+ override lazy val supportsBatch: Boolean = true
+
+ // lazy override required since Spark 2.3.2 (SPARK-PR#21815)
+ override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+ (partitioning, Nil)
+
+ // lazy override required since Spark 2.3.2 (SPARK-PR#21815)
+ override lazy val metadata: Map[String, String] = md
+
+ override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}
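A minimal sketch of why this separate 2.3 copy is needed: once SPARK-PR#21815 made these members lazy in FileSourceScanExec, a strict val can no longer override them (illustrative class names, not Spark's):

    class Base { lazy val supportsBatch: Boolean = false }
    // class Strict extends Base { override val supportsBatch = true }
    //   -> does not compile: supportsBatch must be declared lazy
    //      to override a concrete lazy value
    class Works extends Base { override lazy val supportsBatch = true }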
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 00a5287..7b1d487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,6 +522,7 @@
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
@@ -582,6 +583,7 @@
<sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+ <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
<sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
@@ -608,7 +610,7 @@
<profile>
<id>spark-2.3</id>
<properties>
- <spark.version>2.3.1</spark.version>
+ <spark.version>2.3.2</spark.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
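With the property bumped, picking up the new version is presumably the usual profile activation (standard Maven flags; exact module selection may vary):

    mvn clean package -Pspark-2.3 -DskipTests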