You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/01 10:44:02 UTC
[1/2] incubator-kylin git commit: KYLIN-859
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 098f5800d -> 1a4d99846
KYLIN-859
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1a4d9984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1a4d9984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1a4d9984
Branch: refs/heads/0.8
Commit: 1a4d99846d335048be31d3f19478981a9c61f7a6
Parents: 883accb
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 26 17:53:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 1 16:43:50 2015 +0800
----------------------------------------------------------------------
bin/kylin.sh | 33 +++++
.../apache/kylin/common/util/MailService.java | 41 ++++--
.../kylin/common/util/MailServiceTest.java | 8 +-
.../apache/kylin/job/monitor/MonitorCLI.java | 64 ++++++++++
.../kylin/job/monitor/StreamingMonitor.java | 127 +++++++++++++++++++
.../kylin/job/streaming/CubeStreamConsumer.java | 3 +-
6 files changed, 259 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index 4633b9f..dcc6d2d 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -136,6 +136,39 @@ then
else
echo
fi
+elif [ "$1" == "monitor" ]
+then
+    echo "monitor job"
+    tomcat_root=${dir}/../tomcat
+    export tomcat_root
+    useSandbox=$(sh "${dir}/check-sandbox-properties.sh")
+    spring_profile="default"
+    if [ "$useSandbox" = "true" ]
+    then spring_profile="sandbox"
+    fi
+
+    # retrieve $hive_dependency and $hbase_dependency
+    source "${dir}/find-hive-dependency.sh"
+    source "${dir}/find-hbase-dependency.sh"
+    # retrieve $KYLIN_EXTRA_START_OPTS
+    if [ -f "${dir}/setenv.sh" ]
+    then source "${dir}/setenv.sh"
+    fi
+    # logs/ must exist before the redirection to monitor.log below can succeed
+    mkdir -p "${KYLIN_HOME}/ext" "${KYLIN_HOME}/logs"
+    export HBASE_CLASSPATH_PREFIX=${tomcat_root}/bin/bootstrap.jar:${tomcat_root}/bin/tomcat-juli.jar:${tomcat_root}/lib/*:$HBASE_CLASSPATH_PREFIX
+    export HBASE_CLASSPATH=$hive_dependency:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH}
+    # KYLIN_EXTRA_START_OPTS is for customized settings, check out bin/setenv.sh;
+    # it is deliberately left unquoted so that several options split into separate words
+    hbase ${KYLIN_EXTRA_START_OPTS} \
+    -Djava.util.logging.config.file=${tomcat_root}/conf/logging.properties \
+    -Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager \
+    -Dorg.apache.catalina.connector.CoyoteAdapter.ALLOW_BACKSLASH=true \
+    "-Dkylin.hive.dependency=${hive_dependency}" \
+    "-Dkylin.hbase.dependency=${hbase_dependency}" \
+    -Dspring.profiles.active=${spring_profile} \
+    org.apache.kylin.job.monitor.MonitorCLI "$@" > "${KYLIN_HOME}/logs/monitor.log" 2>&1
+    exit $?    # propagate the exit status of the hbase command instead of always reporting success
else
echo "usage: kylin.sh start or kylin.sh stop"
exit 1
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/common/src/main/java/org/apache/kylin/common/util/MailService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MailService.java b/common/src/main/java/org/apache/kylin/common/util/MailService.java
index fae3c80..7285520 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MailService.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MailService.java
@@ -47,15 +47,25 @@ public class MailService {
}
public MailService(KylinConfig config) {
- enabled = "true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false"));
- host = config.getProperty(KylinConfig.MAIL_HOST, "");
- username = config.getProperty(KylinConfig.MAIL_USERNAME, "");
- password = config.getProperty(KylinConfig.MAIL_PASSWORD, "");
- sender = config.getProperty(KylinConfig.MAIL_SENDER, "");
+ this("true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false")),
+ config.getProperty(KylinConfig.MAIL_HOST, ""),
+ config.getProperty(KylinConfig.MAIL_USERNAME, ""),
+ config.getProperty(KylinConfig.MAIL_PASSWORD, ""),
+ config.getProperty(KylinConfig.MAIL_SENDER, "")
+ );
+ }
+
+ public MailService(boolean enabled, String host, String username, String password, String sender) {
+ this.enabled = enabled;
+ this.host = host;
+ this.username = username;
+ this.password = password;
+ this.sender = sender;
if (enabled) {
- if (host.isEmpty())
+ if (host.isEmpty()) {
throw new RuntimeException("mail service host is empty");
+ }
}
}
@@ -66,7 +76,18 @@ public class MailService {
* @return true or false indicating whether the email was delivered successfully
* @throws IOException
*/
- public boolean sendMail(List<String> receivers, String subject, String content) throws IOException {
+ public boolean sendMail(List<String> receivers, String subject, String content) {
+ return sendMail(receivers, subject, content, true);
+ }
+
+ /**
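+ * @param receivers recipient e-mail addresses
+ * @param subject subject line of the mail
+ * @param content body of the mail
+ * @param isHtmlMsg true to send content as an HTML message, false to send it as plain text
+ * @return true or false indicating whether the email was delivered successfully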
+ */
+ public boolean sendMail(List<String> receivers, String subject, String content, boolean isHtmlMsg) {
if (!enabled) {
logger.info("Email service is disabled; this mail will not be delivered: " + subject);
@@ -89,7 +110,11 @@ public class MailService {
email.setFrom(sender);
email.setSubject(subject);
email.setCharset("UTF-8");
- ((HtmlEmail) email).setHtmlMsg(content);
+ if (isHtmlMsg) {
+ ((HtmlEmail) email).setHtmlMsg(content);
+ } else {
+ ((HtmlEmail) email).setTextMsg(content);
+ }
email.send();
email.getMailSession();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java b/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
index d5ace18..bc4d7cf 100644
--- a/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
@@ -63,12 +63,6 @@ public class MailServiceTest extends LocalFileMetadataTestCase {
List<String> receivers = new ArrayList<String>(1);
receivers.add("foobar@foobar.com");
- try {
- return mailservice.sendMail(receivers, "A test email from Kylin", "Hello!");
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- return false;
+ return mailservice.sendMail(receivers, "A test email from Kylin", "Hello!");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
new file mode 100644
index 0000000..100bdd4
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
@@ -0,0 +1,64 @@
+package org.apache.kylin.job.monitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ */
+public class MonitorCLI {
+
+ private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+ public static void main(String[] args) {
+ Preconditions.checkArgument(args[0].equals("monitor"));
+
+ int i = 1;
+ List<String> receivers = null;
+ String host = null;
+ String tableName = null;
+ String authorization = null;
+ String cubeName = null;
+ while (i < args.length) {
+ String argName = args[i];
+ switch (argName) {
+ case "-receivers":
+ receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
+ break;
+ case "-host":
+ host = args[++i];
+ break;
+ case "-tableName":
+ tableName = args[++i];
+ break;
+ case "-authorization":
+ authorization = args[++i];
+ break;
+ case "-cubeName":
+ cubeName = args[++i];
+ break;
+ default:
+ throw new RuntimeException("invalid argName:" + argName);
+ }
+ i++;
+ }
+ Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+ final StreamingMonitor streamingMonitor = new StreamingMonitor();
+ if (tableName != null) {
+ logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+ Preconditions.checkNotNull(host);
+ Preconditions.checkNotNull(authorization);
+ Preconditions.checkNotNull(tableName);
+ streamingMonitor.checkCountAll(receivers, host, authorization, tableName);
+ }
+ if (cubeName != null) {
+ logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+ streamingMonitor.checkCube(receivers, cubeName);
+ }
+ System.exit(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..77c9b34
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
@@ -0,0 +1,158 @@
+package org.apache.kylin.job.monitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Monitoring checks whose reports are logged and sent as plain-text mail.
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+    /**
+     * Posts a {@code select count(*)} query for {@code tableName} to the query REST endpoint of
+     * {@code host} and mails the outcome (status code, results and duration, or the failure).
+     *
+     * @param receivers     mail recipients of the report
+     * @param host          base URL of the server; {@code /kylin/api/query} is appended to it
+     * @param authorization credentials sent after {@code "Basic "} in the Authorization header
+     * @param tableName     table to count; inserted into the SQL text as is
+     */
+    public void checkCountAll(List<String> receivers, String host, String authorization, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        // The outcome is appended to the title exactly once, after the whole check has run, so an
+        // exception raised while reading a 200 response cannot yield a "succeedfailed" title.
+        boolean succeeded = false;
+        try {
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"default\"}", tableName);
+            // Encode explicitly instead of relying on the platform default charset.
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes("UTF-8")));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                final HashMap<?, ?> response = JsonUtil.readValue(msg, HashMap.class);
+                final Object results = response.get("results");
+                final Object duration = response.get("duration");
+                if (results == null || duration == null) {
+                    // a 200 answer without both fields is reported as a failure, with the raw body
+                    throw new IllegalStateException("unexpected response:" + msg);
+                }
+                stringBuilder.append("results:").append(results).append("\n");
+                stringBuilder.append("duration:").append(duration).append("\n");
+                succeeded = true;
+            } else {
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        title += succeeded ? "succeed" : "failed";
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    /**
+     * Reloads {@code cubeName}, sorts its READY segments by their natural order and mails a report
+     * if adjacent segments leave a gap between, or overlap in, their date ranges.
+     * Nothing is mailed when the cube does not exist or when no gap or overlap is found.
+     *
+     * @param receivers mail recipients of the report
+     * @param cubeName  name of the cube to check
+     */
+    public void checkCube(List<String> receivers, String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            } else {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(
+                    StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                        @Nullable
+                        @Override
+                        public String apply(Pair<Long, Long> input) {
+                            return parseInterval(input);
+                        }
+                    }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps", cubeName), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    /** Formats an interval as its two bounds, each printed as the raw long followed by its {@link Date} rendering. */
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    /** Sends {@code content} as a plain-text mail through a {@link MailService} built from the environment's Kylin config. */
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        // MailService signals an undelivered mail through its return value; do not drop it silently
+        if (!mailService.sendMail(receivers, title, content, false)) {
+            logger.warn("mail was not delivered: " + title);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 0508a4e..2bbbe19 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -108,10 +108,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance, parsedStreamMessages);
writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+ InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
final HTableInterface hTable = createHTable(cubeSegment);
-
final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
- InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
gtRecordWriter.flush();
[2/2] incubator-kylin git commit: KYLIN-861
Posted by qh...@apache.org.
KYLIN-861
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/883accba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/883accba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/883accba
Branch: refs/heads/0.8
Commit: 883accbadee11a3aff81ec53f42335e5e0b5f0a7
Parents: 098f580
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 25 15:19:44 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 1 16:43:50 2015 +0800
----------------------------------------------------------------------
bin/cleanup_streaming_files.sh | 24 ++++++++++++++++++++++++
1 file changed, 24 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/883accba/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/bin/cleanup_streaming_files.sh b/bin/cleanup_streaming_files.sh
new file mode 100644
index 0000000..6b759ca
--- /dev/null
+++ b/bin/cleanup_streaming_files.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+
+if [ $# != 1 ]
+then
+ echo 'invalid input' || exit -1
+fi
+
+cd $KYLIN_HOME
+
+for pidfile in `find . -name "$1_1*"`
+do
+ pidfile=`echo "$pidfile" | cut -c 3-`
+ echo "pidfile:$pidfile"
+ pid=`cat $pidfile`
+ if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
+ then
+ echo "pid:$pid still running"
+ else
+ echo "pid:$pid not running, try to delete files"
+ echo $pidfile | xargs rm
+ echo "logs/streaming_$pidfile.log" | xargs rm
+ fi
+done
+