Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/30 08:57:27 UTC
[38/50] [abbrv] kylin git commit: KYLIN-2328 Reduce the size of metadata uploaded to distributed cache
KYLIN-2328 Reduce the size of metadata uploaded to distributed cache
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6ea03b86
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6ea03b86
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6ea03b86
Branch: refs/heads/master-cdh5.7
Commit: 6ea03b8638d72853959aba0666aea18f1ba97391
Parents: 64c3c61
Author: gaodayue <ga...@meituan.com>
Authored: Wed Dec 28 15:27:49 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Thu Dec 29 18:39:13 2016 +0800
----------------------------------------------------------------------
.../engine/mr/common/AbstractHadoopJob.java | 43 ++++++++++++++------
.../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +-
.../engine/mr/steps/FactDistinctColumnsJob.java | 7 ++--
.../kylin/engine/mr/steps/InMemCuboidJob.java | 10 ++---
.../kylin/engine/mr/steps/MergeCuboidJob.java | 3 +-
.../cardinality/HiveColumnCardinalityJob.java | 2 +-
.../source/kafka/hadoop/KafkaFlatTableJob.java | 3 --
.../kylin/storage/hbase/steps/CubeHFileJob.java | 11 +++--
8 files changed, 48 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
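For readers skimming the diff: the patch replaces the one-size-fits-all attachKylinPropsAndMetadata(CubeInstance, Configuration), which shipped cube metadata plus the dictionaries of every segment, with purpose-specific variants so each MR job uploads only what it actually reads. A minimal standalone sketch of that narrowing, using simplified stand-ins for Kylin's CubeInstance and CubeSegment (the attach method names mirror the patch; the types here do not):

    import java.util.LinkedHashSet;
    import java.util.Set;

    // Simplified stand-ins for CubeSegment and CubeInstance, just enough
    // to show what each attach variant collects.
    class Segment {
        Set<String> dictionaryPaths = new LinkedHashSet<>();
    }

    class Cube {
        Set<String> resourcePaths = new LinkedHashSet<>(); // cube, model_desc, cube_desc, tables
        Set<Segment> segments = new LinkedHashSet<>();
    }

    public class MetadataDumpSketch {
        // Before the patch: every job got cube metadata plus the
        // dictionaries of *all* segments.
        static Set<String> attachEverything(Cube cube) {
            Set<String> dump = new LinkedHashSet<>(cube.resourcePaths);
            for (Segment s : cube.segments)
                dump.addAll(s.dictionaryPaths);
            return dump;
        }

        // After the patch: descriptor-only jobs skip dictionaries entirely...
        static Set<String> attachCubeMetadata(Cube cube) {
            return new LinkedHashSet<>(cube.resourcePaths);
        }

        // ...and per-segment jobs ship only that one segment's dictionaries.
        static Set<String> attachSegmentMetadataWithDict(Cube cube, Segment seg) {
            Set<String> dump = new LinkedHashSet<>(cube.resourcePaths);
            dump.addAll(seg.dictionaryPaths);
            return dump;
        }
    }

For a cube with many segments, dropping the unrelated per-segment dictionaries is where most of the size reduction comes from.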
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 80636d3..4693ac3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,7 +29,6 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -449,33 +448,49 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
- protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+ protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
dumpList.add(table.getResourcePath());
attachKylinPropsAndMetadata(dumpList, KylinConfig.getInstanceFromEnv(), conf);
}
- protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
- // write cube / model_desc / cube_desc / dict / table
+ protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
+ attachKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+ }
+
+ protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(collectCubeMetadata(cube));
+ for (CubeSegment segment : cube.getSegments()) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ }
+ attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+ }
+
+ protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+ dumpList.addAll(segment.getDictionaryPaths());
+ attachKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
+ }
+
+ private Set<String> collectCubeMetadata(CubeInstance cube) {
+ // cube, model_desc, cube_desc, table
Set<String> dumpList = new LinkedHashSet<>();
dumpList.add(cube.getResourcePath());
dumpList.add(cube.getDescriptor().getModel().getResourcePath());
dumpList.add(cube.getDescriptor().getResourcePath());
- for (TableRef tableRef: cube.getDescriptor().getModel().getAllTables()) {
+ for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
TableDesc table = tableRef.getTableDesc();
dumpList.add(table.getResourcePath());
- List<String> dependentResources = SourceFactory.getMRDependentResources(table);
- dumpList.addAll(dependentResources);
- }
- for (CubeSegment segment : cube.getSegments()) {
- dumpList.addAll(segment.getDictionaryPaths());
+ dumpList.addAll(SourceFactory.getMRDependentResources(table));
}
- attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+ return dumpList;
}
- protected void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
+ private void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
File tmp = File.createTempFile("kylin_job_meta", "");
FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
@@ -524,6 +539,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+ long startTime = System.currentTimeMillis();
+
ResourceStore from = ResourceStore.getStore(kylinConfig);
KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
ResourceStore to = ResourceStore.getStore(localConfig);
@@ -534,6 +551,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
to.putResource(path, res.inputStream, res.timestamp);
res.inputStream.close();
}
+
+ logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
}
protected void deletePath(Configuration conf, Path path) throws IOException {
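Two details of the AbstractHadoopJob hunks above are worth noting: the temp-file idiom (create a temp file, force-delete it, reuse the name as a metadata directory) and the new timing log around dumpResources. A minimal standalone sketch of both, with the actual resource copying stubbed out (nothing here is Kylin API beyond what the diff shows):

    import java.io.File;
    import java.io.IOException;

    public class MetaDirSketch {
        public static void main(String[] args) throws IOException {
            // Same trick as attachKylinPropsAndMetadata: createTempFile yields a
            // unique name, but a directory is needed, so delete and mkdir in place.
            File tmp = File.createTempFile("kylin_job_meta", "");
            if (!tmp.delete() || !tmp.mkdirs())
                throw new IOException("cannot create meta dir " + tmp);

            long startTime = System.currentTimeMillis();
            // ... copy each resource in dumpList into tmp here ...
            System.out.printf("Dump resources to %s took %d ms%n",
                    tmp, System.currentTimeMillis() - startTime);
        }
    }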
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d3cb494..bd305c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -129,7 +129,7 @@ public class CuboidJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
// add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
+ attachSegmentMetadataWithDict(segment, job.getConfiguration());
LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 9fc8922..ce01eb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -110,13 +110,12 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
logger.error(s.getName() + " with status " + s.getStatus());
}
throw new IllegalStateException();
- } else {
- logger.info("Found segment: " + segment);
}
- setupMapper(cube.getSegmentById(segmentID));
+
+ setupMapper(segment);
setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
+ attachCubeMetadata(cube, job.getConfiguration());
return waitForCompletion(job);
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 576ace9..1612866 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -86,11 +86,11 @@ public class InMemCuboidJob extends AbstractHadoopJob {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+ CubeSegment segment = cube.getSegmentById(segmentID);
String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
if (checkSkip(cubingJobId)) {
- logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeSeg);
+ logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment);
return 0;
}
@@ -101,14 +101,14 @@ public class InMemCuboidJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
// add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
+ attachSegmentMetadataWithDict(segment, job.getConfiguration());
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// set input
- IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
// set mapper
@@ -118,7 +118,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set output
job.setReducerClass(InMemCuboidReducer.class);
- job.setNumReduceTasks(calculateReducerNum(cubeSeg));
+ job.setNumReduceTasks(calculateReducerNum(segment));
// the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
job.setOutputFormatClass(SequenceFileOutputFormat.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index e805d25..012e19f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -79,7 +79,8 @@ public class MergeCuboidJob extends CuboidJob {
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
+ // TODO actually only dictionaries from merging segments are needed
+ attachCubeMetadataWithDict(cube, job.getConfiguration());
LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
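On the TODO above: a merge job reads only the dictionaries of the segments being merged, so the dump list could be narrowed further. A hypothetical follow-up helper inside AbstractHadoopJob, reusing the collect-then-attach pattern this patch establishes (getMergingSegments is an assumption about the CubeInstance API here, not something this commit adds or relies on):

    // Hypothetical: attach cube metadata plus dictionaries of the merge inputs only.
    protected void attachMergingSegmentsMetadataWithDict(CubeSegment mergedSegment, Configuration conf) throws IOException {
        CubeInstance cube = mergedSegment.getCubeInstance();
        Set<String> dumpList = new LinkedHashSet<>(collectCubeMetadata(cube));
        for (CubeSegment merging : cube.getMergingSegments(mergedSegment)) {
            dumpList.addAll(merging.getDictionaryPaths()); // only the merge inputs
        }
        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
    }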
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 3c88024..ea72b54 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -103,7 +103,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(table);
- attachKylinPropsAndMetadata(tableDesc, job.getConfiguration());
+ attachTableMetadata(tableDesc, job.getConfiguration());
int result = waitForCompletion(job);
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f0f48c0..11466e5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -120,9 +120,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
deletePath(job.getConfiguration(), output);
-
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
return waitForCompletion(job);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 9593372..1a624c4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -88,19 +88,18 @@ public class CubeHFileJob extends AbstractHadoopJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- Configuration conf = HBaseConfiguration.create(getConf());
// add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
+ attachCubeMetadata(cube, job.getConfiguration());
- String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTable htable = new HTable(conf, tableName);
+ Configuration hbaseConf = HBaseConfiguration.create(getConf());
+ HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase());
// Automatic config !
HFileOutputFormat.configureIncrementalLoad(job, htable);
- reconfigurePartitions(conf, partitionFilePath);
+ reconfigurePartitions(hbaseConf, partitionFilePath);
// set block replication to 3 for hfiles
- conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+ hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
this.deletePath(job.getConfiguration(), output);