You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/05 06:42:03 UTC

kylin git commit: KYLIN-1941 Save user for slow queries

Repository: kylin
Updated Branches:
  refs/heads/master 5e83cd734 -> 59e5eeed3


KYLIN-1941 Save user for slow queries


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/59e5eeed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/59e5eeed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/59e5eeed

Branch: refs/heads/master
Commit: 59e5eeed3abae778f0e969540b2e2e1d7086bbd8
Parents: 5e83cd7
Author: lidongsjtu <li...@apache.org>
Authored: Fri Aug 5 14:28:51 2016 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Fri Aug 5 14:29:08 2016 +0800

----------------------------------------------------------------------
 .../kylin/metadata/badquery/BadQueryEntry.java  |  14 +-
 .../badquery/BadQueryHistoryManager.java        |   8 +-
 .../badquery/BadQueryHistoryManagerTest.java    |  12 +-
 .../kylin/rest/service/BadQueryDetector.java    | 181 +++++++++----------
 .../apache/kylin/rest/service/QueryService.java |   3 +-
 .../rest/service/BadQueryDetectorTest.java      |   4 +-
 6 files changed, 118 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
index c78215d..71ce24b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryEntry.java
@@ -41,8 +41,10 @@ public class BadQueryEntry extends RootPersistentEntity implements Comparable<Ba
     private String server;
     @JsonProperty("thread")
     private String thread;
+    @JsonProperty("user")
+    private String user;
 
-    public BadQueryEntry(String sql, String adj, long startTime, float runningSec, String server, String thread) {
+    public BadQueryEntry(String sql, String adj, long startTime, float runningSec, String server, String thread, String user) {
         this.updateRandomUuid();
         this.adj = adj;
         this.sql = sql;
@@ -50,9 +52,19 @@ public class BadQueryEntry extends RootPersistentEntity implements Comparable<Ba
         this.runningSec = runningSec;
         this.server = server;
         this.thread = thread;
+        this.user = user;
     }
 
     public BadQueryEntry() {
+
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
     }
 
     public float getRunningSec() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
index e89f6a1..86e282e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManager.java
@@ -116,12 +116,12 @@ public class BadQueryHistoryManager {
         return badQueryHistory;
     }
 
-    public BadQueryHistory addEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String project) throws IOException {
-        return addEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName), project);
+    public BadQueryHistory addEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String user, String project) throws IOException {
+        return addEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName, user), project);
     }
 
-    public BadQueryHistory updateEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String project) throws IOException {
-        return updateEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName), project);
+    public BadQueryHistory updateEntryToProject(String sql, long startTime, String adj, float runningSecs, String server, String threadName, String user, String project) throws IOException {
+        return updateEntryToProject(new BadQueryEntry(sql, adj, startTime, runningSecs, server, threadName, user), project);
     }
 
     public void removeBadQueryHistory(String project) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
index 0e0df70..690e1fe 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/badquery/BadQueryHistoryManagerTest.java
@@ -64,7 +64,7 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
     public void testAddEntryToProject() throws IOException {
         KylinConfig kylinConfig = getTestConfig();
         BadQueryHistoryManager manager = BadQueryHistoryManager.getInstance(kylinConfig);
-        BadQueryHistory history = manager.addEntryToProject("sql", 1459362239992L, "adj", 100, "server", "t-0", "default");
+        BadQueryHistory history = manager.addEntryToProject("sql", 1459362239992L, "adj", 100, "server", "t-0", "user", "default");
         NavigableSet<BadQueryEntry> entries = history.getEntries();
         assertEquals(3, entries.size());
 
@@ -75,10 +75,11 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
         assertEquals("adj", newEntry.getAdj());
         assertEquals(1459362239992L, newEntry.getStartTime());
         assertEquals("server", newEntry.getServer());
+        assertEquals("user", newEntry.getUser());
         assertEquals("t-0", newEntry.getThread());
 
         for (int i = 0; i < kylinConfig.getBadQueryHistoryNum(); i++) {
-            history = manager.addEntryToProject("sql", 1459362239993L + i, "adj", 100 + i, "server", "t-0", "default");
+            history = manager.addEntryToProject("sql", 1459362239993L + i, "adj", 100 + i, "server", "t-0", "user", "default");
         }
         assertEquals(kylinConfig.getBadQueryHistoryNum(), history.getEntries().size());
     }
