You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/12 07:17:35 UTC

[1/2] kylin git commit: KYLIN-2503 KYLIN-2502 Hive/spark steps should show YARN app link

Repository: kylin
Updated Branches:
  refs/heads/master f0eca2414 -> f36b1341b


KYLIN-2503 KYLIN-2502 Hive/spark steps should show YARN app link


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47b7c800
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47b7c800
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47b7c800

Branch: refs/heads/master
Commit: 47b7c800ecdfe14d10426d64eae6b47e11f2a328
Parents: f0eca24
Author: shaofengshi <sh...@apache.org>
Authored: Sun Mar 12 14:13:04 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 12 14:13:04 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/util/BufferedLogger.java       |   6 +-
 .../kylin/job/common/PatternedLogger.java       | 130 +++++++++++++++++++
 .../kylin/job/common/ShellExecutable.java       | 106 +--------------
 .../kylin/engine/mr/common/CubeStatsReader.java |   2 +-
 .../kylin/engine/spark/SparkExecutable.java     |  19 +--
 .../source/hive/CreateFlatHiveTableStep.java    |   5 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |   6 +-
 7 files changed, 153 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
index cef598d..02a1fa3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
@@ -27,6 +27,8 @@ public class BufferedLogger implements Logger {
     private final org.slf4j.Logger wrappedLogger;
     private final StringBuilder buffer = new StringBuilder();
 
+    private static int MAX_BUFFER_SIZE = 10 * 1024 * 1024;
+
     public BufferedLogger(org.slf4j.Logger wrappedLogger) {
         this.wrappedLogger = wrappedLogger;
     }
@@ -34,7 +36,9 @@ public class BufferedLogger implements Logger {
     @Override
     public void log(String message) {
         wrappedLogger.info(message);
-        buffer.append(message).append("\n");
+        if (buffer.length() < MAX_BUFFER_SIZE) {
+            buffer.append(message).append("\n");
+        }
     }
 
     public String getBufferedLog() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
new file mode 100644
index 0000000..8399b44
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A logger which parses certain patterns from log
+ */
+public class PatternedLogger extends BufferedLogger {
+    private final Map<String, String> info = Maps.newHashMap();
+
+    private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
+    private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
+    private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
+    private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
+    private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
+    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+
+    // hive
+    private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
+    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+
+    // spark
+    private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
+    private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
+
+
+    public PatternedLogger(Logger wrappedLogger) {
+        super(wrappedLogger);
+    }
+
+    @Override
+    public void log(String message) {
+        super.log(message);
+        Matcher matcher = PATTERN_APP_ID.matcher(message);
+        if (matcher.find()) {
+            String appId = matcher.group(1);
+            info.put(ExecutableConstants.YARN_APP_ID, appId);
+        }
+
+        matcher = PATTERN_APP_URL.matcher(message);
+        if (matcher.find()) {
+            String appTrackingUrl = matcher.group(1);
+            info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
+        }
+
+        matcher = PATTERN_JOB_ID.matcher(message);
+        if (matcher.find()) {
+            String mrJobID = matcher.group(1);
+            info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
+        }
+
+        matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
+        if (matcher.find()) {
+            String hdfsWritten = matcher.group(1);
+            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+        }
+
+        matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
+        if (matcher.find()) {
+            String sourceCount = matcher.group(1);
+            info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
+        }
+
+        matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
+        if (matcher.find()) {
+            String sourceSize = matcher.group(1);
+            info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
+        }
+
+        // hive
+        matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
+        if (matcher.find()) {
+            String jobId = matcher.group(1);
+            String trackingUrl = matcher.group(2);
+            info.put(ExecutableConstants.MR_JOB_ID, jobId);
+            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+        }
+
+        matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
+        if (matcher.find()) {
+            // String hdfsRead = matcher.group(1);
+            String hdfsWritten = matcher.group(2);
+            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+        }
+
+        // spark
+        matcher = PATTERN_SPARK_APP_ID.matcher(message);
+        if (matcher.find()) {
+            String app_id = matcher.group(1);
+            info.put(ExecutableConstants.YARN_APP_ID, app_id);
+        }
+
+        matcher = PATTERN_SPARK_APP_URL.matcher(message);
+        if (matcher.find()) {
+            String trackingUrl = matcher.group(1);
+            info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+        }
+    }
+
+    public Map<String, String> getInfo() {
+        return info;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 5e0d0db..9f431b0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -19,21 +19,13 @@
 package org.apache.kylin.job.common;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Logger;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  */
 public class ShellExecutable extends AbstractExecutable {
@@ -50,9 +42,9 @@ public class ShellExecutable extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         try {
             logger.info("executing:" + getCmd());
-            final ShellExecutableLogger logger = new ShellExecutableLogger();
-            final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
-            getManager().addJobInfo(getId(), logger.getInfo());
+            final PatternedLogger patternedLogger = new PatternedLogger(logger);
+            final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger);
+            getManager().addJobInfo(getId(), patternedLogger.getInfo());
             return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
@@ -68,96 +60,4 @@ public class ShellExecutable extends AbstractExecutable {
         return getParam(CMD);
     }
 
-    private static class ShellExecutableLogger implements Logger {
-
-        private final Map<String, String> info = Maps.newHashMap();
-
-        private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
-        private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
-        private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
-        private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-        private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
-        // hive
-        private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-        private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
-        // spark
-        private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
-        private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
-
-        @Override
-        public void log(String message) {
-            Matcher matcher = PATTERN_APP_ID.matcher(message);
-            if (matcher.find()) {
-                String appId = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, appId);
-            }
-
-            matcher = PATTERN_APP_URL.matcher(message);
-            if (matcher.find()) {
-                String appTrackingUrl = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
-            }
-
-            matcher = PATTERN_JOB_ID.matcher(message);
-            if (matcher.find()) {
-                String mrJobID = matcher.group(1);
-                info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
-            }
-
-            matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                String hdfsWritten = matcher.group(1);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
-            if (matcher.find()) {
-                String sourceCount = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
-            }
-
-            matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
-            if (matcher.find()) {
-                String sourceSize = matcher.group(1);
-                info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
-            }
-
-            // hive
-            matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
-            if (matcher.find()) {
-                String jobId = matcher.group(1);
-                String trackingUrl = matcher.group(2);
-                info.put(ExecutableConstants.MR_JOB_ID, jobId);
-                info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            }
-
-            matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
-            if (matcher.find()) {
-                // String hdfsRead = matcher.group(1);
-                String hdfsWritten = matcher.group(2);
-                info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
-            }
-
-            // spark
-            matcher = PATTERN_SPARK_APP_ID.matcher(message);
-            if (matcher.find()) {
-                String app_id = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_ID, app_id);
-            }
-
-            matcher = PATTERN_SPARK_APP_URL.matcher(message);
-            if (matcher.find()) {
-                String trackingUrl = matcher.group(1);
-                info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
-            }
-        }
-
-        Map<String, String> getInfo() {
-            return info;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 9cb60f8..b54f401 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -224,7 +224,7 @@ public class CubeStatsReader {
         double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
         double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
         double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio) / (1024L * 1024L);
-        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M.");
+        logger.debug("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M.");
         return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 5ad21cf..1ed2235 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -118,22 +118,17 @@ public class SparkExecutable extends AbstractExecutable {
         stringBuilder.append("--files %s --jars %s %s %s");
         try {
             String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
-            logger.info("cmd:" + cmd);
-            final StringBuilder output = new StringBuilder();
+            logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
-            exec.execute(cmd, new Logger() {
-                @Override
-                public void log(String message) {
-                    output.append(message);
-                    output.append("\n");
-                    logger.info(message);
-                }
-            });
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+            PatternedLogger patternedLogger = new PatternedLogger(logger);
+            exec.execute(cmd, patternedLogger);
+            getManager().addJobInfo(getId(), patternedLogger.getInfo());
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
         } catch (Exception e) {
             logger.error("error run spark job:", e);
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index b197f0e..8b241d2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -20,12 +20,12 @@ package org.apache.kylin.source.hive;
 import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 public class CreateFlatHiveTableStep extends AbstractExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class);
-    private final BufferedLogger stepLogger = new BufferedLogger(logger);
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
 
     protected void createFlatHiveTable(KylinConfig config) throws IOException {
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -50,6 +50,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         stepLogger.log(cmd);
 
         Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
         if (response.getFirst() != 0) {
             throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 39a0273..bbf3c60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
@@ -40,6 +39,7 @@ import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -246,7 +246,7 @@ public class HiveMRInput implements IMRInput {
     }
 
     public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
-        private final BufferedLogger stepLogger = new BufferedLogger(logger);
+        private final PatternedLogger stepLogger = new PatternedLogger(logger);
 
         private long computeRowCount(String database, String table) throws Exception {
             IHiveClient hiveClient = HiveClientFactory.getHiveClient();
@@ -265,6 +265,8 @@ public class HiveMRInput implements IMRInput {
             stepLogger.log(cmd);
 
             Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+            getManager().addJobInfo(getId(), stepLogger.getInfo());
+
             if (response.getFirst() != 0) {
                 throw new RuntimeException("Failed to redistribute flat hive table");
             }


[2/2] kylin git commit: stabilize BuildCubeWithStream

Posted by sh...@apache.org.
stabilize BuildCubeWithStream


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f36b1341
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f36b1341
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f36b1341

Branch: refs/heads/master
Commit: f36b1341b7905e68b99afc77c8009ba156c0bca4
Parents: 47b7c80
Author: shaofengshi <sh...@apache.org>
Authored: Sun Mar 12 15:15:13 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 12 15:15:13 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/provision/BuildCubeWithStream.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f36b1341/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c5341d6..a527839 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -183,7 +183,7 @@ public class BuildCubeWithStream {
                     try {
                         generateStreamData(dateStart, dateEnd, rand.nextInt(100));
                         dateStart = dateEnd;
-                        sleep(rand.nextInt(rand.nextInt(50)) * 1000); // wait random time
+                        sleep(rand.nextInt(rand.nextInt(30)) * 1000); // wait random time
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
@@ -206,6 +206,8 @@ public class BuildCubeWithStream {
                 if (generateDataDone == false) {
                     throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed.
                 }
+            } else {
+                Thread.sleep(30 * 1000); // wait for new messages
             }
 
             FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
@@ -290,8 +292,8 @@ public class BuildCubeWithStream {
 
     protected void deployEnv() throws IOException {
         DeployUtil.overrideJobJarLocations();
-        //        DeployUtil.initCliWorkDir();
-        //        DeployUtil.deployMetadata();
+        //                DeployUtil.initCliWorkDir();
+        //                DeployUtil.deployMetadata();
     }
 
     public static void beforeClass() throws Exception {