You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/03 03:31:49 UTC

[34/50] [abbrv] incubator-kylin git commit: KYLIN-967 Add BadQueryDetector to log bad queries

KYLIN-967 Add BadQueryDetector to log bad queries


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/294a3eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/294a3eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/294a3eba

Branch: refs/heads/0.7
Commit: 294a3ebabb9f5a81cd73c9aa977f1cb272407b29
Parents: 9fa3584
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Aug 28 10:52:16 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Aug 28 10:52:45 2015 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/QueryController.java  |  47 ++----
 .../kylin/rest/service/BadQueryDetector.java    | 157 +++++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java |  72 +++++----
 .../org/apache/kylin/rest/util/QueryUtil.java   |  23 ++-
 .../rest/service/BadQueryDetectorTest.java      |  63 ++++++++
 .../kylin/rest/service/QueryServiceTest.java    |   3 +-
 .../apache/kylin/storage/StorageContext.java    |  11 +-
 .../storage/hbase/CubeSegmentTupleIterator.java |  15 +-
 .../hbase/SerializedHBaseTupleIterator.java     |   1 -
 9 files changed, 309 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 1b34616..5ac8296 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -21,9 +21,7 @@ package org.apache.kylin.rest.controller;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
-import java.util.Map;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -33,7 +31,6 @@ import net.sf.ehcache.Element;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -84,14 +81,14 @@ public class QueryController extends BasicController {
     @RequestMapping(value = "/query", method = RequestMethod.POST)
     @ResponseBody
     public SQLResponse query(@RequestBody SQLRequest sqlRequest) {
-        return doQuery(sqlRequest);
+        return doQueryWithCache(sqlRequest);
     }
 
     // TODO should be just "prepare" a statement, get back expected ResultSetMetaData
     @RequestMapping(value = "/query/prestate", method = RequestMethod.POST, produces = "application/json")
     @ResponseBody
     public SQLResponse prepareQuery(@RequestBody PrepareSqlRequest sqlRequest) {
-        return doQuery(sqlRequest);
+        return doQueryWithCache(sqlRequest);
     }
 
     @RequestMapping(value = "/saved_queries", method = RequestMethod.POST)
@@ -120,7 +117,7 @@ public class QueryController extends BasicController {
     @RequestMapping(value = "/query/format/{format}", method = RequestMethod.GET)
     @ResponseBody
     public void downloadQueryResult(@PathVariable String format, SQLRequest sqlRequest, HttpServletResponse response) {
-        SQLResponse result = doQuery(sqlRequest);
+        SQLResponse result = doQueryWithCache(sqlRequest);
         response.setContentType("text/" + format + ";charset=utf-8");
         response.setHeader("Content-Disposition", "attachment; filename=\"result." + format + "\"");
         ICsvListWriter csvWriter = null;
@@ -158,19 +155,7 @@ public class QueryController extends BasicController {
         }
     }
 
-    private SQLResponse doQuery(SQLRequest sqlRequest) {
-        initDebugToggles(sqlRequest);
-
-        long startTimestamp = System.currentTimeMillis();
-        SQLResponse response = doQueryInternal(sqlRequest);
-        response.setDuration(System.currentTimeMillis() - startTimestamp);
-        queryService.logQuery(sqlRequest, response, new Date(startTimestamp), new Date(System.currentTimeMillis()));
-
-        cleanupDebugToggles();
-        return response;
-    }
-
-    private SQLResponse doQueryInternal(SQLRequest sqlRequest) {
+    private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
         String sql = sqlRequest.getSql();
         String project = sqlRequest.getProject();
         logger.info("Using project: " + project);
@@ -189,13 +174,11 @@ public class QueryController extends BasicController {
         SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
         try {
             if (null == sqlResponse) {
-                long start = System.currentTimeMillis();
                 sqlResponse = queryService.query(sqlRequest);
-                long duration = System.currentTimeMillis() - start;
 
                 long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
                 long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
-                if (!sqlResponse.getIsException() && (duration > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) {
+                if (!sqlResponse.getIsException() && (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) {
                     cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse));
                 }
             }
@@ -215,10 +198,12 @@ public class QueryController extends BasicController {
             }
         }
 
+        queryService.logQuery(sqlRequest, sqlResponse);
+        
         if (sqlResponse.getIsException())
             throw new InternalErrorException(sqlResponse.getExceptionMessage());
-        else
-            return sqlResponse;
+        
+        return sqlResponse;
     }
 
     private SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
@@ -254,18 +239,4 @@ public class QueryController extends BasicController {
         this.cacheManager = cacheManager;
     }
 
-    private void initDebugToggles(SQLRequest sqlRequest) {
-
-        Map<String, String> toggles = sqlRequest.getBackdoorToggles();
-        if (toggles == null || toggles.size() == 0) {
-            return;
-        }
-
-        BackdoorToggles.setToggles(toggles);
-    }
-
-    private void cleanupDebugToggles() {
-        BackdoorToggles.cleanToggles();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
new file mode 100644
index 0000000..32410f4
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.rest.request.SQLRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class BadQueryDetector extends Thread {
+
+    private static final Logger logger = LoggerFactory.getLogger(BadQueryDetector.class);
+
+    private final ConcurrentMap<Thread, Entry> runningQueries = Maps.newConcurrentMap();
+    private final long detectionInterval;
+    private final int alertMB;
+    private final int alertRunningSec;
+
+    private ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
+
+    public BadQueryDetector() {
+        this(60 * 1000, 100, 60); // 1 minute, 100 MB, 60 seconds
+    }
+
+    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+        super("BadQueryDetector");
+        this.setDaemon(true);
+        this.detectionInterval = detectionInterval;
+        this.alertMB = alertMB;
+        this.alertRunningSec = alertRunningSec;
+
+        this.notifiers.add(new Notifier() {
+            @Override
+            public void badQueryFound(String adj, int runningSec, String sql) {
+                logger.info(adj + " query has been running " + runningSec + " seconds -- " + sql);
+            }
+        });
+    }
+
+    public void registerNotifier(Notifier notifier) {
+        notifiers.add(notifier);
+    }
+
+    private void notify(String adj, int runningSec, String sql) {
+        for (Notifier notifier : notifiers) {
+            try {
+                notifier.badQueryFound(adj, runningSec, sql);
+            } catch (Exception e) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    public interface Notifier {
+        void badQueryFound(String adj, int runningSec, String sql);
+    }
+    
+    public void queryStart(Thread thread, SQLRequest sqlRequest) {
+        runningQueries.put(thread, new Entry(sqlRequest));
+    }
+
+    public void queryEnd(Thread thread) {
+        Entry e = runningQueries.remove(thread);
+
+        if (e != null) {
+            int runningSec = (int) ((System.currentTimeMillis() - e.startTime) / 1000);
+            if (runningSec >= alertRunningSec) {
+                notify("Slow", runningSec, e.sqlRequest.getSql());
+            }
+        }
+    }
+
+    private class Entry implements Comparable<Entry> {
+        final SQLRequest sqlRequest;
+        final long startTime;
+
+        Entry(SQLRequest sqlRequest) {
+            this.sqlRequest = sqlRequest;
+            this.startTime = System.currentTimeMillis();
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            return Long.compare(this.startTime, o.startTime); // orders by start time; avoids the wrap-around of casting a long difference to int
+        }
+    }
+
+    public void run() {
+        while (true) {
+            try {
+                Thread.sleep(detectionInterval);
+            } catch (InterruptedException e) {
+                // stop detection and exit
+                return;
+            }
+
+            try {
+                detectBadQuery();
+            } catch (Exception ex) {
+                logger.error("", ex);
+            }
+        }
+    }
+
+    private void detectBadQuery() {
+        if (getSystemAvailMB() < alertMB) {
+            ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
+            Collections.sort(entries);
+
+            logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
+            long now = System.currentTimeMillis();
+
+            for (int i = 0; i < entries.size(); i++) {
+                Entry e = entries.get(i);
+                notify("Low mem", (int) ((now - e.startTime) / 1000), e.sqlRequest.getSql());
+            }
+        }
+    }
+
+    public static final int ONE_MB = 1024 * 1024;
+
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 6940106..6e6d9da 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -30,7 +30,6 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
@@ -80,13 +80,16 @@ public class QueryService extends BasicService {
     private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
 
     public static final String USER_QUERY_FAMILY = "q";
-    private Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
     private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
     private static final String USER_TABLE_NAME = "_user";
     private static final String USER_QUERY_COLUMN = "c";
-    private String hbaseUrl = null;
-    private String tableNameBase = null;
-    private String userTableName = null;
+
+    private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
+    private final BadQueryDetector badQueryDetector = new BadQueryDetector();
+
+    private final String hbaseUrl;
+    private final String tableNameBase;
+    private final String userTableName;
 
     public QueryService() {
         String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl();
@@ -95,6 +98,8 @@ public class QueryService extends BasicService {
         tableNameBase = cut < 0 ? DEFAULT_TABLE_PREFIX : metadataUrl.substring(0, cut);
         hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
         userTableName = tableNameBase + USER_TABLE_NAME;
+        
+        badQueryDetector.start();
     }
 
     public List<TableMeta> getMetadata(String project) throws SQLException {
@@ -102,18 +107,14 @@ public class QueryService extends BasicService {
     }
 
     public SQLResponse query(SQLRequest sqlRequest) throws Exception {
-        SQLResponse fakeResponse = QueryUtil.tableauIntercept(sqlRequest.getSql());
-        if (null != fakeResponse) {
-            logger.debug("Return fake response, is exception? " + fakeResponse.getIsException());
-
-            return fakeResponse;
+        try {
+            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+            
+            return queryWithSqlMassage(sqlRequest);
+            
+        } finally {
+            badQueryDetector.queryEnd(Thread.currentThread());
         }
-
-        String correctedSql = QueryUtil.healSickSql(sqlRequest.getSql());
-        if (correctedSql.equals(sqlRequest.getSql()) == false)
-            logger.debug("The corrected query: " + correctedSql);
-
-        return executeQuery(correctedSql, sqlRequest);
     }
 
     public void saveQuery(final String creator, final Query query) throws IOException {
@@ -192,12 +193,11 @@ public class QueryService extends BasicService {
         return queries;
     }
 
-    public void logQuery(final SQLRequest request, final SQLResponse response, final Date startTime, final Date endTime) {
+    public void logQuery(final SQLRequest request, final SQLResponse response) {
         final String user = SecurityContextHolder.getContext().getAuthentication().getName();
         final Set<String> realizationNames = new HashSet<String>();
         final Set<Long> cuboidIds = new HashSet<Long>();
-        long totalScanCount = 0;
-        float duration = (endTime.getTime() - startTime.getTime()) / (float) 1000;
+        float duration = response.getDuration() / (float) 1000;
 
         if (!response.isHitCache() && null != OLAPContext.getThreadLocalContexts()) {
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
@@ -211,8 +211,6 @@ public class QueryService extends BasicService {
                     String realizationName = ctx.realization.getName();
                     realizationNames.add(realizationName);
                 }
-
-                totalScanCount += ctx.storageContext.getTotalScanCount();
             }
         }
 
@@ -232,7 +230,7 @@ public class QueryService extends BasicService {
         stringBuilder.append("Project: ").append(request.getProject()).append(newLine);
         stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine);
         stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine);
-        stringBuilder.append("Total scan count: ").append(totalScanCount).append(newLine);
+        stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine);
         stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
         stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
         stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
@@ -247,25 +245,30 @@ public class QueryService extends BasicService {
     public void checkAuthorization(CubeInstance cube) throws AccessDeniedException {
     }
 
-    protected SQLResponse executeQuery(String sql, SQLRequest sqlRequest) throws Exception {
-        sql = sql.trim().replace(";", "");
-
-        int limit = sqlRequest.getLimit();
-        if (limit > 0 && !sql.toLowerCase().contains("limit")) {
-            sql += (" LIMIT " + limit);
+    private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
+        SQLResponse fakeResponse = QueryUtil.tableauIntercept(sqlRequest.getSql());
+        if (null != fakeResponse) {
+            logger.debug("Return fake response, is exception? " + fakeResponse.getIsException());
+            return fakeResponse;
         }
 
-        int offset = sqlRequest.getOffset();
-        if (offset > 0 && !sql.toLowerCase().contains("offset")) {
-            sql += (" OFFSET " + offset);
-        }
+        String correctedSql = QueryUtil.massageSql(sqlRequest);
+        if (correctedSql.equals(sqlRequest.getSql()) == false)
+            logger.debug("The corrected query: " + correctedSql);
 
         // add extra parameters into olap context, like acceptPartial
         Map<String, String> parameters = new HashMap<String, String>();
         parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial()));
         OLAPContext.setParameters(parameters);
 
-        return execute(sql, sqlRequest);
+        try {
+            BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
+
+            return execute(correctedSql, sqlRequest);
+
+        } finally {
+            BackdoorToggles.cleanToggles();
+        }
     }
 
     protected List<TableMeta> getMetadata(CubeManager cubeMgr, String project, boolean cubedOnly) throws SQLException {
@@ -329,6 +332,8 @@ public class QueryService extends BasicService {
         Connection conn = null;
         Statement stat = null;
         ResultSet resultSet = null;
+        long startTime = System.currentTimeMillis();
+
         List<List<String>> results = new LinkedList<List<String>>();
         List<SelectedColumnMeta> columnMetas = new LinkedList<SelectedColumnMeta>();
 
@@ -384,6 +389,7 @@ public class QueryService extends BasicService {
 
         SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult);
         response.setTotalScanCount(totalScanCount);
+        response.setDuration(System.currentTimeMillis() - startTime);
 
         return response;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java b/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
index 7869f4d..16f6633 100644
--- a/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
+++ b/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
@@ -27,6 +27,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.kylin.rest.model.SelectedColumnMeta;
+import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 
 /**
@@ -97,8 +98,28 @@ public class QueryUtil {
         }
     }
 
+    public static String massageSql(SQLRequest sqlRequest) {
+        String sql = sqlRequest.getSql();
+        sql = sql.trim();
+        
+        while (sql.endsWith(";"))
+            sql = sql.substring(0, sql.length() - 1);
+
+        int limit = sqlRequest.getLimit();
+        if (limit > 0 && !sql.toLowerCase().contains("limit")) {
+            sql += (" LIMIT " + limit);
+        }
+
+        int offset = sqlRequest.getOffset();
+        if (offset > 0 && !sql.toLowerCase().contains("offset")) {
+            sql += (" OFFSET " + offset);
+        }
+        
+        return healSickSql(sql);
+    }
+
     // correct sick / invalid SQL
-    public static String healSickSql(String sql) {
+    private static String healSickSql(String sql) {
         Matcher m;
 
         // Case fn{ EXTRACT(...) }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
new file mode 100644
index 0000000..b38ee9d
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.rest.request.SQLRequest;
+import org.junit.Test;
+
+public class BadQueryDetectorTest {
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void test() throws InterruptedException {
+        int alertMB = BadQueryDetector.getSystemAvailMB() * 2;
+        int alertRunningSec = 5;
+        String mockSql = "select * from just_a_test";
+        final ArrayList<String[]> alerts = new ArrayList<>();
+
+        BadQueryDetector badQueryDetector = new BadQueryDetector(5000, alertMB, alertRunningSec);
+        badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
+            @Override
+            public void badQueryFound(String adj, int runningSec, String sql) {
+                alerts.add(new String[] { adj, sql });
+            }
+        });
+        badQueryDetector.start();
+
+        {
+            SQLRequest sqlRequest = new SQLRequest();
+            sqlRequest.setSql(mockSql);
+            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+
+            Thread.sleep(alertRunningSec * 2 * 1000);
+
+            badQueryDetector.queryEnd(Thread.currentThread());
+        }
+
+        badQueryDetector.stop();
+
+        assertEquals(2, alerts.size());
+        assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(0));
+        assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index d2d7b22..1b1932c 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Date;
 
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -58,6 +57,6 @@ public class QueryServiceTest extends ServiceTestBase {
         request.setAcceptPartial(true);
         SQLResponse response = new SQLResponse();
         response.setHitCache(true);
-        queryService.logQuery(request, response, new Date(), new Date());
+        queryService.logQuery(request, response);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 9cebf3c..9f5011e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -53,14 +54,14 @@ public class StorageContext {
     private boolean enableLimit;
     private boolean enableCoprocessor;
 
-    private long totalScanCount;
+    private AtomicLong totalScanCount;
     private Cuboid cuboid;
     private boolean partialResultReturned;
 
     public StorageContext() {
         this.threshold = DEFAULT_THRESHOLD;
         this.limit = DEFAULT_THRESHOLD;
-        this.totalScanCount = 0;
+        this.totalScanCount = new AtomicLong();
         this.cuboid = null;
         this.aliasMap = HashBiMap.create();
         this.hasSort = false;
@@ -151,11 +152,11 @@ public class StorageContext {
     }
 
     public long getTotalScanCount() {
-        return totalScanCount;
+        return totalScanCount.get();
     }
 
-    public void setTotalScanCount(long totalScanCount) {
-        this.totalScanCount = totalScanCount;
+    public long increaseTotalScanCount(long count) {
+        return this.totalScanCount.addAndGet(count);
     }
 
     public boolean isAcceptPartialResult() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index c68fcdd..9efbb79 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -92,6 +92,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private TupleInfo tupleInfo;
     private Tuple tuple;
     private int scanCount;
+    private int scanCountDelta;
 
     public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
         this.cube = cubeSeg.getCubeInstance();
@@ -103,7 +104,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.context = context;
         this.tableName = cubeSeg.getStorageLocationIdentifier();
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
-        this.scanCount = 0;
 
         try {
             this.table = conn.getTable(tableName);
@@ -121,6 +121,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     }
 
     private void closeScanner() {
+        flushScanCountDelta();
+        
         if (logger.isDebugEnabled() && scan != null) {
             logger.debug("Scan " + scan.toString());
             byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
@@ -162,6 +164,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             if (resultIterator.hasNext()) {
                 result = this.resultIterator.next();
                 scanCount++;
+                if (++scanCountDelta >= 1000)
+                    flushScanCountDelta();
                 break;
             } else {
                 scanNextRange();
@@ -179,6 +183,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return this.tuple;
     }
 
+    private void flushScanCountDelta() {
+        context.increaseTotalScanCount(scanCountDelta);
+        scanCountDelta = 0;
+    }
+
     @Override
     public void remove() {
         throw new UnsupportedOperationException();
@@ -277,8 +286,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     private List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
         List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
-        for (org.apache.kylin.common.util.Pair pair : pairList) {
-            org.apache.hadoop.hbase.util.Pair element = new org.apache.hadoop.hbase.util.Pair(pair.getFirst(), pair.getSecond());
+        for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
+            org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
             result.add(element);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index e6b0258..a204d62 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -128,7 +128,6 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
 
     @Override
     public void close() {
-        context.setTotalScanCount(scanCount);
         segmentIterator.close();
     }
 }