You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/12 07:17:35 UTC
[1/2] kylin git commit: KYLIN-2503 KYLIN-2502 Hive/spark steps should
show YARN app link
Repository: kylin
Updated Branches:
refs/heads/master f0eca2414 -> f36b1341b
KYLIN-2503 KYLIN-2502 Hive/spark steps should show YARN app link
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47b7c800
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47b7c800
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47b7c800
Branch: refs/heads/master
Commit: 47b7c800ecdfe14d10426d64eae6b47e11f2a328
Parents: f0eca24
Author: shaofengshi <sh...@apache.org>
Authored: Sun Mar 12 14:13:04 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 12 14:13:04 2017 +0800
----------------------------------------------------------------------
.../kylin/common/util/BufferedLogger.java | 6 +-
.../kylin/job/common/PatternedLogger.java | 130 +++++++++++++++++++
.../kylin/job/common/ShellExecutable.java | 106 +--------------
.../kylin/engine/mr/common/CubeStatsReader.java | 2 +-
.../kylin/engine/spark/SparkExecutable.java | 19 +--
.../source/hive/CreateFlatHiveTableStep.java | 5 +-
.../apache/kylin/source/hive/HiveMRInput.java | 6 +-
7 files changed, 153 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
index cef598d..02a1fa3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java
@@ -27,6 +27,8 @@ public class BufferedLogger implements Logger {
private final org.slf4j.Logger wrappedLogger;
private final StringBuilder buffer = new StringBuilder();
+ private static int MAX_BUFFER_SIZE = 10 * 1024 * 1024;
+
public BufferedLogger(org.slf4j.Logger wrappedLogger) {
this.wrappedLogger = wrappedLogger;
}
@@ -34,7 +36,9 @@ public class BufferedLogger implements Logger {
@Override
public void log(String message) {
wrappedLogger.info(message);
- buffer.append(message).append("\n");
+ if (buffer.length() < MAX_BUFFER_SIZE) {
+ buffer.append(message).append("\n");
+ }
}
public String getBufferedLog() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
new file mode 100644
index 0000000..8399b44
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.common;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A logger which parses certain patterns from log
+ */
+public class PatternedLogger extends BufferedLogger {
+ private final Map<String, String> info = Maps.newHashMap();
+
+ private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
+ private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
+ private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
+ private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
+ private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
+ private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+
+ // hive
+ private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
+ private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+
+ // spark
+ private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
+ private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
+
+
+ public PatternedLogger(Logger wrappedLogger) {
+ super(wrappedLogger);
+ }
+
+ @Override
+ public void log(String message) {
+ super.log(message);
+ Matcher matcher = PATTERN_APP_ID.matcher(message);
+ if (matcher.find()) {
+ String appId = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_ID, appId);
+ }
+
+ matcher = PATTERN_APP_URL.matcher(message);
+ if (matcher.find()) {
+ String appTrackingUrl = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
+ }
+
+ matcher = PATTERN_JOB_ID.matcher(message);
+ if (matcher.find()) {
+ String mrJobID = matcher.group(1);
+ info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
+ }
+
+ matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
+ if (matcher.find()) {
+ String hdfsWritten = matcher.group(1);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+ }
+
+ matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
+ if (matcher.find()) {
+ String sourceCount = matcher.group(1);
+ info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
+ }
+
+ matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
+ if (matcher.find()) {
+ String sourceSize = matcher.group(1);
+ info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
+ }
+
+ // hive
+ matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
+ if (matcher.find()) {
+ String jobId = matcher.group(1);
+ String trackingUrl = matcher.group(2);
+ info.put(ExecutableConstants.MR_JOB_ID, jobId);
+ info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+ }
+
+ matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
+ if (matcher.find()) {
+ // String hdfsRead = matcher.group(1);
+ String hdfsWritten = matcher.group(2);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
+ }
+
+ // spark
+ matcher = PATTERN_SPARK_APP_ID.matcher(message);
+ if (matcher.find()) {
+ String app_id = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_ID, app_id);
+ }
+
+ matcher = PATTERN_SPARK_APP_URL.matcher(message);
+ if (matcher.find()) {
+ String trackingUrl = matcher.group(1);
+ info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
+ }
+ }
+
+ public Map<String, String> getInfo() {
+ return info;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
index 5e0d0db..9f431b0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
@@ -19,21 +19,13 @@
package org.apache.kylin.job.common;
import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Logger;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
/**
*/
public class ShellExecutable extends AbstractExecutable {
@@ -50,9 +42,9 @@ public class ShellExecutable extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
try {
logger.info("executing:" + getCmd());
- final ShellExecutableLogger logger = new ShellExecutableLogger();
- final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
- getManager().addJobInfo(getId(), logger.getInfo());
+ final PatternedLogger patternedLogger = new PatternedLogger(logger);
+ final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger);
+ getManager().addJobInfo(getId(), patternedLogger.getInfo());
return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
} catch (IOException e) {
logger.error("job:" + getId() + " execute finished with exception", e);
@@ -68,96 +60,4 @@ public class ShellExecutable extends AbstractExecutable {
return getParam(CMD);
}
- private static class ShellExecutableLogger implements Logger {
-
- private final Map<String, String> info = Maps.newHashMap();
-
- private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
- private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
- private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
- private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
- // hive
- private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
- // spark
- private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
- private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
-
- @Override
- public void log(String message) {
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, appId);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- info.put(ExecutableConstants.MR_JOB_ID, jobId);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- }
-
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- // spark
- matcher = PATTERN_SPARK_APP_ID.matcher(message);
- if (matcher.find()) {
- String app_id = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, app_id);
- }
-
- matcher = PATTERN_SPARK_APP_URL.matcher(message);
- if (matcher.find()) {
- String trackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- }
- }
-
- Map<String, String> getInfo() {
- return info;
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 9cb60f8..b54f401 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -224,7 +224,7 @@ public class CubeStatsReader {
double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio) / (1024L * 1024L);
- logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M.");
+ logger.debug("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M.");
return ret;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 5ad21cf..1ed2235 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -118,22 +118,17 @@ public class SparkExecutable extends AbstractExecutable {
stringBuilder.append("--files %s --jars %s %s %s");
try {
String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
- logger.info("cmd:" + cmd);
- final StringBuilder output = new StringBuilder();
+ logger.info("cmd: " + cmd);
CliCommandExecutor exec = new CliCommandExecutor();
- exec.execute(cmd, new Logger() {
- @Override
- public void log(String message) {
- output.append(message);
- output.append("\n");
- logger.info(message);
- }
- });
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ PatternedLogger patternedLogger = new PatternedLogger(logger);
+ exec.execute(cmd, patternedLogger);
+ getManager().addJobInfo(getId(), patternedLogger.getInfo());
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
} catch (Exception e) {
logger.error("error run spark job:", e);
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index b197f0e..8b241d2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -20,12 +20,12 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
public class CreateFlatHiveTableStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class);
- private final BufferedLogger stepLogger = new BufferedLogger(logger);
+ private final PatternedLogger stepLogger = new PatternedLogger(logger);
protected void createFlatHiveTable(KylinConfig config) throws IOException {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -50,6 +50,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
stepLogger.log(cmd);
Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
if (response.getFirst() != 0) {
throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 39a0273..bbf3c60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
@@ -40,6 +39,7 @@ import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -246,7 +246,7 @@ public class HiveMRInput implements IMRInput {
}
public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
- private final BufferedLogger stepLogger = new BufferedLogger(logger);
+ private final PatternedLogger stepLogger = new PatternedLogger(logger);
private long computeRowCount(String database, String table) throws Exception {
IHiveClient hiveClient = HiveClientFactory.getHiveClient();
@@ -265,6 +265,8 @@ public class HiveMRInput implements IMRInput {
stepLogger.log(cmd);
Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
+
if (response.getFirst() != 0) {
throw new RuntimeException("Failed to redistribute flat hive table");
}
[2/2] kylin git commit: stabilize BuildCubeWithStream
Posted by sh...@apache.org.
stabilize BuildCubeWithStream
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f36b1341
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f36b1341
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f36b1341
Branch: refs/heads/master
Commit: f36b1341b7905e68b99afc77c8009ba156c0bca4
Parents: 47b7c80
Author: shaofengshi <sh...@apache.org>
Authored: Sun Mar 12 15:15:13 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Mar 12 15:15:13 2017 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/provision/BuildCubeWithStream.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f36b1341/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index c5341d6..a527839 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -183,7 +183,7 @@ public class BuildCubeWithStream {
try {
generateStreamData(dateStart, dateEnd, rand.nextInt(100));
dateStart = dateEnd;
- sleep(rand.nextInt(rand.nextInt(50)) * 1000); // wait random time
+ sleep(rand.nextInt(rand.nextInt(30)) * 1000); // wait random time
} catch (Exception e) {
e.printStackTrace();
}
@@ -206,6 +206,8 @@ public class BuildCubeWithStream {
if (generateDataDone == false) {
throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed.
}
+ } else {
+ Thread.sleep(30 * 1000); // wait for new messages
}
FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() {
@@ -290,8 +292,8 @@ public class BuildCubeWithStream {
protected void deployEnv() throws IOException {
DeployUtil.overrideJobJarLocations();
- // DeployUtil.initCliWorkDir();
- // DeployUtil.deployMetadata();
+ // DeployUtil.initCliWorkDir();
+ // DeployUtil.deployMetadata();
}
public static void beforeClass() throws Exception {