Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/30 08:57:27 UTC

[38/50] [abbrv] kylin git commit: KYLIN-2328 Reduce the size of metadata uploaded to distributed cache

KYLIN-2328 Reduce the size of metadata uploaded to distributed cache


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6ea03b86
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6ea03b86
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6ea03b86

Branch: refs/heads/master-cdh5.7
Commit: 6ea03b8638d72853959aba0666aea18f1ba97391
Parents: 64c3c61
Author: gaodayue <ga...@meituan.com>
Authored: Wed Dec 28 15:27:49 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Thu Dec 29 18:39:13 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     | 43 ++++++++++++++------
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  2 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java |  7 ++--
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 10 ++---
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  3 +-
 .../cardinality/HiveColumnCardinalityJob.java   |  2 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |  3 --
 .../kylin/storage/hbase/steps/CubeHFileJob.java | 11 +++--
 8 files changed, 48 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
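
For context: this commit replaces the single catch-all
attachKylinPropsAndMetadata(CubeInstance, Configuration), which always dumped
the dictionaries of every segment in the cube, with purpose-specific variants
so that each job uploads only what its tasks actually read. A minimal sketch
of how a caller might choose among the new methods follows; the example job
class is hypothetical, but the attach* methods are the ones introduced below:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    // Hypothetical job: pick the narrowest attach* variant that still
    // ships everything the mappers and reducers will read.
    public abstract class ExampleJob extends AbstractHadoopJob {
        void attachMetadata(CubeInstance cube, CubeSegment segment, Configuration conf) throws IOException {
            if (segment != null) {
                // per-segment cubing jobs (CuboidJob, InMemCuboidJob):
                // cube metadata plus only this segment's dictionaries
                attachSegmentMetadataWithDict(segment, conf);
            } else {
                // cube-level jobs that never read dictionaries
                // (FactDistinctColumnsJob, CubeHFileJob): skip them entirely
                attachCubeMetadata(cube, conf);
            }
        }
    }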


http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 80636d3..4693ac3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,7 +29,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -449,33 +448,49 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+    protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(table.getResourcePath());
         attachKylinPropsAndMetadata(dumpList, KylinConfig.getInstanceFromEnv(), conf);
     }
 
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        // write cube / model_desc / cube_desc / dict / table
+    protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        attachKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+    }
+
+    protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(collectCubeMetadata(cube));
+        for (CubeSegment segment : cube.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+    }
+
+    protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(segment.getDictionaryPaths());
+        attachKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
+    }
+
+    private Set<String> collectCubeMetadata(CubeInstance cube) {
+        // cube, model_desc, cube_desc, table
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(cube.getResourcePath());
         dumpList.add(cube.getDescriptor().getModel().getResourcePath());
         dumpList.add(cube.getDescriptor().getResourcePath());
 
-        for (TableRef tableRef: cube.getDescriptor().getModel().getAllTables()) {
+        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = tableRef.getTableDesc();
             dumpList.add(table.getResourcePath());
-            List<String> dependentResources = SourceFactory.getMRDependentResources(table);
-            dumpList.addAll(dependentResources);
-        }
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.addAll(SourceFactory.getMRDependentResources(table));
         }
 
-        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+        return dumpList;
     }
 
-    protected void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
+    private void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
 
@@ -524,6 +539,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+        long startTime = System.currentTimeMillis();
+
         ResourceStore from = ResourceStore.getStore(kylinConfig);
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
         ResourceStore to = ResourceStore.getStore(localConfig);
@@ -534,6 +551,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             to.putResource(path, res.inputStream, res.timestamp);
             res.inputStream.close();
         }
+
+        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
     }
 
     protected void deletePath(Configuration conf, Path path) throws IOException {
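
The refactor pivots on the new private collectCubeMetadata(), which gathers
only the cube, its model and cube descriptors, and the model's table
descriptors with their source-dependent resources; the *WithDict variants
layer dictionary paths on top. Purely for illustration, a dump list for a
hypothetical cube "sales_cube" might look like the following (all names and
paths here are invented; real ones come from getResourcePath() and the
metadata store layout):

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class DumpListExample {
        public static void main(String[] args) {
            Set<String> dumpList = new LinkedHashSet<>();
            // from collectCubeMetadata(): cube, model_desc, cube_desc, table
            dumpList.add("/cube/sales_cube.json");
            dumpList.add("/model_desc/sales_model.json");
            dumpList.add("/cube_desc/sales_cube.json");
            dumpList.add("/table/DEFAULT.SALES_FACT.json");
            // added only by the *WithDict variants; dictionaries are
            // typically the bulk of the metadata, hence the size savings
            dumpList.add("/dict/DEFAULT.SALES_FACT/SELLER_ID/hypothetical.dict");
            for (String path : dumpList) {
                System.out.println(path);
            }
        }
    }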

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d3cb494..bd305c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -129,7 +129,7 @@ public class CuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
 
             LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 9fc8922..ce01eb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -110,13 +110,12 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                     logger.error(s.getName() + " with status " + s.getStatus());
                 }
                 throw new IllegalStateException();
-            } else {
-                logger.info("Found segment: " + segment);
             }
-            setupMapper(cube.getSegmentById(segmentID));
+
+            setupMapper(segment);
             setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachCubeMetadata(cube, job.getConfiguration());
 
             return waitForCompletion(job);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 576ace9..1612866 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -86,11 +86,11 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+            CubeSegment segment = cube.getSegmentById(segmentID);
             String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
             if (checkSkip(cubingJobId)) {
-                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeSeg);
+                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment);
                 return 0;
             }
 
@@ -101,14 +101,14 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
             flatTableInputFormat.configureJob(job);
 
             // set mapper
@@ -118,7 +118,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             // set output
             job.setReducerClass(InMemCuboidReducer.class);
-            job.setNumReduceTasks(calculateReducerNum(cubeSeg));
+            job.setNumReduceTasks(calculateReducerNum(segment));
 
             // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
             job.setOutputFormatClass(SequenceFileOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index e805d25..012e19f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -79,7 +79,8 @@ public class MergeCuboidJob extends CuboidJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            // TODO actually only dictionaries from merging segments are needed
+            attachCubeMetadataWithDict(cube, job.getConfiguration());
 
             LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
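
Regarding the TODO above: a merge needs only the dictionaries of the segments
being merged, not of every segment in the cube. One way the TODO could be
resolved is a narrower helper inside AbstractHadoopJob, sketched below from
the methods visible in this diff; the helper is an assumption, not part of
the commit (note that java.util.List, removed from the imports above, would
be needed again):

    // Hypothetical helper, NOT in this commit: cube metadata plus the
    // dictionaries of the merging segments only.
    protected void attachMergingSegmentsMetadataWithDict(List<CubeSegment> mergingSegments, Configuration conf) throws IOException {
        CubeInstance cube = mergingSegments.get(0).getCubeInstance();
        Set<String> dumpList = new LinkedHashSet<>();
        dumpList.addAll(collectCubeMetadata(cube));
        for (CubeSegment segment : mergingSegments) {
            dumpList.addAll(segment.getDictionaryPaths());
        }
        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
    }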
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 3c88024..ea72b54 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -103,7 +103,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
         logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
 
         TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(table);
-        attachKylinPropsAndMetadata(tableDesc, job.getConfiguration());
+        attachTableMetadata(tableDesc, job.getConfiguration());
         int result = waitForCompletion(job);
 
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f0f48c0..11466e5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -120,9 +120,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
             deletePath(job.getConfiguration(), output);
-
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
             return waitForCompletion(job);
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 9593372..1a624c4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -88,19 +88,18 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            Configuration conf = HBaseConfiguration.create(getConf());
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachCubeMetadata(cube, job.getConfiguration());
 
-            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
+            Configuration hbaseConf = HBaseConfiguration.create(getConf());
+            HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase());
 
             // Automatic config !
             HFileOutputFormat.configureIncrementalLoad(job, htable);
-            reconfigurePartitions(conf, partitionFilePath);
+            reconfigurePartitions(hbaseConf, partitionFilePath);
 
             // set block replication to 3 for hfiles
-            conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);