You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/23 10:26:58 UTC

[1/4] kylin git commit: KYLIN-2653 Manage the metadata use HDFSResourceStore for Spark Cubing [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2653 edbe7a52f -> 276fa2c4d (forced update)


KYLIN-2653 Manage the metadata use HDFSResourceStore for Spark Cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d8d0395a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d8d0395a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d8d0395a

Branch: refs/heads/KYLIN-2653
Commit: d8d0395a80cc50fcb59bab4d402c7675aef6cd22
Parents: 1a3527c
Author: kangkaisen <ka...@live.com>
Authored: Thu May 25 14:43:02 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:47 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 35 ++++++++-
 .../org/apache/kylin/common/KylinConfigExt.java |  2 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 45 +----------
 .../engine/mr/common/JobRelatedMetaUtil.java    | 71 +++++++++++++++++
 .../spark/SparkBatchCubingJobBuilder2.java      |  6 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  | 82 ++++++--------------
 .../kylin/engine/spark/SparkExecutable.java     | 58 +++++++++++---
 7 files changed, 188 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cc08056..a56e9b8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -36,8 +36,11 @@ import java.util.Properties;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OrderedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,10 +117,13 @@ public class KylinConfig extends KylinConfigBase {
     }
 
     public enum UriType {
-        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER
+        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER, HDFS_FILE
     }
 
     private static UriType decideUriType(String metaUri) {
+        if (metaUri.indexOf("@hdfs") > 0) {
+            return UriType.HDFS_FILE;
+        }
 
         try {
             File file = new File(metaUri);
@@ -157,6 +163,23 @@ public class KylinConfig extends KylinConfigBase {
          */
         UriType uriType = decideUriType(uri);
 
+        if (uriType == UriType.HDFS_FILE) {
+            KylinConfig config;
+            FileSystem fs;
+            int cut = uri.indexOf('@');
+            String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE;
+            try {
+                config = new KylinConfig();
+                fs = HadoopUtil.getFileSystem(realHdfsPath);
+                InputStream is = fs.open(new Path(realHdfsPath));
+                Properties prop = streamToProps(is);
+                config.reloadKylinConfig(prop);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return config;
+        }
+
         if (uriType == UriType.LOCAL_FOLDER) {
             KylinConfig config = new KylinConfig();
             config.setMetadataUrl(uri);
@@ -402,6 +425,16 @@ public class KylinConfig extends KylinConfigBase {
         super(props, force);
     }
 
+    /** Writes {@code props} to {@code file}, passing the file's absolute path as the comments argument of store(). */
+    public void writeProperties(Properties props, File file) throws IOException {
+        FileOutputStream out = new FileOutputStream(file);
+        try {
+            props.store(out, file.getAbsolutePath());
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
     public void writeProperties(File file) throws IOException {
         FileOutputStream fos = null;
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index d49dee7..786f467 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -61,7 +61,7 @@ public class KylinConfigExt extends KylinConfig {
             return super.getOptional(prop, dft);
     }
 
-    protected Properties getAllProperties() {
+    public Properties getAllProperties() {
         Properties result = new Properties();
         result.putAll(super.getAllProperties());
         result.putAll(overrides);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f9d9808..fc8fb4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -68,8 +66,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -447,12 +443,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        dumpKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+        dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), conf);
     }
 
     protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(collectCubeMetadata(cube));
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube));
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
@@ -461,27 +457,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
         dumpList.addAll(segment.getDictionaryPaths());
         dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
     }
 
-    private Set<String> collectCubeMetadata(CubeInstance cube) {
-        // cube, model_desc, cube_desc, table
-        Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-
-        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = tableRef.getTableDesc();
-            dumpList.add(table.getResourcePath());
-            dumpList.addAll(SourceFactory.getMRDependentResources(table));
-        }
-
-        return dumpList;
-    }
-
     protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
@@ -494,7 +474,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         kylinConfig.writeProperties(kylinPropsFile);
 
         // write resources
-        dumpResources(kylinConfig, metaDir, dumpList);
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
 
         // hadoop distributed cache
         String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
@@ -530,23 +510,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
-        long startTime = System.currentTimeMillis();
-
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            RawResource res = from.getResource(path);
-            if (res == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            to.putResource(path, res.inputStream, res.timestamp);
-            res.inputStream.close();
-        }
-
-        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
-    }
-
     protected void deletePath(Configuration conf, Path path) throws IOException {
         HadoopUtil.deletePath(conf, path);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
new file mode 100644
index 0000000..46b1d3c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.SourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Static helpers that list the metadata resources a cube job depends on and copy them into a directory. */
+public class JobRelatedMetaUtil {
+    private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class);
+
+    /** Returns, in insertion order, the resource paths of the cube, its model, its descriptor and each model table (plus the table's MR-dependent resources). */
+    public static Set<String> collectCubeMetadata(CubeInstance cube) {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.add(cube.getResourcePath());
+        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+        dumpList.add(cube.getDescriptor().getResourcePath());
+        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = tableRef.getTableDesc();
+            dumpList.add(table.getResourcePath());
+            dumpList.addAll(SourceFactory.getMRDependentResources(table));
+        }
+        return dumpList;
+    }
+
+    /** Copies every path in dumpList from kylinConfig's store to a store created for metaDir; throws IllegalStateException if a path has no resource. */
+    public static void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+        long startTime = System.currentTimeMillis();
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        ResourceStore to = ResourceStore.getStore(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()));
+        for (String path : dumpList) {
+            RawResource res = from.getResource(path);
+            if (res == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            try {
+                to.putResource(path, res.inputStream, res.timestamp);
+            } finally {
+                res.inputStream.close(); // release the source stream even when the copy throws
+            }
+        }
+        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 66b154d..07bc334 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -48,7 +48,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();
@@ -84,4 +84,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         return "";
     }
 
+    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
+        String segmentMetaDir = kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID;
+        return segmentMetaDir + "@hdfs"; // "@hdfs" is the marker KylinConfig.decideUriType maps to HDFS_FILE
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..a8e7378 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
@@ -51,7 +50,6 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -67,15 +65,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
-import java.io.File;
-import java.io.FileFilter;
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -85,7 +81,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
 
@@ -96,7 +92,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
-        options.addOption(OPTION_CONF_PATH);
+        options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
     }
 
@@ -105,32 +101,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
-        ClassUtil.addClasspath(confPath);
-        final File[] files = new File(confPath).listFiles(new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                if (pathname.getAbsolutePath().endsWith(".xml")) {
-                    return true;
-                }
-                if (pathname.getAbsolutePath().endsWith(".properties")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        for (File file : files) {
-            sc.addFile(file.getAbsolutePath());
-        }
-    }
-
-    private static final void prepare() {
-        File file = new File(SparkFiles.get("kylin.properties"));
-        String confPath = file.getParentFile().getAbsolutePath();
-        logger.info("conf directory:" + confPath);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        ClassUtil.addClasspath(confPath);
-
+    public static KylinConfig loadKylinConfig(String metaUrl) throws IOException {
+        final KylinConfig configFromMeta = KylinConfig.createInstanceFromUri(metaUrl);
+        KylinConfig.setKylinConfigThreadLocal(configFromMeta); // registered before being handed back to the caller
+        return configFromMeta;
     }
 
     @Override
@@ -138,7 +112,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
         SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
@@ -146,30 +120,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true");
-
         JavaSparkContext sc = new JavaSparkContext(conf);
-        setupClasspath(sc, confPath);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        final KylinConfig kylinConfig = loadKylinConfig(metaUrl);
 
         HiveContext sqlContext = new HiveContext(sc.sc());
         final DataFrame intermediateTable = sqlContext.table(hiveTable);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        final KylinConfig kylinConfig = cubeDesc.getConfig();
         final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
         final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
         final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
-
         final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
         final int measureNum = cubeDesc.getMeasures().size();
-
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -186,9 +152,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             allNormalMeasure = allNormalMeasure && needAggr[i];
         }
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
-
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
-
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
             volatile transient boolean initialized = false;
@@ -199,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 if (initialized == false) {
                     synchronized (SparkCubingByLayer.class) {
                         if (initialized == false) {
-                            prepare();
+                            loadKylinConfig(metaUrl);
                             long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
                             Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
                             baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
@@ -236,29 +200,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             totalCount = encodedBaseRDD.count();
             logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count());
         }
-
         final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
         final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
             reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
         }
-
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
-
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-
         saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
-
         // aggregate to ND cuboids
-        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
-
+        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
@@ -271,6 +229,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
+
+        deleteHDFSMeta(metaUrl);
     }
 
     private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
@@ -338,6 +298,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
 
+        String metaUrl;
         CubeSegment cubeSegment;
         CubeDesc cubeDesc;
         CuboidScheduler cuboidScheduler;
@@ -345,7 +306,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         RowKeySplitter rowKeySplitter;
         transient boolean initialized = false;
 
-        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+        CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+            this.metaUrl = metaUrl;
             this.cubeSegment = cubeSegment;
             this.cubeDesc = cubeDesc;
             this.cuboidScheduler = cuboidScheduler;
@@ -356,7 +318,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         @Override
         public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             if (initialized == false) {
-                prepare();
+                loadKylinConfig(metaUrl);
                 initialized = true;
             }
 
@@ -387,7 +349,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     //sanity check
-
     private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -413,4 +374,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         })._2();
         return count;
     }
