You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/03 03:31:49 UTC
[34/50] [abbrv] incubator-kylin git commit: KYLIN-967 Add
BadQueryDetector to log bad queries
KYLIN-967 Add BadQueryDetector to log bad queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/294a3eba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/294a3eba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/294a3eba
Branch: refs/heads/0.7
Commit: 294a3ebabb9f5a81cd73c9aa977f1cb272407b29
Parents: 9fa3584
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Aug 28 10:52:16 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Aug 28 10:52:45 2015 +0800
----------------------------------------------------------------------
.../kylin/rest/controller/QueryController.java | 47 ++----
.../kylin/rest/service/BadQueryDetector.java | 157 +++++++++++++++++++
.../apache/kylin/rest/service/QueryService.java | 72 +++++----
.../org/apache/kylin/rest/util/QueryUtil.java | 23 ++-
.../rest/service/BadQueryDetectorTest.java | 63 ++++++++
.../kylin/rest/service/QueryServiceTest.java | 3 +-
.../apache/kylin/storage/StorageContext.java | 11 +-
.../storage/hbase/CubeSegmentTupleIterator.java | 15 +-
.../hbase/SerializedHBaseTupleIterator.java | 1 -
9 files changed, 309 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 1b34616..5ac8296 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -21,9 +21,7 @@ package org.apache.kylin.rest.controller;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
-import java.util.Map;
import javax.servlet.http.HttpServletResponse;
@@ -33,7 +31,6 @@ import net.sf.ehcache.Element;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -84,14 +81,14 @@ public class QueryController extends BasicController {
@RequestMapping(value = "/query", method = RequestMethod.POST)
@ResponseBody
public SQLResponse query(@RequestBody SQLRequest sqlRequest) {
- return doQuery(sqlRequest);
+ return doQueryWithCache(sqlRequest);
}
// TODO should be just "prepare" a statement, get back expected ResultSetMetaData
@RequestMapping(value = "/query/prestate", method = RequestMethod.POST, produces = "application/json")
@ResponseBody
public SQLResponse prepareQuery(@RequestBody PrepareSqlRequest sqlRequest) {
- return doQuery(sqlRequest);
+ return doQueryWithCache(sqlRequest);
}
@RequestMapping(value = "/saved_queries", method = RequestMethod.POST)
@@ -120,7 +117,7 @@ public class QueryController extends BasicController {
@RequestMapping(value = "/query/format/{format}", method = RequestMethod.GET)
@ResponseBody
public void downloadQueryResult(@PathVariable String format, SQLRequest sqlRequest, HttpServletResponse response) {
- SQLResponse result = doQuery(sqlRequest);
+ SQLResponse result = doQueryWithCache(sqlRequest);
response.setContentType("text/" + format + ";charset=utf-8");
response.setHeader("Content-Disposition", "attachment; filename=\"result." + format + "\"");
ICsvListWriter csvWriter = null;
@@ -158,19 +155,7 @@ public class QueryController extends BasicController {
}
}
- private SQLResponse doQuery(SQLRequest sqlRequest) {
- initDebugToggles(sqlRequest);
-
- long startTimestamp = System.currentTimeMillis();
- SQLResponse response = doQueryInternal(sqlRequest);
- response.setDuration(System.currentTimeMillis() - startTimestamp);
- queryService.logQuery(sqlRequest, response, new Date(startTimestamp), new Date(System.currentTimeMillis()));
-
- cleanupDebugToggles();
- return response;
- }
-
- private SQLResponse doQueryInternal(SQLRequest sqlRequest) {
+ private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
String sql = sqlRequest.getSql();
String project = sqlRequest.getProject();
logger.info("Using project: " + project);
@@ -189,13 +174,11 @@ public class QueryController extends BasicController {
SQLResponse sqlResponse = searchQueryInCache(sqlRequest);
try {
if (null == sqlResponse) {
- long start = System.currentTimeMillis();
sqlResponse = queryService.query(sqlRequest);
- long duration = System.currentTimeMillis() - start;
long durationThreshold = KylinConfig.getInstanceFromEnv().getQueryDurationCacheThreshold();
long scancountThreshold = KylinConfig.getInstanceFromEnv().getQueryScanCountCacheThreshold();
- if (!sqlResponse.getIsException() && (duration > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) {
+ if (!sqlResponse.getIsException() && (sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scancountThreshold)) {
cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse));
}
}
@@ -215,10 +198,12 @@ public class QueryController extends BasicController {
}
}
+ queryService.logQuery(sqlRequest, sqlResponse);
+
if (sqlResponse.getIsException())
throw new InternalErrorException(sqlResponse.getExceptionMessage());
- else
- return sqlResponse;
+
+ return sqlResponse;
}
private SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
@@ -254,18 +239,4 @@ public class QueryController extends BasicController {
this.cacheManager = cacheManager;
}
- private void initDebugToggles(SQLRequest sqlRequest) {
-
- Map<String, String> toggles = sqlRequest.getBackdoorToggles();
- if (toggles == null || toggles.size() == 0) {
- return;
- }
-
- BackdoorToggles.setToggles(toggles);
- }
-
- private void cleanupDebugToggles() {
- BackdoorToggles.cleanToggles();
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
new file mode 100644
index 0000000..32410f4
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kylin.rest.request.SQLRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Daemon thread that keeps track of the queries currently being executed and reports
+ * "bad" ones to the registered {@link Notifier}s.
+ * <p>
+ * Two kinds of reports are produced:
+ * <ul>
+ * <li>"Slow" -- raised from {@link #queryEnd(Thread)} when the finished query ran for at
+ * least {@code alertRunningSec} seconds;</li>
+ * <li>"Low mem" -- raised by the periodic check when the available heap drops below
+ * {@code alertMB} megabytes; every query still registered is reported, earliest start first.</li>
+ * </ul>
+ * The constructor always installs one notifier that writes the report to the log.
+ */
+public class BadQueryDetector extends Thread {
+
+    private static final Logger logger = LoggerFactory.getLogger(BadQueryDetector.class);
+
+    // Queries in flight, keyed by the thread passed to queryStart().
+    private final ConcurrentMap<Thread, Entry> runningQueries = Maps.newConcurrentMap();
+    // Pause between two periodic checks, in milliseconds (passed to Thread.sleep()).
+    private final long detectionInterval;
+    // "Low mem" is reported when the available heap is below this many megabytes.
+    private final int alertMB;
+    // "Slow" is reported when a query ran at least this many seconds.
+    private final int alertRunningSec;
+
+    // NOTE(review): plain ArrayList -- registering a notifier while another thread is
+    // iterating in notify() is not safe; register notifiers before calling start().
+    private final ArrayList<Notifier> notifiers = new ArrayList<Notifier>();
+
+    /** Creates a detector with the default settings. */
+    public BadQueryDetector() {
+        this(60 * 1000, 100, 60); // 1 minute, 100 MB, 60 seconds
+    }
+
+    /**
+     * @param detectionInterval pause between two periodic checks, in milliseconds
+     * @param alertMB           available-heap threshold, in megabytes, below which running
+     *                          queries are reported as "Low mem"
+     * @param alertRunningSec   running time, in seconds, from which a finished query is
+     *                          reported as "Slow"
+     */
+    public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) {
+        super("BadQueryDetector");
+        this.setDaemon(true);
+        this.detectionInterval = detectionInterval;
+        this.alertMB = alertMB;
+        this.alertRunningSec = alertRunningSec;
+
+        // Default notifier: just log the finding.
+        this.notifiers.add(new Notifier() {
+            @Override
+            public void badQueryFound(String adj, int runningSec, String sql) {
+                logger.info(adj + " query has been running " + runningSec + " seconds -- " + sql);
+            }
+        });
+    }
+
+    /** Adds a notifier that will be called for every bad query found from now on. */
+    public void registerNotifier(Notifier notifier) {
+        notifiers.add(notifier);
+    }
+
+    /**
+     * Calls every registered notifier. An exception thrown by one notifier is logged and
+     * does not prevent the remaining notifiers from being called.
+     */
+    private void notify(String adj, int runningSec, String sql) {
+        for (Notifier notifier : notifiers) {
+            try {
+                notifier.badQueryFound(adj, runningSec, sql);
+            } catch (Exception e) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    /** Callback invoked for each bad query found. */
+    public interface Notifier {
+        /**
+         * @param adj        why the query is considered bad ("Slow" or "Low mem")
+         * @param runningSec how long the query has been running, in seconds
+         * @param sql        the SQL text of the query
+         */
+        void badQueryFound(String adj, int runningSec, String sql);
+    }
+
+    /** Registers the query executed by {@code thread}; the start time is taken now. */
+    public void queryStart(Thread thread, SQLRequest sqlRequest) {
+        runningQueries.put(thread, new Entry(sqlRequest));
+    }
+
+    /**
+     * Unregisters the query of {@code thread} and reports it as "Slow" when it ran for at
+     * least {@code alertRunningSec} seconds. Does nothing if the thread has no registered query.
+     */
+    public void queryEnd(Thread thread) {
+        Entry e = runningQueries.remove(thread);
+
+        if (e != null) {
+            int runningSec = (int) ((System.currentTimeMillis() - e.startTime) / 1000);
+            if (runningSec >= alertRunningSec) {
+                notify("Slow", runningSec, e.sqlRequest.getSql());
+            }
+        }
+    }
+
+    /** A running query and the time it was registered; ordered by start time, earliest first. */
+    private static class Entry implements Comparable<Entry> {
+        final SQLRequest sqlRequest;
+        final long startTime;
+
+        Entry(SQLRequest sqlRequest) {
+            this.sqlRequest = sqlRequest;
+            this.startTime = System.currentTimeMillis();
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            // Compare the longs directly: narrowing the difference to int can overflow
+            // and yield the wrong sign when the start times are far apart.
+            return Long.compare(this.startTime, o.startTime);
+        }
+    }
+
+    /**
+     * Sleeps for the detection interval, runs one check, and repeats until the thread is
+     * interrupted while sleeping. Exceptions thrown by a check are logged and the loop goes on.
+     */
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Thread.sleep(detectionInterval);
+            } catch (InterruptedException e) {
+                // stop detection and exit
+                return;
+            }
+
+            try {
+                detectBadQuery();
+            } catch (Exception ex) {
+                logger.error("", ex);
+            }
+        }
+    }
+
+    /** When available heap is below the threshold, reports every running query as "Low mem". */
+    private void detectBadQuery() {
+        if (getSystemAvailMB() < alertMB) {
+            // Work on a snapshot so that queries starting or ending meanwhile do not interfere.
+            ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
+            Collections.sort(entries);
+
+            logger.info("System free memory less than " + alertMB + " MB. " + entries.size() + " queries running.");
+            long now = System.currentTimeMillis();
+
+            for (Entry e : entries) {
+                notify("Low mem", (int) ((now - e.startTime) / 1000), e.sqlRequest.getSql());
+            }
+        }
+    }
+
+    public static final int ONE_MB = 1024 * 1024;
+
+    /**
+     * Returns the heap still available to the VM in bytes, computed as the maximum heap
+     * minus the part of the current heap that is in use.
+     */
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    /** Same as {@link #getSystemAvailBytes()}, expressed in whole megabytes. */
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 6940106..6e6d9da 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -30,7 +30,6 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
@@ -80,13 +80,16 @@ public class QueryService extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(QueryService.class);
public static final String USER_QUERY_FAMILY = "q";
- private Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
private static final String DEFAULT_TABLE_PREFIX = "kylin_metadata";
private static final String USER_TABLE_NAME = "_user";
private static final String USER_QUERY_COLUMN = "c";
- private String hbaseUrl = null;
- private String tableNameBase = null;
- private String userTableName = null;
+
+ private final Serializer<Query[]> querySerializer = new Serializer<Query[]>(Query[].class);
+ private final BadQueryDetector badQueryDetector = new BadQueryDetector();
+
+ private final String hbaseUrl;
+ private final String tableNameBase;
+ private final String userTableName;
public QueryService() {
String metadataUrl = KylinConfig.getInstanceFromEnv().getMetadataUrl();
@@ -95,6 +98,8 @@ public class QueryService extends BasicService {
tableNameBase = cut < 0 ? DEFAULT_TABLE_PREFIX : metadataUrl.substring(0, cut);
hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
userTableName = tableNameBase + USER_TABLE_NAME;
+
+ badQueryDetector.start();
}
public List<TableMeta> getMetadata(String project) throws SQLException {
@@ -102,18 +107,14 @@ public class QueryService extends BasicService {
}
public SQLResponse query(SQLRequest sqlRequest) throws Exception {
- SQLResponse fakeResponse = QueryUtil.tableauIntercept(sqlRequest.getSql());
- if (null != fakeResponse) {
- logger.debug("Return fake response, is exception? " + fakeResponse.getIsException());
-
- return fakeResponse;
+ try {
+ badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+
+ return queryWithSqlMassage(sqlRequest);
+
+ } finally {
+ badQueryDetector.queryEnd(Thread.currentThread());
}
-
- String correctedSql = QueryUtil.healSickSql(sqlRequest.getSql());
- if (correctedSql.equals(sqlRequest.getSql()) == false)
- logger.debug("The corrected query: " + correctedSql);
-
- return executeQuery(correctedSql, sqlRequest);
}
public void saveQuery(final String creator, final Query query) throws IOException {
@@ -192,12 +193,11 @@ public class QueryService extends BasicService {
return queries;
}
- public void logQuery(final SQLRequest request, final SQLResponse response, final Date startTime, final Date endTime) {
+ public void logQuery(final SQLRequest request, final SQLResponse response) {
final String user = SecurityContextHolder.getContext().getAuthentication().getName();
final Set<String> realizationNames = new HashSet<String>();
final Set<Long> cuboidIds = new HashSet<Long>();
- long totalScanCount = 0;
- float duration = (endTime.getTime() - startTime.getTime()) / (float) 1000;
+ float duration = response.getDuration() / (float) 1000;
if (!response.isHitCache() && null != OLAPContext.getThreadLocalContexts()) {
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
@@ -211,8 +211,6 @@ public class QueryService extends BasicService {
String realizationName = ctx.realization.getName();
realizationNames.add(realizationName);
}
-
- totalScanCount += ctx.storageContext.getTotalScanCount();
}
}
@@ -232,7 +230,7 @@ public class QueryService extends BasicService {
stringBuilder.append("Project: ").append(request.getProject()).append(newLine);
stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine);
stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine);
- stringBuilder.append("Total scan count: ").append(totalScanCount).append(newLine);
+ stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine);
stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine);
@@ -247,25 +245,30 @@ public class QueryService extends BasicService {
public void checkAuthorization(CubeInstance cube) throws AccessDeniedException {
}
- protected SQLResponse executeQuery(String sql, SQLRequest sqlRequest) throws Exception {
- sql = sql.trim().replace(";", "");
-
- int limit = sqlRequest.getLimit();
- if (limit > 0 && !sql.toLowerCase().contains("limit")) {
- sql += (" LIMIT " + limit);
+ private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
+ SQLResponse fakeResponse = QueryUtil.tableauIntercept(sqlRequest.getSql());
+ if (null != fakeResponse) {
+ logger.debug("Return fake response, is exception? " + fakeResponse.getIsException());
+ return fakeResponse;
}
- int offset = sqlRequest.getOffset();
- if (offset > 0 && !sql.toLowerCase().contains("offset")) {
- sql += (" OFFSET " + offset);
- }
+ String correctedSql = QueryUtil.massageSql(sqlRequest);
+ if (correctedSql.equals(sqlRequest.getSql()) == false)
+ logger.debug("The corrected query: " + correctedSql);
// add extra parameters into olap context, like acceptPartial
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial()));
OLAPContext.setParameters(parameters);
- return execute(sql, sqlRequest);
+ try {
+ BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
+
+ return execute(correctedSql, sqlRequest);
+
+ } finally {
+ BackdoorToggles.cleanToggles();
+ }
}
protected List<TableMeta> getMetadata(CubeManager cubeMgr, String project, boolean cubedOnly) throws SQLException {
@@ -329,6 +332,8 @@ public class QueryService extends BasicService {
Connection conn = null;
Statement stat = null;
ResultSet resultSet = null;
+ long startTime = System.currentTimeMillis();
+
List<List<String>> results = new LinkedList<List<String>>();
List<SelectedColumnMeta> columnMetas = new LinkedList<SelectedColumnMeta>();
@@ -384,6 +389,7 @@ public class QueryService extends BasicService {
SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult);
response.setTotalScanCount(totalScanCount);
+ response.setDuration(System.currentTimeMillis() - startTime);
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java b/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
index 7869f4d..16f6633 100644
--- a/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
+++ b/server/src/main/java/org/apache/kylin/rest/util/QueryUtil.java
@@ -27,6 +27,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kylin.rest.model.SelectedColumnMeta;
+import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
/**
@@ -97,8 +98,28 @@ public class QueryUtil {
}
}
+    /**
+     * Prepares the SQL text of a request for execution: trims it, removes trailing
+     * semicolons, appends the request's LIMIT and OFFSET when they are positive and the
+     * corresponding word does not already occur in the statement, and finally passes the
+     * result through {@link #healSickSql(String)}.
+     *
+     * @param sqlRequest the request carrying the SQL text, limit and offset
+     * @return the SQL text to execute
+     */
+    public static String massageSql(SQLRequest sqlRequest) {
+        String sql = sqlRequest.getSql().trim();
+
+        while (sql.endsWith(";"))
+            sql = sql.substring(0, sql.length() - 1);
+
+        // Lower-case with Locale.ROOT so keyword detection does not depend on the JVM default
+        // locale (e.g. in a Turkish locale "LIMIT" would not lower-case to "limit").
+        // Computed once: appending " LIMIT n" below cannot introduce the word "offset".
+        String lowerSql = sql.toLowerCase(java.util.Locale.ROOT);
+
+        int limit = sqlRequest.getLimit();
+        if (limit > 0 && !lowerSql.contains("limit")) {
+            sql += (" LIMIT " + limit);
+        }
+
+        int offset = sqlRequest.getOffset();
+        if (offset > 0 && !lowerSql.contains("offset")) {
+            sql += (" OFFSET " + offset);
+        }
+
+        return healSickSql(sql);
+    }
+
// correct sick / invalid SQL
- public static String healSickSql(String sql) {
+ private static String healSickSql(String sql) {
Matcher m;
// Case fn{ EXTRACT(...) }
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
new file mode 100644
index 0000000..b38ee9d
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.rest.request.SQLRequest;
+import org.junit.Test;
+
+public class BadQueryDetectorTest {
+
+    /**
+     * Registers one mock query, keeps it "running" for twice the slow-query threshold and
+     * expects exactly two alerts: a "Low mem" from the periodic check, followed by a "Slow"
+     * raised when the query ends.
+     */
+    @Test
+    public void test() throws InterruptedException {
+        // Threshold set to twice what is available right now, so the low-memory check fires.
+        int alertMB = BadQueryDetector.getSystemAvailMB() * 2;
+        int alertRunningSec = 5;
+        String mockSql = "select * from just_a_test";
+        final ArrayList<String[]> alerts = new ArrayList<>();
+
+        BadQueryDetector badQueryDetector = new BadQueryDetector(5000, alertMB, alertRunningSec);
+        badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() {
+            @Override
+            public void badQueryFound(String adj, int runningSec, String sql) {
+                alerts.add(new String[] { adj, sql });
+            }
+        });
+        badQueryDetector.start();
+
+        {
+            SQLRequest sqlRequest = new SQLRequest();
+            sqlRequest.setSql(mockSql);
+            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest);
+
+            // NOTE(review): the sleep is exactly two detection intervals, so the second
+            // periodic check runs at about the same moment as queryEnd() -- confirm this
+            // cannot produce a second "Low mem" alert on a slow machine.
+            Thread.sleep(alertRunningSec * 2 * 1000);
+
+            badQueryDetector.queryEnd(Thread.currentThread());
+        }
+
+        // Ask the detector to exit through its interrupt handling instead of the deprecated
+        // Thread.stop(); the bounded join waits for it to finish so the alerts it recorded
+        // are visible to the assertions below.
+        badQueryDetector.interrupt();
+        badQueryDetector.join(5000);
+
+        assertEquals(2, alerts.size());
+        assertArrayEquals(new String[] { "Low mem", mockSql }, alerts.get(0));
+        assertArrayEquals(new String[] { "Slow", mockSql }, alerts.get(1));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index d2d7b22..1b1932c 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.kylin.rest.service;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.Date;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.project.ProjectInstance;
@@ -58,6 +57,6 @@ public class QueryServiceTest extends ServiceTestBase {
request.setAcceptPartial(true);
SQLResponse response = new SQLResponse();
response.setHitCache(true);
- queryService.logQuery(request, response, new Date(), new Date());
+ queryService.logQuery(request, response);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 9cebf3c..9f5011e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -53,14 +54,14 @@ public class StorageContext {
private boolean enableLimit;
private boolean enableCoprocessor;
- private long totalScanCount;
+ private AtomicLong totalScanCount;
private Cuboid cuboid;
private boolean partialResultReturned;
public StorageContext() {
this.threshold = DEFAULT_THRESHOLD;
this.limit = DEFAULT_THRESHOLD;
- this.totalScanCount = 0;
+ this.totalScanCount = new AtomicLong();
this.cuboid = null;
this.aliasMap = HashBiMap.create();
this.hasSort = false;
@@ -151,11 +152,11 @@ public class StorageContext {
}
public long getTotalScanCount() {
- return totalScanCount;
+ return totalScanCount.get();
}
- public void setTotalScanCount(long totalScanCount) {
- this.totalScanCount = totalScanCount;
+ public long increaseTotalScanCount(long count) {
+ return this.totalScanCount.addAndGet(count);
}
public boolean isAcceptPartialResult() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index c68fcdd..9efbb79 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -92,6 +92,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private TupleInfo tupleInfo;
private Tuple tuple;
private int scanCount;
+ private int scanCountDelta;
public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
this.cube = cubeSeg.getCubeInstance();
@@ -103,7 +104,6 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
this.context = context;
this.tableName = cubeSeg.getStorageLocationIdentifier();
this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
- this.scanCount = 0;
try {
this.table = conn.getTable(tableName);
@@ -121,6 +121,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
}
private void closeScanner() {
+ flushScanCountDelta();
+
if (logger.isDebugEnabled() && scan != null) {
logger.debug("Scan " + scan.toString());
byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
@@ -162,6 +164,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
if (resultIterator.hasNext()) {
result = this.resultIterator.next();
scanCount++;
+ if (++scanCountDelta >= 1000)
+ flushScanCountDelta();
break;
} else {
scanNextRange();
@@ -179,6 +183,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
return this.tuple;
}
+ private void flushScanCountDelta() {
+ context.increaseTotalScanCount(scanCountDelta);
+ scanCountDelta = 0;
+ }
+
@Override
public void remove() {
throw new UnsupportedOperationException();
@@ -277,8 +286,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
private List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
- for (org.apache.kylin.common.util.Pair pair : pairList) {
- org.apache.hadoop.hbase.util.Pair element = new org.apache.hadoop.hbase.util.Pair(pair.getFirst(), pair.getSecond());
+ for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
+ org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
result.add(element);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294a3eba/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index e6b0258..a204d62 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -128,7 +128,6 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
@Override
public void close() {
- context.setTotalScanCount(scanCount);
segmentIterator.close();
}
}