You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/12/07 05:25:32 UTC
kylin git commit: KYLIN-2995 Set SparkContext.hadoopConfiguration to
HadoopUtil in Spark Cubing
Repository: kylin
Updated Branches:
refs/heads/master 9265e150d -> c6cfa6984
KYLIN-2995 Set SparkContext.hadoopConfiguration to HadoopUtil in Spark Cubing
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6cfa698
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6cfa698
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6cfa698
Branch: refs/heads/master
Commit: c6cfa69841f96f6d3f411e375fbe1779e819cd84
Parents: 9265e15
Author: kangkaisen <ka...@meituan.com>
Authored: Mon Dec 4 20:42:22 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Dec 7 13:25:27 2017 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 4 +-
.../mr/common/SerializableConfiguration.java | 50 ++++++++++++++++++++
.../kylin/engine/spark/SparkCubingByLayer.java | 37 +++++++++------
3 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 6e67488..ade07e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -468,7 +468,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+ public static KylinConfig loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) {
+ HadoopUtil.setCurrentConfiguration(conf.get());
+
if (uri == null)
throw new IllegalArgumentException("meta url should not be null");
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
new file mode 100644
index 0000000..b390432
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+//https://stackoverflow.com/questions/38224132/use-sparkcontext-hadoop-configuration-within-rdd-methods-closures-like-foreachp
+public class SerializableConfiguration implements Serializable {
+ Configuration conf;
+
+ public SerializableConfiguration(Configuration hadoopConf) {
+ this.conf = hadoopConf;
+ }
+
+ public SerializableConfiguration() {
+ this.conf = new Configuration();
+ }
+
+ public Configuration get() {
+ return this.conf;
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ this.conf.write(out);
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException {
+ this.conf = new Configuration();
+ this.conf.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7c5fee..0d26815 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,6 +59,7 @@ import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
@@ -132,8 +133,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
JavaSparkContext sc = new JavaSparkContext(conf);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
- KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -169,17 +171,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// encode with dimension encoding, transform to <ByteArray, Object[]> RDD
final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
- .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl));
+ .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
Long totalCount = 0L;
if (envConfig.isSparkSanityCheckEnabled()) {
totalCount = encodedBaseRDD.count();
}
- final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl);
+ final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl, sConf);
BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
if (allNormalMeasure == false) {
- reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
+ reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
}
final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
@@ -195,7 +197,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// aggregate to ND cuboids
for (level = 1; level <= totalLevels; level++) {
partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
- allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
+ allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
.reduceByKey(reducerFunction2, partition).persist(storageLevel);
if (envConfig.isSparkSanityCheckEnabled() == true) {
sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
@@ -232,6 +234,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job,
final KylinConfig kylinConfig) throws Exception {
final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
@@ -248,7 +251,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
if (initialized == false) {
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
- KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
codec = new BufferedMeasureCodec(desc.getMeasures());
initialized = true;
@@ -272,11 +275,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
private String cubeName;
private String segmentId;
private String metaUrl;
+ private SerializableConfiguration conf;
- public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
+ public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.metaUrl = metaurl;
+ this.conf = conf;
}
@Override
@@ -284,7 +289,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
if (initialized == false) {
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
- KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,14 +332,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
protected int measureNum;
protected MeasureAggregators aggregators;
protected volatile transient boolean initialized = false;
+ protected SerializableConfiguration conf;
- public BaseCuboidReducerFunction2(String cubeName, String metaUrl) {
+ public BaseCuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf) {
this.cubeName = cubeName;
this.metaUrl = metaUrl;
+ this.conf = conf;
}
public void init() {
- KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
cubeDesc = cubeInstance.getDescriptor();
aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -360,8 +367,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
private boolean[] needAggr;
- public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
- super(cubeName, metaUrl);
+ public CuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAggr) {
+ super(cubeName, metaUrl, conf);
this.needAggr = needAggr;
}
@@ -394,15 +401,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
private NDCuboidBuilder ndCuboidBuilder;
private RowKeySplitter rowKeySplitter;
private volatile transient boolean initialized = false;
+ private SerializableConfiguration conf;
- public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) {
+ public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.metaUrl = metaUrl;
+ this.conf = conf;
}
public void init() {
- KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
this.cubeSegment = cubeInstance.getSegmentById(segmentId);
this.cubeDesc = cubeInstance.getDescriptor();