You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/01 10:44:02 UTC

[1/2] incubator-kylin git commit: KYLIN-859

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 098f5800d -> 1a4d99846


KYLIN-859


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1a4d9984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1a4d9984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1a4d9984

Branch: refs/heads/0.8
Commit: 1a4d99846d335048be31d3f19478981a9c61f7a6
Parents: 883accb
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 26 17:53:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 1 16:43:50 2015 +0800

----------------------------------------------------------------------
 bin/kylin.sh                                    |  33 +++++
 .../apache/kylin/common/util/MailService.java   |  41 ++++--
 .../kylin/common/util/MailServiceTest.java      |   8 +-
 .../apache/kylin/job/monitor/MonitorCLI.java    |  64 ++++++++++
 .../kylin/job/monitor/StreamingMonitor.java     | 127 +++++++++++++++++++
 .../kylin/job/streaming/CubeStreamConsumer.java |   3 +-
 6 files changed, 259 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index 4633b9f..dcc6d2d 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -136,6 +136,39 @@ then
     else
         echo
     fi
+elif [ $1 == "monitor" ]
+then
+    echo "monitor job"
+    tomcat_root=${dir}/../tomcat
+    export tomcat_root
+    useSandbox=`sh ${dir}/check-sandbox-properties.sh`
+    spring_profile="default"
+    if [ "$useSandbox" = "true" ]
+        then spring_profile="sandbox"
+    fi
+
+    #retrieve $hive_dependency and $hbase_dependency
+    source ${dir}/find-hive-dependency.sh
+    source ${dir}/find-hbase-dependency.sh
+    #retrieve $KYLIN_EXTRA_START_OPTS
+    if [ -f "${dir}/setenv.sh" ]
+        then source ${dir}/setenv.sh
+    fi
+
+    mkdir -p ${KYLIN_HOME}/ext
+    export HBASE_CLASSPATH_PREFIX=${tomcat_root}/bin/bootstrap.jar:${tomcat_root}/bin/tomcat-juli.jar:${tomcat_root}/lib/*:$HBASE_CLASSPATH_PREFIX
+    export HBASE_CLASSPATH=$hive_dependency:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH}
+
+    # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
+    hbase ${KYLIN_EXTRA_START_OPTS} \
+    -Djava.util.logging.config.file=${tomcat_root}/conf/logging.properties \
+    -Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager \
+    -Dorg.apache.catalina.connector.CoyoteAdapter.ALLOW_BACKSLASH=true \
+    -Dkylin.hive.dependency=${hive_dependency} \
+    -Dkylin.hbase.dependency=${hbase_dependency} \
+    -Dspring.profiles.active=${spring_profile} \
+    org.apache.kylin.job.monitor.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
+    exit 0
 else
     echo "usage: kylin.sh start or kylin.sh stop"
     exit 1

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/common/src/main/java/org/apache/kylin/common/util/MailService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MailService.java b/common/src/main/java/org/apache/kylin/common/util/MailService.java
index fae3c80..7285520 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MailService.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MailService.java
@@ -47,15 +47,25 @@ public class MailService {
     }
 
     public MailService(KylinConfig config) {
-        enabled = "true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false"));
-        host = config.getProperty(KylinConfig.MAIL_HOST, "");
-        username = config.getProperty(KylinConfig.MAIL_USERNAME, "");
-        password = config.getProperty(KylinConfig.MAIL_PASSWORD, "");
-        sender = config.getProperty(KylinConfig.MAIL_SENDER, "");
+        this("true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false")),
+                config.getProperty(KylinConfig.MAIL_HOST, ""),
+                config.getProperty(KylinConfig.MAIL_USERNAME, ""),
+                config.getProperty(KylinConfig.MAIL_PASSWORD, ""),
+                config.getProperty(KylinConfig.MAIL_SENDER, "")
+                );
+    }
+
+    public MailService(boolean enabled, String host, String username, String password, String sender) {
+        this.enabled = enabled;
+        this.host = host;
+        this.username = username;
+        this.password = password;
+        this.sender = sender;
 
         if (enabled) {
-            if (host.isEmpty())
+            if (host.isEmpty()) {
                 throw new RuntimeException("mail service host is empty");
+            }
         }
     }
 
@@ -66,7 +76,18 @@ public class MailService {
      * @return true or false indicating whether the email was delivered successfully
      * @throws IOException
      */