+
+    /** Deletes (recursive flag set) the path formed by the part of metaUrl before its first '@', then logs that path. */
+    private void deleteHDFSMeta(String metaUrl) throws IOException {
+        String path = metaUrl.substring(0, metaUrl.indexOf('@'));
+        HadoopUtil.getFileSystem(path).delete(new Path(path), true);
+        logger.info("Delete metadata in HDFS for this job: " + path);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1e032c6..c211ec5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -18,13 +18,22 @@
 package org.apache.kylin.engine.spark;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -99,19 +108,20 @@ public class SparkExecutable extends AbstractExecutable {
         }
         logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
-        //hbase-site.xml
-        String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString();
-        logger.info("Get hbase-site.xml location from classpath: " + hbaseConf);
-        File hbaseConfFile = new File(hbaseConf);
-        if (hbaseConfFile.exists() == false) {
-            throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
-        }
-
         String jobJar = config.getKylinJobJarPath();
         if (StringUtils.isEmpty(jars)) {
             jars = jobJar;
         }
 
+        String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+        CubeSegment segment = cube.getSegmentById(segmentID);
+
+        try {
+            attachSegmentMetadataWithDict(segment);
+        } catch (IOException e) {
+            throw new ExecuteException("meta dump fialed");
+        }
+
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
@@ -120,9 +130,9 @@ public class SparkExecutable extends AbstractExecutable {
             stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
 
-        stringBuilder.append("--files %s --jars %s %s %s");
+        stringBuilder.append("--jars %s %s %s");
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -135,5 +145,33 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+    /**
+     * Collects the metadata resources needed for the given segment (cube
+     * metadata, dictionary paths and the statistics resource path) and hands
+     * them, together with the segment's config, to
+     * {@code dumpAndUploadKylinPropsAndMetadata}.
+     *
+     * @param segment the cube segment whose metadata should be attached
+     * @throws IOException if dumping or uploading the metadata fails
+     */
+    private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
+        // LinkedHashSet: keeps insertion order and drops duplicate resource paths
+        Set<String> resources = new LinkedHashSet<>(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+        resources.addAll(segment.getDictionaryPaths());
+        resources.add(segment.getStatisticsResourcePath());
+
+        KylinConfigExt segmentConfig = (KylinConfigExt) segment.getConfig();
+        dumpAndUploadKylinPropsAndMetadata(resources, segmentConfig);
+    }
+
+    /**
+     * Dumps the given metadata resources and a kylin.properties file into a
+     * local staging directory, then copies that directory into the metadata
+     * store addressed by this step's meta-url parameter.
+     *
+     * The staging directory is removed before returning, whether or not the
+     * dump or upload succeeded.
+     *
+     * @param dumpList    resource paths to dump
+     * @param kylinConfig config the resources are read from and whose properties are written out
+     * @throws IOException if the staging directory cannot be created or the dump/upload fails
+     */
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        try {
+            File metaDir = new File(tmp, "meta");
+            // fail here with a clear message instead of later inside the dump
+            if (!metaDir.mkdirs() && !metaDir.isDirectory()) {
+                throw new IOException("Failed to create local metadata directory: " + metaDir.getAbsolutePath());
+            }
 
+            // dump metadata
+            JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+            // write kylin.properties, pointing kylin.metadata.url at the job's destination store
+            File kylinPropsFile = new File(metaDir, "kylin.properties");
+            Properties properties = kylinConfig.getAllProperties();
+            String metadataUrl = this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt());
+            properties.setProperty("kylin.metadata.url", metadataUrl);
+            kylinConfig.writeProperties(properties, kylinPropsFile);
+
+            KylinConfig dstConfig = KylinConfig.createKylinConfig(properties);
+            //upload metadata
+            ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+        } finally {
+            // best-effort cleanup of the staging directory; deleteQuietly never throws,
+            // so it cannot mask an exception raised by the dump or upload above
+            FileUtils.deleteQuietly(tmp);
+        }
+    }
 }


