You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/10/13 08:43:25 UTC
incubator-kylin git commit: KYLIN-967 Kill bad queries on low memory
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 672bf421e -> e00e5a922
KYLIN-967 Kill bad queries on low memory
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e00e5a92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e00e5a92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e00e5a92
Branch: refs/heads/2.x-staging
Commit: e00e5a92247d3c74d04b4a946c60c64f43569e62
Parents: 672bf42
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Oct 13 14:43:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Oct 13 14:43:10 2015 +0800
----------------------------------------------------------------------
.../kylin/rest/service/BadQueryDetector.java | 61 ++++++++++++++------
.../rest/service/BadQueryDetectorTest.java | 8 +--
2 files changed, 46 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e00e5a92/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 7c48abd..622dd35 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -36,24 +36,26 @@ public class BadQueryDetector extends Thread {
private final long detectionInterval;
private final int alertMB;
private final int alertRunningSec;
+ private final int killRunningSec;
private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
public BadQueryDetector() {
- this(60 * 1000, 100, 60); // 1 minute, 100 MB, 60 seconds
+ this(60 * 1000, 100, 60, 5 * 60); // 1 minute, 100 MB, 60 seconds, 5 minutes
}
- public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+ public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec, int killRunningSec) {
super("BadQueryDetector");
this.setDaemon(true);
this.detectionInterval = detectionInterval;
this.alertMB = alertMB;
this.alertRunningSec = alertRunningSec;
+ this.killRunningSec = killRunningSec;
this.notifiers.add(new Notifier() {
@Override
- public void badQueryFound(String adj, int runningSec, String sql) {
- logger.info(adj + " query has been running " + runningSec + " seconds -- " + sql);
+ public void badQueryFound(String adj, int runningSec, String sql, Thread t) {
+ logger.info(adj + " query has been running " + runningSec + " seconds (thread id 0x" + Long.toHexString(t.getId()) + ") -- " + sql);
}
});
}
@@ -62,10 +64,10 @@ public class BadQueryDetector extends Thread {
notifiers.add(notifier);
}
- private void notify(String adj, int runningSec, String sql) {
+ private void notify(String adj, int runningSec, String sql, Thread t) {
for (Notifier notifier : notifiers) {
try {
- notifier.badQueryFound(adj, runningSec, sql);
+ notifier.badQueryFound(adj, runningSec, sql, t);
} catch (Exception e) {
logger.error("", e);
}
@@ -73,11 +75,11 @@ public class BadQueryDetector extends Thread {
}
public interface Notifier {
- void badQueryFound(String adj, int runningSec, String sql);
+ void badQueryFound(String adj, int runningSec, String sql, Thread t);
}
public void queryStart(Thread thread, SQLRequest sqlRequest) {
- runningQueries.put(thread, new Entry(sqlRequest));
+ runningQueries.put(thread, new Entry(sqlRequest, thread));
}
public void queryEnd(Thread thread) {
@@ -87,10 +89,12 @@ public class BadQueryDetector extends Thread {
private class Entry implements Comparable<Entry> {
final SQLRequest sqlRequest;
final long startTime;
+ final Thread thread;
- Entry(SQLRequest sqlRequest) {
+ Entry(SQLRequest sqlRequest, Thread thread) {
this.sqlRequest = sqlRequest;
this.startTime = System.currentTimeMillis();
+ this.thread = thread;
}
@Override
@@ -121,24 +125,43 @@ public class BadQueryDetector extends Thread {
ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
Collections.sort(entries);
- // report if low memory
- if (getSystemAvailMB() < alertMB) {
- logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
- for (int i = 0; i < entries.size(); i++) {
- Entry e = entries.get(i);
- notify("Low mem", (int) ((now - e.startTime) / 1000), e.sqlRequest.getSql());
- }
- }
-
// report if query running long
for (Entry e : entries) {
int runningSec = (int) ((now - e.startTime) / 1000);
if (runningSec >= alertRunningSec) {
- notify("Slow", runningSec, e.sqlRequest.getSql());
+ notify("Slow", runningSec, e.sqlRequest.getSql(), e.thread);
} else {
break; // entries are sorted by startTime
}
}
+
+ // report if low memory
+ if (getSystemAvailMB() < alertMB) {
+ logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
+
+ for (Entry e : entries) {
+ int duration = (int) ((now - e.startTime) / 1000);
+ if (duration > killRunningSec) {
+ notify("Kill", duration, e.sqlRequest.getSql(), e.thread);
+ killQueryThread(e.thread);
+ } else {
+ notify("Low mem", duration, e.sqlRequest.getSql(), e.thread);
+ }
+ }
+ }
+ }
+
+ private void killQueryThread(Thread t) {
+ StackTraceElement[] stackTrace = t.getStackTrace();
+ t.interrupt();
+
+ // Log the bad query thread's stack trace (captured just before interrupting) for further analysis. NOTE(review): interrupt() only sets a flag the query code must honor, and t comes from a snapshot of runningQueries, so it may have finished this query since -- confirm it is still running the same query before interrupting.
+ StringBuilder buf = new StringBuilder("Interrupted thread 0x" + Long.toHexString(t.getId()));
+ buf.append("\n");
+ for (StackTraceElement e : stackTrace) {
+ buf.append("\t").append("at ").append(e.toString()).append("\n");
+ }
+ logger.info(buf.toString());
}
public static final int ONE_MB = 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e00e5a92/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index c849efd..2358d7a 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -35,10 +35,10 @@ public class BadQueryDetectorTest {
String mockSql = "select * from just_a_test";
final ArrayList<String[]> alerts = new ArrayList<>();
- BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
+ BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec, Integer.MAX_VALUE);
badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
@Override
- public void badQueryFound(String adj, int runningSec, String sql) {
+ public void badQueryFound(String adj, int runningSec, String sql, Thread t) {
alerts.add(new String[] { adj, sql });
}
});
@@ -63,7 +63,7 @@ public class BadQueryDetectorTest {
// first check founds Low mem
assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(0));
// second check founds Low mem & Slow
- assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(1));
- assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(2));
+ assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(1));
+ assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(2));
}
}