-    public boolean sendMail(List<String> receivers, String subject, String content) throws IOException {
+    public boolean sendMail(List<String> receivers, String subject, String content) {
+        return sendMail(receivers, subject, content, true);
+    }
+
+    /**
+     * @param receivers
+     * @param subject
+     * @param content
+     * @param isHtmlMsg true to send content as an HTML body, false to send it as a plain-text body
+     * @return true or false indicating whether the email was delivered successfully
+     */
+    public boolean sendMail(List<String> receivers, String subject, String content, boolean isHtmlMsg) {
 
         if (!enabled) {
             logger.info("Email service is disabled; this mail will not be delivered: " + subject);
@@ -89,7 +110,11 @@ public class MailService {
             email.setFrom(sender);
             email.setSubject(subject);
             email.setCharset("UTF-8");
-            ((HtmlEmail) email).setHtmlMsg(content);
+            if (isHtmlMsg) {
+                ((HtmlEmail) email).setHtmlMsg(content);
+            } else {
+                ((HtmlEmail) email).setTextMsg(content);
+            }
             email.send();
             email.getMailSession();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java b/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
index d5ace18..bc4d7cf 100644
--- a/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
@@ -63,12 +63,6 @@ public class MailServiceTest extends LocalFileMetadataTestCase {
 
         List<String> receivers = new ArrayList<String>(1);
         receivers.add("foobar@foobar.com");
-        try {
-            return mailservice.sendMail(receivers, "A test email from Kylin", "Hello!");
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        return false;
+        return mailservice.sendMail(receivers, "A test email from Kylin", "Hello!");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
new file mode 100644
index 0000000..100bdd4
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
@@ -0,0 +1,64 @@
+package org.apache.kylin.job.monitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Command-line entry point for monitoring: expects {@code monitor -receivers a;b [-host h] [-authorization auth] [-tableName t] [-cubeName c]} and runs the matching {@link StreamingMonitor} checks.
+ */
+public class MonitorCLI {
+    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
+
+    public static void main(String[] args) {
+        Preconditions.checkArgument(args.length > 0 && "monitor".equals(args[0]), "first argument must be 'monitor'");
+        Preconditions.checkArgument(args.length % 2 == 1, "every option must be followed by a value"); // "monitor" + (option, value) pairs; keeps args[++i] in bounds
+        int i = 1;
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        while (i < args.length) {
+            String argName = args[i];
+            switch (argName) {
+                case "-receivers":
+                    receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
+                    break;
+                case "-host":
+                    host = args[++i];
+                    break;
+                case "-tableName":
+                    tableName = args[++i];
+                    break;
+                case "-authorization":
+                    authorization = args[++i];
+                    break;
+                case "-cubeName":
+                    cubeName = args[++i];
+                    break;
+                default:
+                    throw new RuntimeException("invalid argName:" + argName);
+            }
+            i++;
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            Preconditions.checkNotNull(host);
+            Preconditions.checkNotNull(authorization);
+            Preconditions.checkNotNull(tableName);
+            streamingMonitor.checkCountAll(receivers, host, authorization, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName);
+        }
+        System.exit(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..77c9b34
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
@@ -0,0 +1,127 @@
+package org.apache.kylin.job.monitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Monitoring checks for streaming cubes; findings are mailed to the given receivers through {@link MailService}.
+ */
+public class StreamingMonitor {
+    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
+
+    /** Posts a {@code select count(*)} query for tableName to {@code host + "/kylin/api/query"} and mails the outcome, whether the query succeeded or failed. */
+    public void checkCountAll(List<String> receivers, String host, String authorization, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+            request.addRequestHeader("Authorization", "Basic " + authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"default\"}", tableName);
+            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes("UTF-8"))); // explicit charset for the JSON body instead of the platform default
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    /** Reloads the cube, sorts its READY segments and mails any gaps or overlaps between consecutive date ranges; sends nothing if the cube is missing or the ranges are contiguous. */
+    public void checkCube(List<String> receivers, String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            } else {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all gaps:").append("\n").append(
+                    StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
+                        @Nullable
+                        @Override
+                        public String apply(Pair<Long, Long> input) {
+                            return parseInterval(input);
+                        }
+                    }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps", cubeName), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    /** Formats a gap with each bound shown both as the raw long and as {@code new Date(bound).toString()}. */
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
+    }
+
+    private void sendMail(List<String> receivers, String title, String content) {
+        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false); // false = plain-text body rather than HTML
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1a4d9984/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 0508a4e..2bbbe19 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -108,10 +108,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(cubeInstance, parsedStreamMessages);
         writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
 
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
         final HTableInterface hTable = createHTable(cubeSegment);
-
         final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
 
         executorService.submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, gtRecordWriter)).get();
         gtRecordWriter.flush();


[2/2] incubator-kylin git commit: KYLIN-861

Posted by qh...@apache.org.
KYLIN-861


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/883accba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/883accba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/883accba

Branch: refs/heads/0.8
Commit: 883accbadee11a3aff81ec53f42335e5e0b5f0a7
Parents: 098f580
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 25 15:19:44 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 1 16:43:50 2015 +0800

----------------------------------------------------------------------
 bin/cleanup_streaming_files.sh | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/883accba/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/bin/cleanup_streaming_files.sh b/bin/cleanup_streaming_files.sh
new file mode 100644
index 0000000..6b759ca
--- /dev/null
+++ b/bin/cleanup_streaming_files.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+# Usage: cleanup_streaming_files.sh <name> -- deletes "<name>_1*" pid files found under $KYLIN_HOME, plus logs/streaming_<pidfile>.log, when the recorded pid is no longer running.
+if [ $# != 1 ]
+then
+    echo 'invalid input'; exit 1   # "echo ... || exit" never exited because echo succeeds
+fi
+
+# Abort when KYLIN_HOME is unset or unreachable; otherwise the find/rm below would run in the wrong directory.
+cd "${KYLIN_HOME:?KYLIN_HOME is not set}" || exit 1
+for pidfile in `find . -name "$1_1*"`
+do
+    pidfile=`echo "$pidfile" | cut -c 3-`   # drop the leading "./" that find prints
+    echo "pidfile:$pidfile"
+    pid=`cat $pidfile`
+    if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
+    then
+        echo "pid:$pid still running"
+    else
+        echo "pid:$pid not running, try to delete files"
+        echo $pidfile | xargs rm
+        echo "logs/streaming_$pidfile.log" | xargs rm
+    fi
+done
+