[2/4] kylin git commit: build ci_inner_join_cube with spark

Posted by ka...@apache.org.
build ci_inner_join_cube with spark


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6413c286
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6413c286
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6413c286

Branch: refs/heads/KYLIN-2653
Commit: 6413c286ea1d18ea1a8bb2bf9e975e162a08b39f
Parents: d8d0395
Author: kangkaisen <ka...@meituan.com>
Authored: Thu Jul 13 16:46:41 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800

----------------------------------------------------------------------
 .../test_case_data/localmeta/cube_desc/ci_inner_join_cube.json     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6413c286/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index 28a63d5..27acdd3 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -610,7 +610,7 @@
   "status_need_notify": [],
   "auto_merge_time_ranges": null,
   "retention_range": 0,
-  "engine_type": 2,
+  "engine_type": 4,
   "storage_type": 2,
   "override_kylin_properties": {
     "kylin.cube.algorithm": "LAYER"


[4/4] kylin git commit: KryoRegistrator add ComputedColumnDesc

Posted by ka...@apache.org.
KryoRegistrator add ComputedColumnDesc


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3326b089
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3326b089
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3326b089

Branch: refs/heads/KYLIN-2653
Commit: 3326b0897b3f839cc34c9c8030571d02627d2629
Parents: 6413c28
Author: kangkaisen <ka...@meituan.com>
Authored: Thu Jul 13 20:18:17 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/metadata/model/ComputedColumnDesc.java | 4 +++-
 .../java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java | 1 +
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3326b089/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index 540b5fc..e8cc351 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 
+import java.io.Serializable;
+
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ComputedColumnDesc {
+public class ComputedColumnDesc implements Serializable {
     @JsonProperty
     private String tableIdentity;
     @JsonProperty

http://git-wip-us.apache.org/repos/asf/kylin/blob/3326b089/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 106653f..048f62b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -269,6 +269,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
         kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
         kyroClasses.add(org.apache.kylin.metadata.model.DatabaseDesc.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.ComputedColumnDesc.class);
         kyroClasses.add(org.apache.kylin.metadata.model.ExternalFilterDesc.class);
         kyroClasses.add(org.apache.kylin.metadata.model.FunctionDesc.class);
         kyroClasses.add(org.apache.kylin.metadata.model.JoinDesc.class);


[3/4] kylin git commit: Tmp annotated CalciteParser.ensureNoTableNameExists

Posted by ka...@apache.org.
Tmp annotated CalciteParser.ensureNoTableNameExists


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/276fa2c4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/276fa2c4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/276fa2c4

Branch: refs/heads/KYLIN-2653
Commit: 276fa2c4dafd22cd897227d5cc592cd528f40cc5
Parents: 3326b08
Author: kangkaisen <ka...@meituan.com>
Authored: Fri Jul 14 21:31:49 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/metadata/model/ComputedColumnDesc.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/276fa2c4/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index e8cc351..13cb889 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -20,7 +20,6 @@ package org.apache.kylin.metadata.model;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
 
 import java.io.Serializable;
 
@@ -46,7 +45,7 @@ public class ComputedColumnDesc implements Serializable {
         tableIdentity = tableIdentity.toUpperCase();
         columnName = columnName.toUpperCase();
 
-        CalciteParser.ensureNoTableNameExists(expression);
+        //CalciteParser.ensureNoTableNameExists(expression);
     }
 
     public String getFullName() {