You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/07/25 09:44:59 UTC
[12/24] kylin git commit: KYLIN-2633 Upgrade Spark to 2.x
KYLIN-2633 Upgrade Spark to 2.x
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cfaeb7e7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cfaeb7e7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cfaeb7e7
Branch: refs/heads/2.1.x
Commit: cfaeb7e7eb12eefa7c0ecd40ceabeea6b5dbf62e
Parents: 309fac0
Author: 许益铭 <xu...@hys-inc.cn>
Authored: Fri Jul 7 00:13:24 2017 +0800
Committer: Hongbin Ma <ma...@kyligence.io>
Committed: Fri Jul 21 16:13:02 2017 +0800
----------------------------------------------------------------------
assembly/pom.xml | 2 +-
build/script/download-spark.sh | 4 ++--
.../main/resources/kylin-defaults.properties | 3 ++-
.../gtrecord/GTCubeStorageQueryBase.java | 2 +-
engine-spark/pom.xml | 6 ++---
.../apache/kylin/engine/spark/SparkCubing.java | 22 +++++++++---------
.../kylin/engine/spark/SparkCubingByLayer.java | 11 +++++----
.../kylin/engine/spark/SparkHiveDemo.java | 5 ++--
.../test_case_data/sandbox/kylin.properties | 4 ++--
kylin-it/pom.xml | 24 +++++++++++++++++++-
pom.xml | 10 ++++----
server/pom.xml | 2 +-
source-kafka/pom.xml | 2 +-
13 files changed, 61 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0a64dde..b988445 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -152,7 +152,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index b73331e..8025591 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -27,8 +27,8 @@ if [[ `uname -a` =~ "Darwin" ]]; then
alias md5cmd="md5 -q"
fi
-spark_version="1.6.3"
-spark_pkg_md5="ce8a2e7529aac0f0175194061769dbd4"
+spark_version="2.1.1"
+spark_pkg_md5="195daab700e4332fcdaf7c66236de542"
if [ ! -f "build/spark-${spark_version}-bin-hadoop2.6.tgz" ]
then
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index cb511e7..ee25637 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -218,7 +218,7 @@ kylin.engine.spark.max-partition=5000
# Spark conf (default is in spark/conf/spark-defaults.conf)
kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
kylin.engine.spark-conf.spark.yarn.queue=default
kylin.engine.spark-conf.spark.executor.memory=1G
kylin.engine.spark-conf.spark.executor.cores=2
@@ -226,6 +226,7 @@ kylin.engine.spark-conf.spark.executor.instances=1
kylin.engine.spark-conf.spark.eventLog.enabled=true
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
# Manually uploading the spark-assembly jar to HDFS and then setting this property avoids repeatedly uploading the jar at runtime
#kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 10f735e..22f5fc9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -83,7 +83,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
continue;
- }
+ }
scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(), request.getMetrics(), request.getFilter(), request.getHavingFilter(), request.getContext());
if (!scanner.isSegmentSkipped())
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 93b6f9b..26a6e31 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -51,19 +51,19 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_2.11</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 2a0981a..57fd315 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -98,7 +98,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.slf4j.Logger;
@@ -179,7 +179,7 @@ public class SparkCubing extends AbstractApplication {
}
}
- private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception {
+ private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
@@ -204,9 +204,9 @@ public class SparkCubing extends AbstractApplication {
for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
final String column = columns[entry.getKey()];
final TblColRef tblColRef = entry.getValue();
- final DataFrame frame = intermediateTable.select(column).distinct();
+ final Dataset<Row> frame = intermediateTable.select(column).distinct();
- final Row[] rows = frame.collect();
+ final List<Row> rows = frame.collectAsList();
dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
@Override
public Iterator<String> iterator() {
@@ -215,13 +215,13 @@ public class SparkCubing extends AbstractApplication {
@Override
public boolean hasNext() {
- return i < rows.length;
+ return i < rows.size();
}
@Override
public String next() {
if (hasNext()) {
- final Row row = rows[i++];
+ final Row row = rows.get(i++);
final Object o = row.get(0);
return o != null ? o.toString() : null;
} else {
@@ -367,7 +367,7 @@ public class SparkCubing extends AbstractApplication {
final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
@Override
- public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+ public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
long t = System.currentTimeMillis();
prepare();
@@ -390,7 +390,7 @@ public class SparkCubing extends AbstractApplication {
throw new RuntimeException(e);
}
System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
- return sparkCuboidWriter.getResult();
+ return sparkCuboidWriter.getResult().iterator();
}
});
@@ -430,7 +430,7 @@ public class SparkCubing extends AbstractApplication {
}
}, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
@Override
- public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+ public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
return new Iterable<Tuple2<byte[], byte[]>>() {
final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
final Object[] input = new Object[measureSize];
@@ -458,7 +458,7 @@ public class SparkCubing extends AbstractApplication {
}
});
}
- };
+ }.iterator();
}
}, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
@Override
@@ -549,7 +549,7 @@ public class SparkCubing extends AbstractApplication {
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext sqlContext = new HiveContext(sc.sc());
- final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+ final Dataset<Row> intermediateTable = sqlContext.sql("select * from " + hiveTable);
final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..587ff78 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,7 +59,7 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
@@ -73,6 +73,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
@@ -155,7 +156,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
HiveContext sqlContext = new HiveContext(sc.sc());
- final DataFrame intermediateTable = sqlContext.table(hiveTable);
+ final Dataset<Row> intermediateTable = sqlContext.table(hiveTable);
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -354,7 +355,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
@Override
- public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
if (initialized == false) {
prepare();
initialized = true;
@@ -368,7 +369,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// if still empty or null
if (myChildren == null || myChildren.size() == 0) {
- return EMTPY_ITERATOR;
+ return EMTPY_ITERATOR.iterator();
}
List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
@@ -382,7 +383,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2()));
}
- return tuples;
+ return tuples.iterator();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
index e1ba470..58d4222 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
@@ -22,7 +22,8 @@ import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
@@ -45,7 +46,7 @@ public class SparkHiveDemo extends AbstractApplication {
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext sqlContext = new HiveContext(sc.sc());
- final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+ final Dataset<Row> dataFrame = sqlContext.sql("select * from test_kylin_fact");
System.out.println("count * of the table:" + dataFrame.count());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6a571df..619bf99 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -170,7 +170,7 @@ kylin.engine.spark.rdd-partition-cut-mb=100
### Spark conf overwrite for cube engine
kylin.engine.spark-conf.spark.yarn.submit.file.replication=1
kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=384
kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=256
kylin.engine.spark-conf.spark.executor.memory=768M
@@ -184,7 +184,7 @@ kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin
kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
-
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
### QUERY PUSH DOWN ###
#kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index e82867f..9583b42 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -233,7 +233,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<scope>provided</scope>
</dependency>
@@ -249,23 +249,45 @@
<scope>test</scope>
</dependency>
+
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 506d734..732deac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
<kafka.version>0.10.1.0</kafka.version>
<!-- Spark versions -->
- <spark.version>1.6.3</spark.version>
+ <spark.version>2.1.1</spark.version>
<kryo.version>4.0.0</kryo.version>
<!-- <reflections.version>0.9.10</reflections.version> -->
@@ -577,19 +577,19 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
+ <artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
+ <artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
@@ -602,7 +602,7 @@
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f35cb44..fe82e3f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -256,7 +256,7 @@
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
+ <artifactId>spark-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index e4ca76f..9ce665e 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -45,7 +45,7 @@
<!-- Provided -->
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
</dependency>
<!-- Env & Test -->