You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/13 08:43:25 UTC

incubator-kylin git commit: KYLIN-967 Kill bad queries on low memory

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 672bf421e -> e00e5a922


KYLIN-967 Kill bad queries on low memory


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e00e5a92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e00e5a92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e00e5a92

Branch: refs/heads/2.x-staging
Commit: e00e5a92247d3c74d04b4a946c60c64f43569e62
Parents: 672bf42
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Oct 13 14:43:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Oct 13 14:43:10 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/service/BadQueryDetector.java    | 61 ++++++++++++++------
 .../rest/service/BadQueryDetectorTest.java      |  8 +--
 2 files changed, 46 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e00e5a92/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 7c48abd..622dd35 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -36,24 +36,26 @@ public class BadQueryDetector extends Thread {
     private final long detectionInterval;
     private final int alertMB;
     private final int alertRunningSec;
+    private final int killRunningSec;
 
     private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
 
     public BadQueryDetector() {
-        this(60 * 1000, 100, 60); // 1 minute, 100 MB, 60 seconds
+        this(60 * 1000, 100, 60, 5 * 60); // 1 minute, 100 MB, 60 seconds, 5 minutes
     }
 
-    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec, int killRunningSec) {
         super("BadQueryDetector");
         this.setDaemon(true);
         this.detectionInterval = detectionInterval;
         this.alertMB = alertMB;
         this.alertRunningSec = alertRunningSec;
+        this.killRunningSec = killRunningSec;
 
         this.notifiers.add(new Notifier() {
             @Override
-            public void badQueryFound(String adj, int runningSec, String sql) {
-                logger.info(adj + " query has been running " + runningSec + " seconds -- " + sql);
+            public void badQueryFound(String adj, int runningSec, String sql, Thread t) {
+                logger.info(adj + " query has been running " + runningSec + " seconds (thread id 0x" + Long.toHexString(t.getId()) + ") -- " + sql);
             }
         });
     }
@@ -62,10 +64,10 @@ public class BadQueryDetector extends Thread {
         notifiers.add(notifier);
     }
 
-    private void notify(String adj, int runningSec, String sql) {
+    private void notify(String adj, int runningSec, String sql, Thread t) {
         for (Notifier notifier : notifiers) {
             try {
-                notifier.badQueryFound(adj, runningSec, sql);
+                notifier.badQueryFound(adj, runningSec, sql, t);
             } catch (Exception e) {
                 logger.error("", e);
             }
@@ -73,11 +75,11 @@ public class BadQueryDetector extends Thread {
     }
 
     public interface Notifier {
-        void badQueryFound(String adj, int runningSec, String sql);
+        void badQueryFound(String adj, int runningSec, String sql, Thread t);
     }
 
     public void queryStart(Thread thread, SQLRequest sqlRequest) {
-        runningQueries.put(thread, new Entry(sqlRequest));
+        runningQueries.put(thread, new Entry(sqlRequest, thread));
     }
 
     public void queryEnd(Thread thread) {
@@ -87,10 +89,12 @@ public class BadQueryDetector extends Thread {
     private class Entry implements Comparable<Entry> {
         final SQLRequest sqlRequest;
         final long startTime;
+        final Thread thread;
 
-        Entry(SQLRequest sqlRequest) {
+        Entry(SQLRequest sqlRequest, Thread thread) {
             this.sqlRequest = sqlRequest;
             this.startTime = System.currentTimeMillis();
+            this.thread = thread;
         }
 
         @Override
@@ -121,24 +125,43 @@ public class BadQueryDetector extends Thread {
         ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
         Collections.sort(entries);
 
-        // report if low memory
-        if (getSystemAvailMB() < alertMB) {
-            logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
-            for (int i = 0; i < entries.size(); i++) {
-                Entry e = entries.get(i);
-                notify("Low mem", (int) ((now - e.startTime) / 1000), e.sqlRequest.getSql());
-            }
-        }
-
         // report if query running long
         for (Entry e : entries) {
             int runningSec = (int) ((now - e.startTime) / 1000);
             if (runningSec >= alertRunningSec) {
-                notify("Slow", runningSec, e.sqlRequest.getSql());
+                notify("Slow", runningSec, e.sqlRequest.getSql(), e.thread);
             } else {
                 break; // entries are sorted by startTime
             }
         }
+        
+        // report if low memory
+        if (getSystemAvailMB() < alertMB) {
+            logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
+            
+            for (Entry e : entries) {
+                int duration = (int) ((now - e.startTime) / 1000);
+                if (duration > killRunningSec) {
+                    notify("Kill", duration, e.sqlRequest.getSql(), e.thread);
+                    killQueryThread(e.thread);
+                } else {
+                    notify("Low mem", duration, e.sqlRequest.getSql(), e.thread);
+                }
+            }
+        }
+    }
+
+    private void killQueryThread(Thread t) {
+        StackTraceElement[] stackTrace = t.getStackTrace();
+        t.interrupt();
+        
+        // log the stack trace of bad query thread for further analysis
+        StringBuilder buf = new StringBuilder("Interrupted thread 0x" + Long.toHexString(t.getId()));
+        buf.append("\n");
+        for (StackTraceElement e : stackTrace) {
+            buf.append("\t").append("at ").append(e.toString()).append("\n");
+        }
+        logger.info(buf.toString());
     }
 
     public static final int ONE_MB = 1024 * 1024;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e00e5a92/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index c849efd..2358d7a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -35,10 +35,10 @@ public class BadQueryDetectorTest {
         String mockSql = "select * from just_a_test";
         final ArrayList<String[]> alerts = new ArrayList<>();
 
-        BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
+        BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec, Integer.MAX_VALUE);
         badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
             @Override
-            public void badQueryFound(String adj, int runningSec, String sql) {
+            public void badQueryFound(String adj, int runningSec, String sql, Thread t) {
                 alerts.add(new String[] { adj, sql });
             }
         });
@@ -63,7 +63,7 @@ public class BadQueryDetectorTest {
         // first check founds Low mem
         assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(0));
         // second check founds Low mem & Slow
-        assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(1));
-        assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(2));
+        assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(1));
+        assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(2));
     }
 }