@@ -88,15 +89,16 @@ public class BadQueryHistoryManagerTest extends LocalFileMetadataTestCase {
         KylinConfig kylinConfig = getTestConfig();
         BadQueryHistoryManager manager = BadQueryHistoryManager.getInstance(kylinConfig);
 
-        manager.addEntryToProject("sql", 1459362239000L, "adj", 100, "server", "t-0", "default");
-        BadQueryHistory history = manager.updateEntryToProject("sql", 1459362239000L, "adj2", 120, "server2", "t-1", "default");
+        manager.addEntryToProject("sql", 1459362239000L, "adj", 100, "server", "t-0", "user", "default");
+        BadQueryHistory history = manager.updateEntryToProject("sql", 1459362239000L, "adj2", 120, "server2", "t-1", "user", "default");
 
         NavigableSet<BadQueryEntry> entries = history.getEntries();
-        BadQueryEntry newEntry = entries.floor(new BadQueryEntry("sql", "adj2", 1459362239000L, 120, "server2", "t-1"));
+        BadQueryEntry newEntry = entries.floor(new BadQueryEntry("sql", "adj2", 1459362239000L, 120, "server2", "t-1", "user"));
         System.out.println(newEntry);
         assertEquals("adj2", newEntry.getAdj());
         assertEquals("server2", newEntry.getServer());
         assertEquals("t-1", newEntry.getThread());
+        assertEquals("user", newEntry.getUser());
         assertEquals(120, (int) newEntry.getRunningSec());
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 79e1d4a..2b85e69 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -39,14 +39,13 @@ import com.google.common.collect.Maps;
 
 public class BadQueryDetector extends Thread {
 
+    public static final int ONE_MB = 1024 * 1024;
     private static final Logger logger = LoggerFactory.getLogger(BadQueryDetector.class);
-
     private final ConcurrentMap<Thread, Entry> runningQueries = Maps.newConcurrentMap();
     private final long detectionInterval;
     private final int alertMB;
     private final int alertRunningSec;
     private KylinConfig kylinConfig;
-
     private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
 
     public BadQueryDetector() {
@@ -71,6 +70,20 @@ public class BadQueryDetector extends Thread {
         initNotifiers();
     }
 
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
     private void initNotifiers() {
         this.notifiers.add(new LoggerNotifier());
         if (kylinConfig.getBadQueryPersistentEnabled()) {
@@ -82,97 +95,24 @@ public class BadQueryDetector extends Thread {
         notifiers.add(notifier);
     }
 
-    private void notify(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
+    private void notify(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
         for (Notifier notifier : notifiers) {
             try {
-                notifier.badQueryFound(adj, runningSec, startTime, project, sql, t);
+                notifier.badQueryFound(adj, runningSec, startTime, project, sql, user, t);
             } catch (Exception e) {
                 logger.error("", e);
             }
         }
     }
 
-    public interface Notifier {
-        void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t);
-    }
-
-    private class LoggerNotifier implements Notifier {
-        @Override
-        public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
-            logger.info(adj + " query has been running " + runningSec + " seconds (project:" + project + ", thread: 0x" + Long.toHexString(t.getId()) + ") -- " + sql);
-        }
-    }
-
-    private class PersistenceNotifier implements Notifier {
-        BadQueryHistoryManager badQueryManager = BadQueryHistoryManager.getInstance(kylinConfig);
-        String serverHostname;
-        NavigableSet<Pair<Long, String>> cacheQueue = new TreeSet<>(new Comparator<Pair<Long, String>>() {
-            @Override
-            public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
-                if (o1.equals(o2)) {
-                    return 0;
-                } else if (o1.getFirst().equals(o2.getFirst())) {
-                    return o2.getSecond().compareTo(o2.getSecond());
-                } else {
-                    return (int) (o1.getFirst() - o2.getFirst());
-                }
-            }
-        });
-
-        public PersistenceNotifier() {
-            try {
-                serverHostname = InetAddress.getLocalHost().getHostName();
-            } catch (UnknownHostException e) {
-                serverHostname = "Unknow";
-                logger.warn("Error in get current hostname.", e);
-            }
-        }
-
-        @Override
-        public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
-            try {
-                long cachingSeconds = (kylinConfig.getBadQueryDefaultAlertingSeconds() + 1) * 30;
-                Pair<Long, String> sqlPair = new Pair<>(startTime, sql);
-                if (!cacheQueue.contains(sqlPair)) {
-                    badQueryManager.addEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project);
-                    cacheQueue.add(sqlPair);
-                    while (!cacheQueue.isEmpty() && (System.currentTimeMillis() - cacheQueue.first().getFirst() > cachingSeconds * 1000 || cacheQueue.size() > kylinConfig.getBadQueryHistoryNum() * 3)) {
-                        cacheQueue.pollFirst();
-                    }
-                } else {
-                    badQueryManager.updateEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), project);
-                }
-            } catch (IOException e) {
-                logger.error("Error in bad query persistence.", e);
-            }
-        }
-    }
-
-    public void queryStart(Thread thread, SQLRequest sqlRequest) {
-        runningQueries.put(thread, new Entry(sqlRequest, thread));
+    public void queryStart(Thread thread, SQLRequest sqlRequest, String user) {
+        runningQueries.put(thread, new Entry(sqlRequest, user, thread));
     }
 
     public void queryEnd(Thread thread) {
         runningQueries.remove(thread);
     }
 
-    private class Entry implements Comparable<Entry> {
-        final SQLRequest sqlRequest;
-        final long startTime;
-        final Thread thread;
-
-        Entry(SQLRequest sqlRequest, Thread thread) {
-            this.sqlRequest = sqlRequest;
-            this.startTime = System.currentTimeMillis();
-            this.thread = thread;
-        }
-
-        @Override
-        public int compareTo(Entry o) {
-            return (int) (this.startTime - o.startTime);
-        }
-    }
-
     public void run() {
         while (true) {
             try {
@@ -199,7 +139,7 @@ public class BadQueryDetector extends Thread {
         for (Entry e : entries) {
             float runningSec = (float) (now - e.startTime) / 1000;
             if (runningSec >= alertRunningSec) {
-                notify("Slow", runningSec, e.startTime, e.sqlRequest.getProject(), e.sqlRequest.getSql(), e.thread);
+                notify("Slow", runningSec, e.startTime, e.sqlRequest.getProject(), e.sqlRequest.getSql(), e.user, e.thread);
                 dumpStackTrace(e.thread);
             } else {
                 break; // entries are sorted by startTime
@@ -229,20 +169,79 @@ public class BadQueryDetector extends Thread {
         logger.info(buf.toString());
     }
 
-    public static final int ONE_MB = 1024 * 1024;
+    public interface Notifier {
+        void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t);
+    }
 
-    public static long getSystemAvailBytes() {
-        Runtime runtime = Runtime.getRuntime();
-        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
-        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
-        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
-        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
-        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
-        return availableMemory;
+    private class LoggerNotifier implements Notifier {
+        @Override
+        public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
+            logger.info("{} query has been running {} seconds (project:{}, thread: 0x{}, user:{}) -- {}", adj, runningSec, project, Long.toHexString(t.getId()), user, sql);
+        }
     }
 
-    public static int getSystemAvailMB() {
-        return (int) (getSystemAvailBytes() / ONE_MB);
+    private class PersistenceNotifier implements Notifier {
+        BadQueryHistoryManager badQueryManager = BadQueryHistoryManager.getInstance(kylinConfig);
+        String serverHostname;
+        NavigableSet<Pair<Long, String>> cacheQueue = new TreeSet<>(new Comparator<Pair<Long, String>>() {
+            @Override
+            public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
+                if (o1.equals(o2)) {
+                    return 0;
+                } else if (o1.getFirst().equals(o2.getFirst())) {
+                    return o2.getSecond().compareTo(o2.getSecond());
+                } else {
+                    return (int) (o1.getFirst() - o2.getFirst());
+                }
+            }
+        });
+
+        public PersistenceNotifier() {
+            try {
+                serverHostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                serverHostname = "Unknow";
+                logger.warn("Error in get current hostname.", e);
+            }
+        }
+
+        @Override
+        public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
+            try {
+                long cachingSeconds = (kylinConfig.getBadQueryDefaultAlertingSeconds() + 1) * 30;
+                Pair<Long, String> sqlPair = new Pair<>(startTime, sql);
+                if (!cacheQueue.contains(sqlPair)) {
+                    badQueryManager.addEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), user, project);
+                    cacheQueue.add(sqlPair);
+                    while (!cacheQueue.isEmpty() && (System.currentTimeMillis() - cacheQueue.first().getFirst() > cachingSeconds * 1000 || cacheQueue.size() > kylinConfig.getBadQueryHistoryNum() * 3)) {
+                        cacheQueue.pollFirst();
+                    }
+                } else {
+                    badQueryManager.updateEntryToProject(sql, startTime, adj, runningSec, serverHostname, t.getName(), user, project);
+                }
+            } catch (IOException e) {
+                logger.error("Error in bad query persistence.", e);
+            }
+        }
+    }
+
+    private class Entry implements Comparable<Entry> {
+        final SQLRequest sqlRequest;
+        final long startTime;
+        final Thread thread;
+        final String user;
+
+        Entry(SQLRequest sqlRequest, String user, Thread thread) {
+            this.sqlRequest = sqlRequest;
+            this.startTime = System.currentTimeMillis();
+            this.thread = thread;
+            this.user = user;
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            return (int) (this.startTime - o.startTime);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 47f4149..6d778d0 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -118,7 +118,8 @@ public class QueryService extends BasicService {
 
     public SQLResponse query(SQLRequest sqlRequest) throws Exception {
         try {
-            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+            final String user = SecurityContextHolder.getContext().getAuthentication().getName();
+            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user);
 
             return queryWithSqlMassage(sqlRequest);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/59e5eeed/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
index 9108990..7aabb0e 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -52,7 +52,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
         BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec);
         badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
             @Override
-            public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, Thread t) {
+            public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) {
                 alerts.add(new String[] { adj, sql });
             }
         });
@@ -63,7 +63,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase {
 
             SQLRequest sqlRequest = new SQLRequest();
             sqlRequest.setSql(mockSql);
-            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, "user");
 
             // make sure bad query check happens twice
             Thread.sleep((alertRunningSec * 2 + 1) * 1000);