Posted to commits@kylin.apache.org by sh...@apache.org on 2017/12/07 05:25:32 UTC

kylin git commit: KYLIN-2995 Set SparkContext.hadoopConfiguration to HadoopUtil in Spark Cubing

Repository: kylin
Updated Branches:
  refs/heads/master 9265e150d -> c6cfa6984


KYLIN-2995 Set SparkContext.hadoopConfiguration to HadoopUtil in Spark Cubing

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6cfa698
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6cfa698
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6cfa698

Branch: refs/heads/master
Commit: c6cfa69841f96f6d3f411e375fbe1779e819cd84
Parents: 9265e15
Author: kangkaisen <ka...@meituan.com>
Authored: Mon Dec 4 20:42:22 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Dec 7 13:25:27 2017 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     |  4 +-
 .../mr/common/SerializableConfiguration.java    | 50 ++++++++++++++++++++
 .../kylin/engine/spark/SparkCubingByLayer.java  | 37 +++++++++------
 3 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 6e67488..ade07e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -468,7 +468,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+    public static KylinConfig loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) {
+        HadoopUtil.setCurrentConfiguration(conf.get());
+
         if (uri == null)
             throw new IllegalArgumentException("meta url should not be null");
 

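A note on the signature change above: Spark executors do not inherit the driver's
SparkContext.hadoopConfiguration, so without this change HadoopUtil would fall back to a
freshly constructed Configuration that lacks the cluster settings needed to read Kylin
metadata from HDFS. A minimal sketch of the new driver-side call pattern (variable names
are illustrative, not part of this commit):

    SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
    KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
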
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
new file mode 100644
index 0000000..b390432
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+//https://stackoverflow.com/questions/38224132/use-sparkcontext-hadoop-configuration-within-rdd-methods-closures-like-foreachp
+public class SerializableConfiguration implements Serializable {
+    Configuration conf;
+
+    public SerializableConfiguration(Configuration hadoopConf) {
+        this.conf = hadoopConf;
+    }
+
+    public SerializableConfiguration() {
+        this.conf = new Configuration();
+    }
+
+    public Configuration get() {
+        return this.conf;
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+        this.conf.write(out);
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException {
+        this.conf = new Configuration();
+        this.conf.readFields(in);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7c5fee..0d26815 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,6 +59,7 @@ import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
@@ -132,8 +133,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         JavaSparkContext sc = new JavaSparkContext(conf);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
-        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -169,17 +171,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl));
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
         }
 
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl, sConf);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
         }
 
         final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
@@ -195,7 +197,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
@@ -232,6 +234,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job,
             final KylinConfig kylinConfig) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
 
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
@@ -248,7 +251,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         if (initialized == false) {
                             synchronized (SparkCubingByLayer.class) {
                                 if (initialized == false) {
-                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -272,11 +275,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private String cubeName;
         private String segmentId;
         private String metaUrl;
+        private SerializableConfiguration conf;
 
-        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
+        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
             this.metaUrl = metaurl;
+            this.conf = conf;
         }
 
         @Override
@@ -284,7 +289,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,14 +332,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         protected int measureNum;
         protected MeasureAggregators aggregators;
         protected volatile transient boolean initialized = false;
+        protected SerializableConfiguration conf;
 
-        public BaseCuboidReducerFunction2(String cubeName, String metaUrl) {
+        public BaseCuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.metaUrl = metaUrl;
+            this.conf = conf;
         }
 
         public void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -360,8 +367,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
-        public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
-            super(cubeName, metaUrl);
+        public CuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAggr) {
+            super(cubeName, metaUrl, conf);
             this.needAggr = needAggr;
         }
 
@@ -394,15 +401,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private NDCuboidBuilder ndCuboidBuilder;
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
+        private SerializableConfiguration conf;
 
-        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) {
+        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
             this.metaUrl = metaUrl;
+            this.conf = conf;
         }
 
         public void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
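
Taken together, the diff threads one pattern through every serialized Spark function:
the SerializableConfiguration travels inside the closure, and the double-checked lazy
init on each executor seeds HadoopUtil (via loadKylinConfigFromHdfs) before any metadata
is loaded. A condensed sketch of that pattern, with illustrative class and field names
rather than the actual ones from this commit:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
    import org.apache.kylin.engine.mr.common.SerializableConfiguration;
    import org.apache.spark.api.java.function.Function;

    class ExampleFunction implements Function<String, String> {
        private final String metaUrl;
        private final SerializableConfiguration conf;
        private volatile transient boolean initialized = false;
        private transient KylinConfig kylinConfig;

        ExampleFunction(String metaUrl, SerializableConfiguration conf) {
            this.metaUrl = metaUrl;
            this.conf = conf;
        }

        @Override
        public String call(String row) throws Exception {
            if (!initialized) {
                synchronized (ExampleFunction.class) {
                    if (!initialized) {
                        // Seeds HadoopUtil's current configuration from conf.get(),
                        // then loads the Kylin config from HDFS exactly once per executor.
                        kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
                        initialized = true;
                    }
                }
            }
            return row;
        }
    }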