You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:52 UTC

[40/50] [abbrv] incubator-kylin git commit: KYLIN-760 Bug fix in Improve the hashing performance in Sampling cuboid size

KYLIN-760 Bug fix in Improve the hashing performance in Sampling cuboid size


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/12bbce6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/12bbce6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/12bbce6f

Branch: refs/heads/streaming-localdict
Commit: 12bbce6f13e6bd7b98967cc10613512f590c9157
Parents: 386e0c4
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed May 13 13:05:21 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed May 13 13:05:35 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java |  14 ++-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  27 +----
 .../job/hadoop/cubev2/SaveStatisticsStep.java   | 112 +++++++++++++++++++
 4 files changed, 127 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/12bbce6f/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 2e5d97a..6c27cd7 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -55,6 +55,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+    public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/12bbce6f/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index e534441..aca62e8 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -34,6 +34,7 @@ import org.apache.kylin.job.hadoop.cube.*;
 import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
 import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
 import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
 import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
@@ -214,9 +215,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
         result.addTask(intermediateHiveTableStep);
         result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
         MapReduceExecutable baseCuboidStep = null;
         if (!inMemoryCubing()) {
+            result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
             // base cuboid step
             baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
             result.addTask(baseCuboidStep);
@@ -228,6 +229,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             }
         } else {
             // create htable step
+            result.addTask(createSaveStatisticsStep(seg, getStatisticsPath(seg, jobId)));
+            result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
             result.addTask(createCreateHTableStep(seg));
             baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
             result.addTask(baseCuboidStep);
@@ -516,6 +519,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
+    private SaveStatisticsStep createSaveStatisticsStep(CubeSegment seg, String statisticsPath) {
+        SaveStatisticsStep result = new SaveStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setStatisticsPath(statisticsPath);
+        return result;
+    }
+
 
     private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
         MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/12bbce6f/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index f8863a5..da587db 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -97,14 +97,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
             attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
 
-            int result = waitForCompletion(job);
-
-            if(result == 0 && Boolean.parseBoolean(statistics_enabled)) {
-                putStatisticsToResourceStore(statistics_output, newSegment);
-            }
-
-            return result;
-
+            return waitForCompletion(job);
 
         } catch (Exception e) {
             logger.error("error in FactDistinctColumnsJob", e);
@@ -115,8 +108,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
     }
 
     private void setupMapper(String intermediateTable) throws IOException {
-//        FileInputFormat.setInputPaths(job, input);
-
         String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
         HCatInputFormat.setInput(job, dbTableNames[0],
                 dbTableNames[1]);
@@ -142,22 +133,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         deletePath(job.getConfiguration(), output);
     }
 
-    private void putStatisticsToResourceStore(String statisticsFolder, CubeSegment cubeSegment) throws IOException {
-        Path statisticsFilePath = new Path(statisticsFolder, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-        FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
-        if (!fs.exists(statisticsFilePath))
-            throw new IOException("File " + statisticsFilePath + " does not exists;");
-
-        FSDataInputStream is = fs.open(statisticsFilePath);
-        try {
-            // put the statistics to metadata store
-            String statisticsFileName = cubeSegment.getStatisticsResourcePath();
-            ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
-            rs.putResource(statisticsFileName, is, System.currentTimeMillis());
-        } finally {
-            IOUtils.closeStream(is);
-        }
-    }
 
     public static void main(String[] args) throws Exception {
         FactDistinctColumnsJob job = new FactDistinctColumnsJob();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/12bbce6f/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/SaveStatisticsStep.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/SaveStatisticsStep.java
new file mode 100644
index 0000000..00fde45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/SaveStatisticsStep.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Save the cube segment statistic to Kylin metadata store
+ *
+ * @author shaoshi
+ */
+public class SaveStatisticsStep extends AbstractExecutable {
+
+    private static final String CUBE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+    private static final String STATISTICS_PATH = "statisticsPath";
+
+    public SaveStatisticsStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConf = context.getConfig();
+        final CubeManager mgr = CubeManager.getInstance(kylinConf);
+        final CubeInstance cube = mgr.getCube(getCubeName());
+        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+
+        ResourceStore rs = ResourceStore.getStore(kylinConf);
+        try {
+            Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+            if (!fs.exists(statisticsFilePath))
+                throw new IOException("File " + statisticsFilePath + " does not exists;");
+
+            FSDataInputStream is = fs.open(statisticsFilePath);
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = newSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(is);
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to save cuboid statistics", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+
+    public void setCubeName(String cubeName) {
+        this.setParam(CUBE_NAME, cubeName);
+    }
+
+    private String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public void setSegmentId(String segmentId) {
+        this.setParam(SEGMENT_ID, segmentId);
+    }
+
+    private String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    public void setStatisticsPath(String path) {
+        this.setParam(STATISTICS_PATH, path);
+    }
+
+    private String getStatisticsPath() {
+        return getParam(STATISTICS_PATH);
+    }
+
+}