You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/23 10:26:58 UTC
[1/4] kylin git commit: KYLIN-2653 Manage the metadata use
HDFSResourceStore for Spark Cubing [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2653 edbe7a52f -> 276fa2c4d (forced update)
KYLIN-2653 Manage the metadata use HDFSResourceStore for Spark Cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d8d0395a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d8d0395a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d8d0395a
Branch: refs/heads/KYLIN-2653
Commit: d8d0395a80cc50fcb59bab4d402c7675aef6cd22
Parents: 1a3527c
Author: kangkaisen <ka...@live.com>
Authored: Thu May 25 14:43:02 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:47 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 35 ++++++++-
.../org/apache/kylin/common/KylinConfigExt.java | 2 +-
.../engine/mr/common/AbstractHadoopJob.java | 45 +----------
.../engine/mr/common/JobRelatedMetaUtil.java | 71 +++++++++++++++++
.../spark/SparkBatchCubingJobBuilder2.java | 6 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 82 ++++++--------------
.../kylin/engine/spark/SparkExecutable.java | 58 +++++++++++---
7 files changed, 188 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cc08056..a56e9b8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -36,8 +36,11 @@ import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OrderedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,10 +117,13 @@ public class KylinConfig extends KylinConfigBase {
}
public enum UriType {
- PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER
+ PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER, HDFS_FILE
}
private static UriType decideUriType(String metaUri) {
+ if (metaUri.indexOf("@hdfs") > 0) {
+ return UriType.HDFS_FILE;
+ }
try {
File file = new File(metaUri);
@@ -157,6 +163,23 @@ public class KylinConfig extends KylinConfigBase {
*/
UriType uriType = decideUriType(uri);
+ if (uriType == UriType.HDFS_FILE) {
+ KylinConfig config;
+ FileSystem fs;
+ int cut = uri.indexOf('@');
+ String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE;
+ try {
+ config = new KylinConfig();
+ fs = HadoopUtil.getFileSystem(realHdfsPath);
+ InputStream is = fs.open(new Path(realHdfsPath));
+ Properties prop = streamToProps(is);
+ config.reloadKylinConfig(prop);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return config;
+ }
+
if (uriType == UriType.LOCAL_FOLDER) {
KylinConfig config = new KylinConfig();
config.setMetadataUrl(uri);
@@ -402,6 +425,16 @@ public class KylinConfig extends KylinConfigBase {
super(props, force);
}
+ public void writeProperties(Properties props, File file) throws IOException {
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(file);
+ props.store(fos, file.getAbsolutePath());
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
public void writeProperties(File file) throws IOException {
FileOutputStream fos = null;
try {
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index d49dee7..786f467 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -61,7 +61,7 @@ public class KylinConfigExt extends KylinConfig {
return super.getOptional(prop, dft);
}
- protected Properties getAllProperties() {
+ public Properties getAllProperties() {
Properties result = new Properties();
result.putAll(super.getAllProperties());
result.putAll(overrides);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f9d9808..fc8fb4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
@@ -68,8 +66,6 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -447,12 +443,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
- dumpKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+ dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), conf);
}
protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
- dumpList.addAll(collectCubeMetadata(cube));
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube));
for (CubeSegment segment : cube.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
}
@@ -461,27 +457,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
- dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
dumpList.addAll(segment.getDictionaryPaths());
dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
}
- private Set<String> collectCubeMetadata(CubeInstance cube) {
- // cube, model_desc, cube_desc, table
- Set<String> dumpList = new LinkedHashSet<>();
- dumpList.add(cube.getResourcePath());
- dumpList.add(cube.getDescriptor().getModel().getResourcePath());
- dumpList.add(cube.getDescriptor().getResourcePath());
-
- for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
- TableDesc table = tableRef.getTableDesc();
- dumpList.add(table.getResourcePath());
- dumpList.addAll(SourceFactory.getMRDependentResources(table));
- }
-
- return dumpList;
- }
-
protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
File tmp = File.createTempFile("kylin_job_meta", "");
FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
@@ -494,7 +474,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
kylinConfig.writeProperties(kylinPropsFile);
// write resources
- dumpResources(kylinConfig, metaDir, dumpList);
+ JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
// hadoop distributed cache
String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
@@ -530,23 +510,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
- long startTime = System.currentTimeMillis();
-
- ResourceStore from = ResourceStore.getStore(kylinConfig);
- KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
- ResourceStore to = ResourceStore.getStore(localConfig);
- for (String path : dumpList) {
- RawResource res = from.getResource(path);
- if (res == null)
- throw new IllegalStateException("No resource found at -- " + path);
- to.putResource(path, res.inputStream, res.timestamp);
- res.inputStream.close();
- }
-
- logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
- }
-
protected void deletePath(Configuration conf, Path path) throws IOException {
HadoopUtil.deletePath(conf, path);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
new file mode 100644
index 0000000..46b1d3c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.SourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+public class JobRelatedMetaUtil {
+ private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class);
+
+ public static Set<String> collectCubeMetadata(CubeInstance cube) {
+ // cube, model_desc, cube_desc, table
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.add(cube.getResourcePath());
+ dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+ dumpList.add(cube.getDescriptor().getResourcePath());
+
+ for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
+ TableDesc table = tableRef.getTableDesc();
+ dumpList.add(table.getResourcePath());
+ dumpList.addAll(SourceFactory.getMRDependentResources(table));
+ }
+
+ return dumpList;
+ }
+
+ public static void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+ long startTime = System.currentTimeMillis();
+
+ ResourceStore from = ResourceStore.getStore(kylinConfig);
+ KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+ ResourceStore to = ResourceStore.getStore(localConfig);
+ for (String path : dumpList) {
+ RawResource res = from.getResource(path);
+ if (res == null)
+ throw new IllegalStateException("No resource found at -- " + path);
+ to.putResource(path, res.inputStream, res.timestamp);
+ res.inputStream.close();
+ }
+
+ logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 66b154d..07bc334 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -48,7 +48,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
- sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
+ sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
StringBuilder jars = new StringBuilder();
@@ -84,4 +84,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
return "";
}
+ private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
+ return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..a8e7378 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
@@ -51,7 +50,6 @@ import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -67,15 +65,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
-import java.io.File;
-import java.io.FileFilter;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-
/**
* Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
@@ -85,7 +81,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
- public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl");
public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
@@ -96,7 +92,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
options.addOption(OPTION_INPUT_TABLE);
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
- options.addOption(OPTION_CONF_PATH);
+ options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
}
@@ -105,32 +101,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
return options;
}
- private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
- ClassUtil.addClasspath(confPath);
- final File[] files = new File(confPath).listFiles(new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- if (pathname.getAbsolutePath().endsWith(".xml")) {
- return true;
- }
- if (pathname.getAbsolutePath().endsWith(".properties")) {
- return true;
- }
- return false;
- }
- });
- for (File file : files) {
- sc.addFile(file.getAbsolutePath());
- }
- }
-
- private static final void prepare() {
- File file = new File(SparkFiles.get("kylin.properties"));
- String confPath = file.getParentFile().getAbsolutePath();
- logger.info("conf directory:" + confPath);
- System.setProperty(KylinConfig.KYLIN_CONF, confPath);
- ClassUtil.addClasspath(confPath);
-
+ public static KylinConfig loadKylinConfig(String metaUrl) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.createInstanceFromUri(metaUrl);
+ KylinConfig.setKylinConfigThreadLocal(kylinConfig);
+ return kylinConfig;
}
@Override
@@ -138,7 +112,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
- final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+ final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
@@ -146,30 +120,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true");
-
JavaSparkContext sc = new JavaSparkContext(conf);
- setupClasspath(sc, confPath);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-
- System.setProperty(KylinConfig.KYLIN_CONF, confPath);
- final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+ final KylinConfig kylinConfig = loadKylinConfig(metaUrl);
HiveContext sqlContext = new HiveContext(sc.sc());
final DataFrame intermediateTable = sqlContext.table(hiveTable);
-
- final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
- final KylinConfig kylinConfig = cubeDesc.getConfig();
final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
-
final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
final int measureNum = cubeDesc.getMeasures().size();
-
int countMeasureIndex = 0;
for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
if (measureDesc.getFunction().isCount() == true) {
@@ -186,9 +152,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
allNormalMeasure = allNormalMeasure && needAggr[i];
}
logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
-
StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
-
// encode with dimension encoding, transform to <ByteArray, Object[]> RDD
final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
volatile transient boolean initialized = false;
@@ -199,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
if (initialized == false) {
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
- prepare();
+ loadKylinConfig(metaUrl);
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
@@ -236,29 +200,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
totalCount = encodedBaseRDD.count();
logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count());
}
-
final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
if (allNormalMeasure == false) {
reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
}
-
final int totalLevels = cubeDesc.getBuildLevel();
JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
int level = 0;
int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
-
// aggregate to calculate base cuboid
allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-
saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
-
// aggregate to ND cuboids
- PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
-
+ PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
for (level = 1; level <= totalLevels; level++) {
partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
logger.info("Level " + level + " partition number: " + partition);
@@ -271,6 +229,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
+
+ deleteHDFSMeta(metaUrl);
}
private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
@@ -338,6 +298,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+ String metaUrl;
CubeSegment cubeSegment;
CubeDesc cubeDesc;
CuboidScheduler cuboidScheduler;
@@ -345,7 +306,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
RowKeySplitter rowKeySplitter;
transient boolean initialized = false;
- CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+ CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+ this.metaUrl = metaUrl;
this.cubeSegment = cubeSegment;
this.cubeDesc = cubeDesc;
this.cuboidScheduler = cuboidScheduler;
@@ -356,7 +318,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
@Override
public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
if (initialized == false) {
- prepare();
+ loadKylinConfig(metaUrl);
initialized = true;
}
@@ -387,7 +349,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
//sanity check
-
private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -413,4 +374,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
})._2();
return count;
}
+
+ private void deleteHDFSMeta(String metaUrl) throws IOException {
+ int cut = metaUrl.indexOf('@');
+ String path = metaUrl.substring(0, cut);
+ HadoopUtil.getFileSystem(path).delete(new Path(path), true);
+ logger.info("Delete metadata in HDFS for this job: " + path);
+ };
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1e032c6..c211ec5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -18,13 +18,22 @@
package org.apache.kylin.engine.spark;
import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -99,19 +108,20 @@ public class SparkExecutable extends AbstractExecutable {
}
logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
- //hbase-site.xml
- String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString();
- logger.info("Get hbase-site.xml location from classpath: " + hbaseConf);
- File hbaseConfFile = new File(hbaseConf);
- if (hbaseConfFile.exists() == false) {
- throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
- }
-
String jobJar = config.getKylinJobJarPath();
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
}
+ String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+ CubeSegment segment = cube.getSegmentById(segmentID);
+
+ try {
+ attachSegmentMetadataWithDict(segment);
+ } catch (IOException e) {
+ throw new ExecuteException("meta dump failed");
+ }
+
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
@@ -120,9 +130,9 @@ public class SparkExecutable extends AbstractExecutable {
stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
}
- stringBuilder.append("--files %s --jars %s %s %s");
+ stringBuilder.append("--jars %s %s %s");
try {
- String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+ String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
logger.info("cmd: " + cmd);
CliCommandExecutor exec = new CliCommandExecutor();
PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -135,5 +145,33 @@ public class SparkExecutable extends AbstractExecutable {
}
}
+ private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+ dumpList.addAll(segment.getDictionaryPaths());
+ dumpList.add(segment.getStatisticsResourcePath());
+ dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
+ }
+
+ private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+ File tmp = File.createTempFile("kylin_job_meta", "");
+ FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+ File metaDir = new File(tmp, "meta");
+ metaDir.mkdirs();
+ // dump metadata
+ JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+ // write kylin.properties
+ File kylinPropsFile = new File(metaDir, "kylin.properties");
+ Properties properties = kylinConfig.getAllProperties();
+ String metadataUrl = this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt());
+ properties.setProperty("kylin.metadata.url", metadataUrl);
+ kylinConfig.writeProperties(properties, kylinPropsFile);
+
+ KylinConfig dstConfig = KylinConfig.createKylinConfig(properties);
+ //upload metadata
+ ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+ }
}
[2/4] kylin git commit: build ci_inner_join_cube with spark
Posted by ka...@apache.org.
build ci_inner_join_cube with spark
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6413c286
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6413c286
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6413c286
Branch: refs/heads/KYLIN-2653
Commit: 6413c286ea1d18ea1a8bb2bf9e975e162a08b39f
Parents: d8d0395
Author: kangkaisen <ka...@meituan.com>
Authored: Thu Jul 13 16:46:41 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800
----------------------------------------------------------------------
.../test_case_data/localmeta/cube_desc/ci_inner_join_cube.json | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6413c286/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index 28a63d5..27acdd3 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -610,7 +610,7 @@
"status_need_notify": [],
"auto_merge_time_ranges": null,
"retention_range": 0,
- "engine_type": 2,
+ "engine_type": 4,
"storage_type": 2,
"override_kylin_properties": {
"kylin.cube.algorithm": "LAYER"
[4/4] kylin git commit: KryoRegistrator add ComputedColumnDesc
Posted by ka...@apache.org.
KryoRegistrator add ComputedColumnDesc
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3326b089
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3326b089
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3326b089
Branch: refs/heads/KYLIN-2653
Commit: 3326b0897b3f839cc34c9c8030571d02627d2629
Parents: 6413c28
Author: kangkaisen <ka...@meituan.com>
Authored: Thu Jul 13 20:18:17 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/metadata/model/ComputedColumnDesc.java | 4 +++-
.../java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java | 1 +
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3326b089/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index 540b5fc..e8cc351 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.kylin.metadata.model.tool.CalciteParser;
+import java.io.Serializable;
+
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ComputedColumnDesc {
+public class ComputedColumnDesc implements Serializable {
@JsonProperty
private String tableIdentity;
@JsonProperty
http://git-wip-us.apache.org/repos/asf/kylin/blob/3326b089/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 106653f..048f62b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -269,6 +269,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
kyroClasses.add(org.apache.kylin.metadata.model.DatabaseDesc.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.ComputedColumnDesc.class);
kyroClasses.add(org.apache.kylin.metadata.model.ExternalFilterDesc.class);
kyroClasses.add(org.apache.kylin.metadata.model.FunctionDesc.class);
kyroClasses.add(org.apache.kylin.metadata.model.JoinDesc.class);
[3/4] kylin git commit: Tmp annotated CalciteParser.ensureNoTableNameExists
Posted by ka...@apache.org.
Tmp annotated CalciteParser.ensureNoTableNameExists
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/276fa2c4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/276fa2c4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/276fa2c4
Branch: refs/heads/KYLIN-2653
Commit: 276fa2c4dafd22cd897227d5cc592cd528f40cc5
Parents: 3326b08
Author: kangkaisen <ka...@meituan.com>
Authored: Fri Jul 14 21:31:49 2017 +0800
Committer: kangkaisen <ka...@meituan.com>
Committed: Sat Jul 22 14:59:54 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/metadata/model/ComputedColumnDesc.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/276fa2c4/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index e8cc351..13cb889 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -20,7 +20,6 @@ package org.apache.kylin.metadata.model;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.tool.CalciteParser;
import java.io.Serializable;
@@ -46,7 +45,7 @@ public class ComputedColumnDesc implements Serializable {
tableIdentity = tableIdentity.toUpperCase();
columnName = columnName.toUpperCase();
- CalciteParser.ensureNoTableNameExists(expression);
+ //CalciteParser.ensureNoTableNameExists(expression);
}
public String getFullName() {