You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/05 06:42:03 UTC
kylin git commit: KYLIN-1941 Save user for slow queries
Repository: kylin
Updated Branches:
refs/heads/master 5e83cd734 -> 59e5eeed3
KYLIN-1941 Save user for slow queries
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/59e5eeed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/59e5eeed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/59e5eeed
Branch: refs/heads/master
Commit: 59e5eeed3abae778f0e969540b2e2e1d7086bbd8
Parents: 5e83cd7
Author: lidongsjtu <li...@apache.org>
Authored: Fri Aug 5 14:28:51 2016 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Aug 5 14:29:08 2016 +0800
----------------------------------------------------------------------
.../kylin/metadata/badquery/BadQueryEntry.java | 14 +-
.../badquery/BadQueryHistoryManager.java | 8 +-
.../badquery/BadQueryHistoryManagerTest.java | 12 +-
.../kylin/rest/service/BadQueryDetector.java | 181 +++++++++----------
.../apache/kylin/rest/service/QueryService.java | 3 +-
.../rest/service/BadQueryDetectorTest.java | 4 +-
6 files changed, 118 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
index c78215d..71ce24b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
@@ -41,8 +41,10 @@ public class BadQueryEntry extends RootPersistentEntity implements Comparable<Ba
private String server;
@JsonProperty("thread")
private String thread;
+ @JsonProperty("user")
+ private String user;
- public BadQueryEntry(String sql, String adj, long startTime, float runningSec, String server, String thread) {
+ public BadQueryEntry(String sql, String adj, long startTime, float runningSec, String server, String thread, String user) {
this.updateRandomUuid();
this.adj = adj;
this.sql = sql;
@@ -50,9 +52,19 @@ public class BadQueryEntry extends RootPersistentEntity implements Comparable<Ba
this.runningSec = runningSec;
this.server = server;
this.thread = thread;
+ this.user = user;
}
public BadQueryEntry() {
+
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
}
public float getRunningSec() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
index e89f6a1..86e282e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
@@ -116,12 +116,12 @@ public class BadQueryHistoryManager {
return badQueryHistory;
}
- public BadQueryHistory addEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String project) throws IOException {
- return addEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName), project);
+ public BadQueryHistory addEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String user, String project) throws IOException {
+ return addEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName, user), project);
}
- public BadQueryHistory updateEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String project) throws IOException {
- return updateEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName), project);
+ public BadQueryHistory updateEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String user, String project) throws IOException {
+ return updateEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName, user), project);
}
public void removeBadQueryHistory(String project) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
index 0e0df70..690e1fe 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
@@ -64,7 +64,7 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
public void testAddEntryToProject() throws IOException {
KylinConfig kylinConfig = getTestConfig();
BadQueryHistoryManager manager = BadQueryHistoryManager.getInstance(kylinConfig);
- BadQueryHistory history = manager.addEntryToProject("sql", 1459362239992L, "adj", 100, "server", "t-0", "default");
+ BadQueryHistory history = manager.addEntryToProject("sql", 1459362239992L, "adj", 100, "server", "t-0", "user", "default");
NavigableSet<BadQueryEntry> entries = history.getEntries();
assertEquals(3, entries.size());
@@ -75,10 +75,11 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
assertEquals("adj", newEntry.getAdj());
assertEquals(1459362239992L, newEntry.getStartTime());
assertEquals("server", newEntry.getServer());
+ assertEquals("user", newEntry.getUser());
assertEquals("t-0", newEntry.getThread());
for (int i = 0; i < kylinConfig.getBadQueryHistoryNum(); i++) {
- history = manager.addEntryToProject("sql", 1459362239993L + i, "adj", 100 + i, "server", "t-0", "default");
+ history = manager.addEntryToProject("sql", 1459362239993L + i, "adj", 100 + i, "server", "t-0", "user", "default");
}
assertEquals(kylinConfig.getBadQueryHistoryNum(), history.getEntries().size());
}
@@ -88,15 +89,16 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
KylinConfig kylinConfig = getTestConfig();
BadQueryHistoryManager manager = BadQueryHistoryManager.getInstance(kylinConfig);
- manager.addEntryToProject("sql", 1459362239000L, "adj", 100, "server", "t-0", "default");
- BadQueryHistory history = manager.updateEntryToProject("sql", 1459362239000L, "adj2", 120, "server2", "t-1", "default");
+ manager.addEntryToProject("sql", 1459362239000L, "adj", 100, "server", "t-0", "user", "default");
+ BadQueryHistory history = manager.updateEntryToProject("sql", 1459362239000L, "adj2", 120, "server2", "t-1", "user", "default");
NavigableSet<BadQueryEntry> entries = history.getEntries();
- BadQueryEntry newEntry = entries.floor(new BadQueryEntry("sql", "adj2", 1459362239000L, 120, "server2", "t-1"));
+ BadQueryEntry newEntry = entries.floor(new BadQueryEntry("sql", "adj2", 1459362239000L, 120, "server2", "t-1", "user"));
System.out.println(newEntry);
assertEquals("adj2", newEntry.getAdj());
assertEquals("server2", newEntry.getServer());
assertEquals("t-1", newEntry.getThread());
+ assertEquals("user", newEntry.getUser());
assertEquals(120, (int) newEntry.getRunningSec());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 79e1d4a..2b85e69 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -39,14 +39,13 @@ import com.google.common.collect.Maps;
public class BadQueryDetector extends Thread {
+ public static final int ONE_MB = 1024 * 1024;
private static final Logger logger = LoggerFactory.getLogger(BadQueryDetector.class);
-
private final ConcurrentMap<Thread, Entry> runningQueries = Maps.newConcurrentMap();
private final long detectionInterval;
private final int alertMB;
private final int alertRunningSec;
private KylinConfig kylinConfig;
-
private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
public BadQueryDetector() {
@@ -71,6 +70,20 @@ public class BadQueryDetector extends Thread {
initNotifiers();
}
+ public static long getSystemAvailBytes() {
+ Runtime runtime = Runtime.getRuntime();
+ long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+ long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+ long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+ long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+ long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+ return availableMemory;
+ }
+
+ public static int getSystemAvailMB() {
+ return (int) (getSystemAvailBytes() / ONE_MB);
+ }
+
private void initNotifiers() {
this.notifiers.add(new LoggerNotifier());
if (kylinConfig.getBadQueryPersistentEnabled()) {
@@ -82,97 +95,24 @@ public class BadQueryDetector extends Thread {
notifiers.add(notifier);
}
- private void notify(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
+ private void notify(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
for (Notifier notifier : notifiers) {
try {
- notifier.badQueryFound(adj, runningSec, startTime, project, sql, t);
+ notifier.badQueryFound(adj, runningSec, startTime, project, sql, user, t);
} catch (Exception e) {
logger.error("", e);
}
}
}
- public interface Notifier {
- void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t);
- }
-
- private class LoggerNotifier implements Notifier {
- @Override
- public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
- logger.info(adj + " query has been running " + runningSec + " seconds (project:" + project + ", thread: 0x" + Long.toHexString(t.getId()) + ") -- " + sql);
- }
- }
-
- private class PersistenceNotifier implements Notifier {
- BadQueryHistoryManager badQueryManager = BadQueryHistoryManager.getInstance(kylinConfig);
- String serverHostname;
- NavigableSet<Pair<Long, String>> cacheQueue = new TreeSet<>(new Comparator<Pair<Long, String>>() {
- @Override
- public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
- if (o1.equals(o2)) {
- return 0;
- } else if (o1.getFirst().equals(o2.getFirst())) {
- return o2.getSecond().compareTo(o2.getSecond());
- } else {
- return (int) (o1.getFirst() - o2.getFirst());
- }
- }
- });
-
- public PersistenceNotifier() {
- try {
- serverHostname = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- serverHostname = "Unknow";
- logger.warn("Error in get current hostname.", e);
- }
- }
-
- @Override
- public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
- try {
- long cachingSeconds = (kylinConfig.getBadQueryDefaultAlertingSeconds() + 1) * 30;
- Pair<Long, String> sqlPair = new Pair<>(startTime, sql);
- if (!cacheQueue.contains(sqlPair)) {
- badQueryManager.addEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project);
- cacheQueue.add(sqlPair);
- while (!cacheQueue.isEmpty() && (System.currentTimeMillis() - cacheQueue.first().getFirst() > cachingSeconds * 1000 || cacheQueue.size() > kylinConfig.getBadQueryHistoryNum() * 3)) {
- cacheQueue.pollFirst();
- }
- } else {
- badQueryManager.updateEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project);
- }
- } catch (IOException e) {
- logger.error("Error in bad query persistence.", e);
- }
- }
- }
-
- public void queryStart(Thread thread, SQLRequest sqlRequest) {
- runningQueries.put(thread, new Entry(sqlRequest, thread));
+ public void queryStart(Thread thread, SQLRequest sqlRequest, String user) {
+ runningQueries.put(thread, new Entry(sqlRequest, user, thread));
}
public void queryEnd(Thread thread) {
runningQueries.remove(thread);
}
- private class Entry implements Comparable<Entry> {
- final SQLRequest sqlRequest;
- final long startTime;
- final Thread thread;
-
- Entry(SQLRequest sqlRequest, Thread thread) {
- this.sqlRequest = sqlRequest;
- this.startTime = System.currentTimeMillis();
- this.thread = thread;
- }
-
- @Override
- public int compareTo(Entry o) {
- return (int) (this.startTime - o.startTime);
- }
- }
-
public void run() {
while (true) {
try {
@@ -199,7 +139,7 @@ public class BadQueryDetector extends Thread {
for (Entry e : entries) {
float runningSec = (float) (now - e.startTime) / 1000;
if (runningSec >= alertRunningSec) {
- notify("Slow", runningSec, e.startTime, e.sqlRequest.getProject(), e.sqlRequest.getSql(), e.thread);
+ notify("Slow", runningSec, e.startTime, e.sqlRequest.getProject(), e.sqlRequest.getSql(), e.user, e.thread);
dumpStackTrace(e.thread);
} else {
break; // entries are sorted by startTime
@@ -229,20 +169,79 @@ public class BadQueryDetector extends Thread {
logger.info(buf.toString());
}
- public static final int ONE_MB = 1024 * 1024;
+ public interface Notifier {
+ void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t);
+ }
- public static long getSystemAvailBytes() {
- Runtime runtime = Runtime.getRuntime();
- long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
- long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
- long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
- long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
- long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
- return availableMemory;
+ private class LoggerNotifier implements Notifier {
+ @Override
+ public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
+ logger.info("{} query has been running {} seconds (project:{}, thread: 0x{}, user:{}) -- {}", adj, runningSec, project, Long.toHexString(t.getId()), user, sql);
+ }
}
- public static int getSystemAvailMB() {
- return (int) (getSystemAvailBytes() / ONE_MB);
+ private class PersistenceNotifier implements Notifier {
+ BadQueryHistoryManager badQueryManager = BadQueryHistoryManager.getInstance(kylinConfig);
+ String serverHostname;
+ NavigableSet<Pair<Long, String>> cacheQueue = new TreeSet<>(new Comparator<Pair<Long, String>>() {
+ @Override
+ public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
+ if (o1.equals(o2)) {
+ return 0;
+ } else if (o1.getFirst().equals(o2.getFirst())) {
+ return o2.getSecond().compareTo(o2.getSecond());
+ } else {
+ return (int) (o1.getFirst() - o2.getFirst());
+ }
+ }
+ });
+
+ public PersistenceNotifier() {
+ try {
+ serverHostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ serverHostname = "Unknow";
+ logger.warn("Error in get current hostname.", e);
+ }
+ }
+
+ @Override
+ public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
+ try {
+ long cachingSeconds = (kylinConfig.getBadQueryDefaultAlertingSeconds() + 1) * 30;
+ Pair<Long, String> sqlPair = new Pair<>(startTime, sql);
+ if (!cacheQueue.contains(sqlPair)) {
+ badQueryManager.addEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), user, project);
+ cacheQueue.add(sqlPair);
+ while (!cacheQueue.isEmpty() && (System.currentTimeMillis() - cacheQueue.first().getFirst() > cachingSeconds * 1000 || cacheQueue.size() > kylinConfig.getBadQueryHistoryNum() * 3)) {
+ cacheQueue.pollFirst();
+ }
+ } else {
+ badQueryManager.updateEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), user, project);
+ }
+ } catch (IOException e) {
+ logger.error("Error in bad query persistence.", e);
+ }
+ }
+ }
+
+ private class Entry implements Comparable<Entry> {
+ final SQLRequest sqlRequest;
+ final long startTime;
+ final Thread thread;
+ final String user;
+
+ Entry(SQLRequest sqlRequest, String user, Thread thread) {
+ this.sqlRequest = sqlRequest;
+ this.startTime = System.currentTimeMillis();
+ this.thread = thread;
+ this.user = user;
+ }
+
+ @Override
+ public int compareTo(Entry o) {
+ return (int) (this.startTime - o.startTime);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 47f4149..6d778d0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -118,7 +118,8 @@ public class QueryService extends BasicService {
public SQLResponse query(SQLRequest sqlRequest) throws Exception {
try {
- badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+ final String user = SecurityContextHolder.getContext().getAuthentication().getName();
+ badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user);
return queryWithSqlMassage(sqlRequest);
http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index 9108990..7aabb0e 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -52,7 +52,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
@Override
- public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
+ public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
alerts.add(new String[] { adj, sql });
}
});
@@ -63,7 +63,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
SQLRequest sqlRequest = new SQLRequest();
sqlRequest.setSql(mockSql);
- badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+ badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, "user");
// make sure bad query check happens twice
Thread.sleep((alertRunningSec * 2 + 1) * 1000);