Posted to commits@kylin.apache.org by li...@apache.org on 2017/07/25 09:44:59 UTC

[12/24] kylin git commit: KYLIN-2633 Upgrade Spark to 2.x

KYLIN-2633 Upgrade Spark to 2.x


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cfaeb7e7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cfaeb7e7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cfaeb7e7

Branch: refs/heads/2.1.x
Commit: cfaeb7e7eb12eefa7c0ecd40ceabeea6b5dbf62e
Parents: 309fac0
Author: 许益铭 <xu...@hys-inc.cn>
Authored: Fri Jul 7 00:13:24 2017 +0800
Committer: Hongbin Ma <ma...@kyligence.io>
Committed: Fri Jul 21 16:13:02 2017 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |  2 +-
 build/script/download-spark.sh                  |  4 ++--
 .../main/resources/kylin-defaults.properties    |  3 ++-
 .../gtrecord/GTCubeStorageQueryBase.java        |  2 +-
 engine-spark/pom.xml                            |  6 ++---
 .../apache/kylin/engine/spark/SparkCubing.java  | 22 +++++++++---------
 .../kylin/engine/spark/SparkCubingByLayer.java  | 11 +++++----
 .../kylin/engine/spark/SparkHiveDemo.java       |  5 ++--
 .../test_case_data/sandbox/kylin.properties     |  4 ++--
 kylin-it/pom.xml                                | 24 +++++++++++++++++++-
 pom.xml                                         | 10 ++++----
 server/pom.xml                                  |  2 +-
 source-kafka/pom.xml                            |  2 +-
 13 files changed, 61 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0a64dde..b988445 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -152,7 +152,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index b73331e..8025591 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -27,8 +27,8 @@ if [[ `uname -a` =~ "Darwin" ]]; then
     alias md5cmd="md5 -q"
 fi
 
-spark_version="1.6.3"
-spark_pkg_md5="ce8a2e7529aac0f0175194061769dbd4"
+spark_version="2.1.1"
+spark_pkg_md5="195daab700e4332fcdaf7c66236de542"
 
 if [ ! -f "build/spark-${spark_version}-bin-hadoop2.6.tgz" ]
 then

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index cb511e7..ee25637 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -218,7 +218,7 @@ kylin.engine.spark.max-partition=5000
 
 # Spark conf (default is in spark/conf/spark-defaults.conf)
 kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
 kylin.engine.spark-conf.spark.yarn.queue=default
 kylin.engine.spark-conf.spark.executor.memory=1G
 kylin.engine.spark-conf.spark.executor.cores=2
@@ -226,6 +226,7 @@ kylin.engine.spark-conf.spark.executor.instances=1
 kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
 #kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
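
[Note on the two property changes above: commenting out spark.submit.deployMode
leaves spark-submit at its default (client mode) unless a deployment sets it
back, and the new spark.hadoop.yarn.timeline-service.enabled=false line is the
common workaround for Spark 2 drivers failing on YARN clusters whose
timeline-service client still needs Jersey 1.x. Spark copies every
"spark.hadoop.*" property into the Hadoop Configuration with that prefix
stripped. The sketch below is illustrative only, not part of this patch, and
assumes that Kylin forwards each kylin.engine.spark-conf.* entry to
spark-submit with the Kylin prefix removed. Note also that the commented
spark.yarn.jar example still points at the Spark 1.6.3 assembly jar, which
Spark 2.x no longer ships.

    import org.apache.spark.SparkConf;

    public class TimelineServiceOffSketch {
        public static void main(String[] args) {
            // Programmatic equivalent of the property added above: Spark
            // strips the "spark.hadoop." prefix, so the YARN client code in
            // the driver sees yarn.timeline-service.enabled=false and skips
            // the ATS client entirely.
            SparkConf conf = new SparkConf()
                    .setAppName("timeline-service-off-sketch")
                    .set("spark.hadoop.yarn.timeline-service.enabled", "false");
            System.out.println(conf.get("spark.hadoop.yarn.timeline-service.enabled"));
        }
    }
]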

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 10f735e..22f5fc9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -83,7 +83,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
                 logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
                 continue;
-            }
+        }
 
             scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getHavingFilter(), request.getContext());
             if (!scanner.isSegmentSkipped())

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 93b6f9b..26a6e31 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -51,19 +51,19 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
+            <artifactId>spark-core_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.10</artifactId>
+            <artifactId>spark-sql_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.10</artifactId>
+            <artifactId>spark-hive_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 2a0981a..57fd315 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -98,7 +98,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.slf4j.Logger;
@@ -179,7 +179,7 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception {
+    private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
@@ -204,9 +204,9 @@ public class SparkCubing extends AbstractApplication {
         for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
             final String column = columns[entry.getKey()];
             final TblColRef tblColRef = entry.getValue();
-            final DataFrame frame = intermediateTable.select(column).distinct();
+            final Dataset<Row> frame = intermediateTable.select(column).distinct();
 
-            final Row[] rows = frame.collect();
+            final List<Row> rows = frame.collectAsList();
             dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
                 @Override
                 public Iterator<String> iterator() {
@@ -215,13 +215,13 @@ public class SparkCubing extends AbstractApplication {
 
                         @Override
                         public boolean hasNext() {
-                            return i < rows.length;
+                            return i < rows.size();
                         }
 
                         @Override
                         public String next() {
                             if (hasNext()) {
-                                final Row row = rows[i++];
+                                final Row row = rows.get(i++);
                                 final Object o = row.get(0);
                                 return o != null ? o.toString() : null;
                             } else {
@@ -367,7 +367,7 @@ public class SparkCubing extends AbstractApplication {
         final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override
-            public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+            public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
                 long t = System.currentTimeMillis();
                 prepare();
 
@@ -390,7 +390,7 @@ public class SparkCubing extends AbstractApplication {
                     throw new RuntimeException(e);
                 }
                 System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                return sparkCuboidWriter.getResult();
+                return sparkCuboidWriter.getResult().iterator();
             }
         });
 
@@ -430,7 +430,7 @@ public class SparkCubing extends AbstractApplication {
             }
         }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
             @Override
-            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+            public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
                 return new Iterable<Tuple2<byte[], byte[]>>() {
                     final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
                     final Object[] input = new Object[measureSize];
@@ -458,7 +458,7 @@ public class SparkCubing extends AbstractApplication {
                             }
                         });
                     }
-                };
+                }.iterator();
             }
         }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
             @Override
@@ -549,7 +549,7 @@ public class SparkCubing extends AbstractApplication {
 
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+        final Dataset<Row> intermediateTable = sqlContext.sql("select * from " + hiveTable);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
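
[Note: the SparkCubing hunks above cover the mechanical parts of the Spark
1.6 -> 2.x Java migration. DataFrame disappears as a Java class (in Scala it
became an alias for Dataset[Row]), so Java code declares Dataset<Row>;
collectAsList() replaces the Row[] that collect() returned in 1.6; and
flat-map style functions now return an Iterator (see the note after the next
file). A minimal local sketch of the first two, not taken from Kylin:

    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DatasetMigrationSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .appName("dataset-migration-sketch")
                    .getOrCreate();
            // Spark 1.6:  DataFrame df = sqlContext.sql(...); Row[] rows = df.collect();
            // Spark 2.x:  Dataset<Row> replaces DataFrame, and collectAsList()
            //             returns a List<Row> that is convenient from Java.
            Dataset<Row> df = spark.range(3).toDF("id");
            List<Row> rows = df.select("id").distinct().collectAsList();
            for (Row row : rows) {
                Object o = row.get(0);
                System.out.println(o != null ? o.toString() : null);
            }
            spark.stop();
        }
    }
]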

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..587ff78 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,7 +59,7 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
@@ -73,6 +73,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 
@@ -155,7 +156,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
 
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.table(hiveTable);
+        final Dataset<Row> intermediateTable = sqlContext.table(hiveTable);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -354,7 +355,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         @Override
-        public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+        public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             if (initialized == false) {
                 prepare();
                 initialized = true;
@@ -368,7 +369,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
             // if still empty or null
             if (myChildren == null || myChildren.size() == 0) {
-                return EMTPY_ITERATOR;
+                return EMTPY_ITERATOR.iterator();
             }
 
             List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
@@ -382,7 +383,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2()));
             }
 
-            return tuples;
+            return tuples.iterator();
         }
     }
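
[Note: the ".iterator()" additions above come from a signature change in
Spark 2.x. The Java functional interfaces FlatMapFunction and
PairFlatMapFunction changed call() from returning Iterable<T> to Iterator<T>,
so code that built a List or other collection now returns its iterator. A
minimal runnable sketch (local mode, not Kylin code):

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class IteratorFlatMapSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local[2]", "iterator-flatmap-sketch");
            JavaRDD<String> lines = sc.parallelize(Arrays.asList("a b", "c d"));
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) {
                    // Spark 1.6 returned the Iterable itself; Spark 2.x wants
                    // its iterator, mirroring the "+ .iterator()" hunks above.
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            System.out.println(words.collect());   // [a, b, c, d]
            sc.stop();
        }
    }
]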
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
index e1ba470..58d4222 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
@@ -22,7 +22,8 @@ import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 
 /**
@@ -45,7 +46,7 @@ public class SparkHiveDemo extends AbstractApplication {
         SparkConf conf = new SparkConf().setAppName("Simple Application");
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+        final Dataset<Row> dataFrame = sqlContext.sql("select * from test_kylin_fact");
         System.out.println("count * of the table:" + dataFrame.count());
     }
 }
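
[Note: HiveContext still compiles and runs on Spark 2.1, which is why this
demo only swaps DataFrame for Dataset<Row>, but the class is deprecated in
2.x. A sketch of the same demo on the Spark 2 entry point, SparkSession with
Hive support, querying the same table as above and assuming a Hive metastore
is reachable:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkHiveDemo2 {
        public static void main(String[] args) {
            // enableHiveSupport() replaces constructing a HiveContext from a
            // JavaSparkContext in the Spark 2.x API.
            SparkSession spark = SparkSession.builder()
                    .appName("Simple Application")
                    .enableHiveSupport()
                    .getOrCreate();
            Dataset<Row> dataFrame = spark.sql("select * from test_kylin_fact");
            System.out.println("count * of the table:" + dataFrame.count());
            spark.stop();
        }
    }
]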

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6a571df..619bf99 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -170,7 +170,7 @@ kylin.engine.spark.rdd-partition-cut-mb=100
 ### Spark conf overwrite for cube engine
 kylin.engine.spark-conf.spark.yarn.submit.file.replication=1
 kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
 kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=384
 kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=256
 kylin.engine.spark-conf.spark.executor.memory=768M
@@ -184,7 +184,7 @@ kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin
 kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
-
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
 ### QUERY PUSH DOWN  ###
 #kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index e82867f..9583b42 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -233,7 +233,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 
@@ -249,23 +249,45 @@
             <scope>test</scope>
         </dependency>
 
+
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 506d734..732deac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
         <kafka.version>0.10.1.0</kafka.version>
 
         <!-- Spark versions -->
-        <spark.version>1.6.3</spark.version>
+        <spark.version>2.1.1</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
         <!-- <reflections.version>0.9.10</reflections.version> -->
@@ -577,19 +577,19 @@
             <!-- Spark dependency -->
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-core_2.10</artifactId>
+                <artifactId>spark-core_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-sql_2.10</artifactId>
+                <artifactId>spark-sql_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-hive_2.10</artifactId>
+                <artifactId>spark-hive_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
@@ -602,7 +602,7 @@
             <!-- Kafka dependency -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>
-                <artifactId>kafka_2.10</artifactId>
+                <artifactId>kafka_2.11</artifactId>
                 <version>${kafka.version}</version>
                 <scope>provided</scope>
             </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f35cb44..fe82e3f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -256,7 +256,7 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
+            <artifactId>spark-core_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index e4ca76f..9ce665e 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -45,7 +45,7 @@
         <!-- Provided -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
         </dependency>
 
         <!-- Env & Test -->