You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/06 08:21:30 UTC
[1/3] kylin git commit: APACHE-KYLIN-1872: Make query visible and
interruptible
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2881-review [created] d969767c3
APACHE-KYLIN-1872: Make query visible and interruptible
Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8d35a2be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8d35a2be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8d35a2be
Branch: refs/heads/KYLIN-2881-review
Commit: 8d35a2bead2f4bc91eee54e58d8b778fefc6a3ef
Parents: e44f95e
Author: Ma Gang <mg...@163.com>
Authored: Wed Sep 20 19:46:38 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 14:54:01 2018 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/QueryContext.java | 76 +++++++++----
.../kylin/common/QueryContextManager.java | 109 +++++++++++++++++++
.../gtrecord/GTCubeStorageQueryBase.java | 6 +-
.../gtrecord/SequentialCubeTupleIterator.java | 4 +-
.../apache/kylin/query/ITFailfastQueryTest.java | 4 +-
.../kylin/query/enumerator/OLAPQuery.java | 5 +-
.../kylin/rest/controller/QueryController.java | 26 +++++
.../kylin/rest/metrics/QueryMetricsFacade.java | 4 +-
.../apache/kylin/rest/service/QueryService.java | 18 +--
.../kylin/rest/metrics/QueryMetricsTest.java | 4 +-
.../kylin/rest/service/QueryServiceTest.java | 5 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 3 +-
12 files changed, 224 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 1af90f4..d36b332 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
@@ -39,37 +40,30 @@ public class QueryContext {
private static final Logger logger = LoggerFactory.getLogger(QueryContext.class);
- private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
- @Override
- protected QueryContext initialValue() {
- return new QueryContext();
- }
- };
+ public interface QueryStopListener {
+ void stop(QueryContext query);
+ }
private long queryStartMillis;
private long deadline = Long.MAX_VALUE;
- private String queryId;
+ private final String queryId;
private String username;
private Set<String> groups;
private AtomicLong scannedRows = new AtomicLong();
private AtomicLong scannedBytes = new AtomicLong();
+ private AtomicBoolean isRunning = new AtomicBoolean(true);
+ private volatile Throwable throwable;
+ private String stopReason;
+ private List<QueryStopListener> stopListeners = Lists.newCopyOnWriteArrayList();
+
private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
- private QueryContext() {
- // use QueryContext.current() instead
- queryStartMillis = System.currentTimeMillis();
+ QueryContext() {
queryId = UUID.randomUUID().toString();
- }
-
- public static QueryContext current() {
- return contexts.get();
- }
-
- public static void reset() {
- contexts.remove();
+ queryStartMillis = System.currentTimeMillis();
}
public long getQueryStartMillis() {
@@ -102,8 +96,8 @@ public class QueryContext {
return queryId == null ? "" : queryId;
}
- public void setQueryId(String queryId) {
- this.queryId = queryId;
+ public long getAccumulatedMillis() {
+ return System.currentTimeMillis() - queryStartMillis;
}
public String getUsername() {
@@ -138,6 +132,48 @@ public class QueryContext {
return scannedBytes.addAndGet(deltaBytes);
}
+ public void addQueryStopListener(QueryStopListener listener) {
+ this.stopListeners.add(listener);
+ }
+
+ public boolean isStopped() {
+ return !isRunning.get();
+ }
+
+ public String getStopReason() {
+ return stopReason;
+ }
+
+ /**
+ * stop the whole query and related sub threads
+ */
+ public void stop(Throwable t) {
+ stopQuery(t, t.getMessage());
+ }
+
+ /**
+ * stop the whole query by rest call
+ */
+ public void stopEarly(String reason) {
+ stopQuery(null, reason);
+ }
+
+ private void stopQuery(Throwable t, String reason) {
+ if (isStopped()) {
+ return;
+ }
+ isRunning.set(false);
+ this.throwable = t;
+ this.stopReason = reason;
+ for (QueryStopListener stopListener : stopListeners) {
+ stopListener.stop(this);
+ }
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
public void addContext(int ctxId, String type, boolean ifCube) {
Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
if (ifCube) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
new file mode 100644
index 0000000..d08557e
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class QueryContextManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class);
+
+ private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap();
+ private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
+ @Override
+ protected QueryContext initialValue() {
+ QueryContext queryContext = new QueryContext();
+ idContextMap.put(queryContext.getQueryId(), queryContext);
+ return queryContext;
+ }
+ };
+
+ public static QueryContext current() {
+ return contexts.get();
+ }
+
+ /**
+ * invoked by program
+ */
+ public static void resetCurrent() {
+ QueryContext queryContext = contexts.get();
+ if (queryContext != null) {
+ idContextMap.remove(queryContext.getQueryId());
+ contexts.remove();
+ }
+ }
+
+ /**
+ * invoked by user to let query stop early
+ * @link resetCurrent() should be finally invoked
+ */
+ public static void stopQuery(String queryId, String info) {
+ QueryContext queryContext = idContextMap.get(queryId);
+ if (queryContext != null) {
+ queryContext.stopEarly(info);
+ } else {
+ logger.info("the query:{} is not existed", queryId);
+ }
+ }
+
+ public static List<QueryContext> getAllRunningQueries() {
+ // Sort by descending order
+ TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() {
+ @Override
+ public int compare(QueryContext o1, QueryContext o2) {
+ if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
+ return 1;
+ } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ });
+
+ for (QueryContext runningQuery : idContextMap.values()) {
+ queriesSet.add(runningQuery);
+ }
+ return Lists.newArrayList(queriesSet);
+ }
+
+ /**
+ * @param runningTime in milliseconds
+ * @return running queries that have run more than specified time
+ */
+ public static List<QueryContext> getLongRunningQueries(int runningTime) {
+ List<QueryContext> allRunningQueries = getAllRunningQueries();
+ int i = 0;
+ for (; i < allRunningQueries.size(); i++) {
+ if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) {
+ break;
+ }
+ }
+ return allRunningQueries.subList(0, i);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 11ad8bb..483facd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// set whether to aggregate results from multiple partitions
enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
// set and check query deadline
- QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
- QueryContext.current().checkMillisBeforeDeadline();
+ QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+ QueryContextManager.current().checkMillisBeforeDeadline();
// push down having clause filter if possible
TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index ede5ff9..f45f02b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
@Override
public ITuple next() {
if (scanCount++ % 100 == 1) {
- QueryContext.current().checkMillisBeforeDeadline();
+ QueryContextManager.current().checkMillisBeforeDeadline();
}
if (++scanCountDelta >= 1000)
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
index 17b804a..e4b8b43 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.routing.Candidate;
@@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase {
@After
public void cleanUp() {
- QueryContext.reset();
+ QueryContextManager.resetCurrent();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index f6bd3f8..f0759ab 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -22,7 +22,7 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.query.relnode.OLAPContext;
import org.slf4j.Logger;
@@ -49,7 +49,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
this.type = type;
this.contextId = ctxId;
- QueryContext.current().addContext(ctxId, type.toString(), type == EnumeratorTypeEnum.OLAP);
+ QueryContextManager.current().addContext(ctxId, type.toString(),
+ type == EnumeratorTypeEnum.OLAP);
}
public OLAPQuery(EnumeratorTypeEnum type, int ctxSeq) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 4f83780..062fd62 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -29,6 +29,8 @@ import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
@@ -54,6 +56,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.supercsv.io.CsvListWriter;
import org.supercsv.io.ICsvListWriter;
@@ -171,6 +174,29 @@ public class QueryController extends BasicController {
}
}
+ /**
+ *
+ * @param runTimeMoreThan in seconds
+ * @return
+ */
+ @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET)
+ @ResponseBody
+ public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
+ if (runTimeMoreThan == -1) {
+ return QueryContextManager.getAllRunningQueries();
+ }else {
+ return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000);
+ }
+ }
+
+ @RequestMapping(value = "/query/{queryId}/stop", method = RequestMethod.PUT)
+ @ResponseBody
+ public void stopQuery(@PathVariable String queryId) {
+ final String user = SecurityContextHolder.getContext().getAuthentication().getName();
+ logger.info("{} stop the query: {}", new Object[] { user, queryId });
+ QueryContextManager.stopQuery(queryId, "stopped by " + user);
+ }
+
public void setQueryService(QueryService queryService) {
this.queryService = queryService;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index e595804..09ccc07 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metrics.MetricsManager;
import org.apache.kylin.metrics.lib.impl.RecordEvent;
import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
@@ -98,7 +100,7 @@ public class QueryMetricsFacade {
if (user == null) {
user = "unknown";
}
- for (QueryContext.RPCStatistics entry : QueryContext.current().getRpcStatisticsList()) {
+ for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) {
RecordEvent rpcMetricsEvent = new TimedRecordEvent(
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
setRPCWrapper(rpcMetricsEvent, //
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9789b70..31e7336 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -58,6 +58,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.htrace.HtraceInit;
@@ -268,7 +269,7 @@ public class QueryService extends BasicService {
return queries;
}
- public void logQuery(final SQLRequest request, final SQLResponse response) {
+ public void logQuery(final String queryId, final SQLRequest request, final SQLResponse response) {
final String user = aclEvaluate.getCurrentUserName();
final List<String> realizationNames = new LinkedList<>();
final Set<Long> cuboidIds = new HashSet<Long>();
@@ -300,7 +301,7 @@ public class QueryService extends BasicService {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(newLine);
stringBuilder.append("==========================[QUERY]===============================").append(newLine);
- stringBuilder.append("Query Id: ").append(QueryContext.current().getQueryId()).append(newLine);
+ stringBuilder.append("Query Id: ").append(queryId).append(newLine);
stringBuilder.append("SQL: ").append(request.getSql()).append(newLine);
stringBuilder.append("User: ").append(user).append(newLine);
stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine);
@@ -402,7 +403,7 @@ public class QueryService extends BasicService {
if (sqlRequest.getBackdoorToggles() != null)
BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
- final QueryContext queryContext = QueryContext.current();
+ final QueryContext queryContext = QueryContextManager.current();
TraceScope scope = null;
if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
@@ -499,7 +500,7 @@ public class QueryService extends BasicService {
long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+ sqlResponse.setDuration(queryContext.getAccumulatedMillis());
logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
String.valueOf(sqlResponse.getTotalScanCount()));
@@ -616,7 +617,7 @@ public class QueryService extends BasicService {
conn = QueryConnection.getConnection(sqlRequest.getProject());
String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
- QueryContext context = QueryContext.current();
+ QueryContext context = QueryContextManager.current();
context.setUsername(userInfo);
context.setGroups(AclPermissionUtil.getCurrentUserGroups());
final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
@@ -1038,6 +1039,7 @@ public class QueryService extends BasicService {
boolean isPartialResult = false;
StringBuilder cubeSb = new StringBuilder();
StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
+ QueryContext queryContext = QueryContextManager.current();
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
if (ctx.realization != null) {
@@ -1048,14 +1050,16 @@ public class QueryService extends BasicService {
cubeSb.append(ctx.realization.getCanonicalName());
logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
}
+ queryContext.setContextRealization(ctx.id, realizationName, realizationType);
}
}
logger.info(logSb.toString());
SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult,
isPushDown);
- response.setTotalScanCount(QueryContext.current().getScannedRows());
- response.setTotalScanBytes(QueryContext.current().getScannedBytes());
+ response.setTotalScanCount(queryContext.getScannedRows());
+ response.setTotalScanBytes(queryContext.getScannedBytes());
+ response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
return response;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index e23fc20..d4a16f8 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -26,6 +26,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.service.ServiceTestBase;
@@ -121,6 +122,8 @@ public class QueryMetricsTest extends ServiceTestBase {
sqlRequest.setSql("select * from TEST_KYLIN_FACT");
sqlRequest.setProject("default");
+ QueryContext context = QueryContextManager.current();
+
SQLResponse sqlResponse = new SQLResponse();
sqlResponse.setDuration(10);
sqlResponse.setCube("test_cube");
@@ -138,7 +141,6 @@ public class QueryMetricsTest extends ServiceTestBase {
sqlResponse.setResults(results);
sqlResponse.setStorageCacheUsed(true);
- QueryContext context = QueryContext.current();
int ctxId = 0;
context.addContext(ctxId, "OLAP", true);
context.addRPCStatistics(ctxId, "sandbox", "test_cube", "20100101000000_20150101000000", 3L, 3L, 3L, null, 80L,
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 7dc9994..061e622 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -21,6 +21,8 @@ package org.apache.kylin.rest.service;
import java.io.IOException;
import java.sql.SQLException;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.project.ProjectInstance;
@@ -63,9 +65,10 @@ public class QueryServiceTest extends ServiceTestBase {
SQLRequest request = new SQLRequest();
request.setSql("select * from test_table");
request.setAcceptPartial(true);
+ QueryContext queryContext = QueryContextManager.current();
SQLResponse response = new SQLResponse();
response.setHitExceptionCache(true);
- queryService.logQuery(request, response);
+ queryService.logQuery(queryContext.getQueryId(), request, response);
}
@Test
http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 6b4ac32..c660cad 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -76,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
- this.queryContext = QueryContext.current();
+ this.queryContext = QueryContextManager.current();
this.storageContext = context;
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
[2/3] kylin git commit: KYLIN-2881 Improve hbase coprocessor
exception handling at kylin server side
Posted by li...@apache.org.
KYLIN-2881 Improve hbase coprocessor exception handling at kylin server side
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/006485d1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/006485d1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/006485d1
Branch: refs/heads/KYLIN-2881-review
Commit: 006485d1abcdc5392055abc726c1ab8c3eca13ca
Parents: 8d35a2b
Author: Zhong <nj...@apache.org>
Authored: Wed Sep 20 09:46:44 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 15:15:30 2018 +0800
----------------------------------------------------------------------
.../apache/kylin/query/ITKylinQueryTest.java | 4 +-
.../apache/kylin/query/ITMassInQueryTest.java | 4 +-
.../org/apache/kylin/query/KylinTestBase.java | 34 ++-
.../apache/kylin/rest/service/QueryService.java | 16 +-
.../hbase/cube/v2/CubeHBaseEndpointRPC.java | 252 ++++++++++++-------
.../hbase/cube/v2/ExpectedSizeIterator.java | 34 ++-
6 files changed, 225 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 4edfb3d..02a50ce 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
String sql = getTextFromFile(sqlFile);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- executeQuery(kylinConn, queryFileName, sql, true);
+ execQueryUsingKylin(kylinConn, queryFileName, sql, true);
}
@Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
// compare the result
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index cca0be6..16395fc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
printResult(kylinTable);
}
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
// execute H2
sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2c5b556..e38bb1a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,6 +44,7 @@ import java.util.logging.LogManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
@@ -228,6 +229,16 @@ public class KylinTestBase {
// ////////////////////////////////////////////////////////////////////////////////////////
// execute
+ private void initExecQueryUsingKylin(String sql) {
+ QueryContextManager.resetCurrent();
+ QueryContextManager.current();
+ }
+
+ protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+ throws Exception {
+ initExecQueryUsingKylin(sql);
+ return executeQuery(dbConn, queryName, sql, needSort);
+ }
protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
throws Exception {
@@ -251,6 +262,7 @@ public class KylinTestBase {
}
protected int executeQuery(String sql, boolean needDisplay) throws Exception {
+ initExecQueryUsingKylin(sql);
// change join type to match current setting
sql = changeJoinType(sql, joinType);
@@ -302,6 +314,12 @@ public class KylinTestBase {
return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
}
+ protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
+ List<String> parameters, boolean needSort) throws Exception {
+ initExecQueryUsingKylin(sql);
+ return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
+ }
+
protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
List<String> parameters, boolean needSort) throws Exception {
@@ -382,7 +400,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
// compare the result
if (BackdoorToggles.getPrepareOnly())
@@ -426,7 +444,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
// compare the result
assertTableEquals(expectTable, kylinTable);
@@ -449,7 +467,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -478,7 +496,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + sql);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
try {
// compare the result
@@ -510,7 +528,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -561,7 +579,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
+ ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -601,7 +619,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
+ ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -709,7 +727,7 @@ public class KylinTestBase {
//setup cube conn
String project = ProjectInstance.DEFAULT_PROJECT_NAME;
- cubeConnection = QueryConnection.getConnection(project);
+ cubeConnection = QueryDataSource.create(project, config).getConnection();
//setup h2
h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 31e7336..71926be 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -459,7 +459,7 @@ public class QueryService extends BasicService {
sqlResponse.setDuration(System.currentTimeMillis() - startTime);
sqlResponse.setTraceUrl(traceUrl);
- logQuery(sqlRequest, sqlResponse);
+ logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
try {
recordMetric(sqlRequest, sqlResponse);
} catch (Throwable th) {
@@ -472,7 +472,7 @@ public class QueryService extends BasicService {
} finally {
BackdoorToggles.cleanToggles();
- QueryContext.reset();
+ QueryContextManager.resetCurrent();
if (scope != null) {
scope.close();
}
@@ -482,7 +482,8 @@ public class QueryService extends BasicService {
private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Message msg = MsgPicker.getMsg();
-
+ final QueryContext queryContext = QueryContextManager.current();
+
SQLResponse sqlResponse = null;
try {
final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
@@ -525,13 +526,15 @@ public class QueryService extends BasicService {
Trace.addTimelineAnnotation("response from execution");
} catch (Throwable e) { // calcite may throw AssertError
+ queryContext.stop(e);
+
logger.error("Exception while executing query", e);
String errMsg = makeErrorMsgUserFriendly(e);
sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
- QueryContext queryContext = QueryContext.current();
sqlResponse.setTotalScanCount(queryContext.getScannedRows());
sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+ sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
if (queryCacheEnabled && e.getCause() != null
&& ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
@@ -1042,6 +1045,8 @@ public class QueryService extends BasicService {
QueryContext queryContext = QueryContextManager.current();
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
+ String realizationName = "NULL";
+ int realizationType = -1;
if (ctx.realization != null) {
isPartialResult |= ctx.storageContext.isPartialResultReturned();
if (cubeSb.length() > 0) {
@@ -1049,6 +1054,9 @@ public class QueryService extends BasicService {
}
cubeSb.append(ctx.realization.getCanonicalName());
logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
+
+ realizationName = ctx.realization.getName();
+ realizationType = ctx.realization.getStorageType();
}
queryContext.setContextRealization(ctx.id, realizationName, realizationType);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 26ab039..ddf62b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,20 +19,23 @@
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.Bytes;
@@ -52,7 +55,6 @@ import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
@@ -103,6 +105,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
}
+ static Field channelRowField = null;
+ static {
+ try {
+ channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
+ channelRowField.setAccessible(true);
+ } catch (Throwable t) {
+ logger.warn("error when get row field from RegionCoprocessorRpcChannel class", t);
+ }
+ }
+
@SuppressWarnings("checkstyle:methodlength")
@Override
public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
@@ -135,7 +147,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
scanRequestByteString = serializeGTScanReq(scanRequest);
- final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
+ final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
@@ -165,97 +177,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
builder.setIsExactAggregate(storageContext.isExactAggregation());
+ final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryContext.getQueryId(),
+ Integer.toHexString(System.identityHashCode(scanRequest)));
for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
executorService.submit(new Runnable() {
@Override
public void run() {
-
- final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
- final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
-
- try {
- Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
-
- final CubeVisitRequest request = builder.build();
- final byte[] startKey = epRange.getFirst();
- final byte[] endKey = epRange.getSecond();
-
- table.coprocessorService(CubeVisitService.class, startKey, endKey, //
- new Batch.Call<CubeVisitService, CubeVisitResponse>() {
- public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
- rowsService.visitCube(controller, request, rpcCallback);
- CubeVisitResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
- return response;
- }
- }, new Batch.Callback<CubeVisitResponse>() {
- @Override
- public void update(byte[] region, byte[] row, CubeVisitResponse result) {
- if (region == null) {
- return;
- }
-
- logger.info(logHeader + getStatsString(region, result));
-
- Stats stats = result.getStats();
- queryContext.addAndGetScannedRows(stats.getScannedRowCount());
- queryContext.addAndGetScannedBytes(stats.getScannedBytes());
-
- RuntimeException rpcException = null;
- if (result.getStats().getNormalComplete() != 1) {
- rpcException = getCoprocessorException(result);
- }
- queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
- cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
- cuboid.getId(), storageContext.getFilterMask(), rpcException,
- stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
- stats.getScannedRowCount(),
- stats.getScannedRowCount() - stats.getAggregatedRowCount()
- - stats.getFilteredRowCount(),
- stats.getAggregatedRowCount(), stats.getScannedBytes());
-
- // if any other region has responded with error, skip further processing
- if (regionErrorHolder.get() != null) {
- return;
- }
-
- // record coprocessor error if happened
- if (rpcException != null) {
- regionErrorHolder.compareAndSet(null, rpcException);
- return;
- }
-
- if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
- throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
- }
-
- try {
- if (compressionResult) {
- epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
- } else {
- epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
- }
- } catch (IOException | DataFormatException e) {
- throw new RuntimeException(logHeader + "Error when decompressing", e);
- }
- }
- });
-
- } catch (Throwable ex) {
- logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
- epResultItr.notifyCoprocException(ex);
- return;
- }
-
- if (regionErrorHolder.get() != null) {
- RuntimeException exception = regionErrorHolder.get();
- logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
- epResultItr.notifyCoprocException(exception);
- }
+ runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
+ epRange.getSecond(), epResultItr);
}
});
}
@@ -263,6 +192,149 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
}
+ private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
+ final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
+ final ExpectedSizeIterator epResultItr) {
+
+ final String queryId = queryContext.getQueryId();
+
+ try {
+ final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
+ HBaseConnection.getCoprocessorPool());
+
+ table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+ new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+ public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+ if (queryContext.isStopped()) {
+ logger.warn(
+ "Query-{}: the query has been stopped, not send request to region server any more.",
+ queryId);
+ return null;
+ }
+
+ HRegionLocation regionLocation = getStartRegionLocation(rowsService);
+ String regionServerName = regionLocation == null ? "UNKNOWN" : regionLocation.getHostname();
+ logger.info("Query-{}: send request to the init region server {} on table {} ", queryId,
+ regionServerName, table.getName());
+
+ queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
+ private Thread hConnThread = Thread.currentThread();
+
+ @Override
+ public void stop(QueryContext query) {
+ try {
+ hConnThread.interrupt();
+ } catch (Exception e) {
+ logger.warn("Exception happens during interrupt thread {} due to {}",
+ hConnThread.getName(), e);
+ }
+ }
+ });
+
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+ try {
+ rowsService.visitCube(controller, request, rpcCallback);
+ CubeVisitResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ return response;
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ // Reset the interrupted state
+ Thread.interrupted();
+ }
+ }
+
+ private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) {
+ try {
+ CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService;
+ RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub
+ .getChannel();
+ byte[] row = (byte[]) channelRowField.get(channel);
+ return conn.getRegionLocator(table.getName()).getRegionLocation(row, false);
+ } catch (Throwable throwable) {
+ logger.warn("error when get region server name", throwable);
+ }
+ return null;
+ }
+ }, new Batch.Callback<CubeVisitResponse>() {
+ @Override
+ public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+ if (result == null) {
+ return;
+ }
+ if (region == null) {
+ return;
+ }
+
+ // if the query is stopped, skip further processing
+ // this may be caused by
+ // * Any other region has responded with error
+ // * ServerRpcController.failedOnException
+ // * ResourceLimitExceededException
+ // * Exception happened during CompressionUtils.decompress()
+ // * Outside exceptions, like KylinTimeoutException in SequentialCubeTupleIterator
+ if (queryContext.isStopped()) {
+ return;
+ }
+
+ logger.info(logHeader + getStatsString(region, result));
+
+ Stats stats = result.getStats();
+ queryContext.addAndGetScannedRows(stats.getScannedRowCount());
+ queryContext.addAndGetScannedBytes(stats.getScannedBytes());
+
+ RuntimeException rpcException = null;
+ if (result.getStats().getNormalComplete() != 1) {
+ // record coprocessor error if happened
+ rpcException = getCoprocessorException(result);
+ }
+ queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
+ cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
+ cuboid.getId(), storageContext.getFilterMask(), rpcException,
+ stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
+ stats.getScannedRowCount(),
+ stats.getScannedRowCount() - stats.getAggregatedRowCount()
+ - stats.getFilteredRowCount(),
+ stats.getAggregatedRowCount(), stats.getScannedBytes());
+
+ if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+ rpcException = new ResourceLimitExceededException(
+ "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
+ + cubeSeg.getConfig().getQueryMaxScanBytes());
+ }
+
+ if (rpcException != null) {
+ queryContext.stop(rpcException);
+ return;
+ }
+
+ try {
+ if (compressionResult) {
+ epResultItr.append(CompressionUtils.decompress(
+ HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+ } else {
+ epResultItr.append(
+ HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+ }
+ } catch (IOException | DataFormatException e) {
+ throw new RuntimeException(logHeader + "Error when decompressing", e);
+ }
+ }
+ });
+
+ } catch (Throwable ex) {
+ queryContext.stop(ex);
+ }
+
+ if (queryContext.isStopped()) {
+ logger.error(logHeader + "Error when visiting cubes by endpoint", queryContext.getThrowable()); // double log coz the query thread may already timeout
+ }
+ }
+
private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
ByteString scanRequestByteString;
int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 60d85b4..2cb0c7f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,19 +24,21 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.gridtable.GTScanRequest;
import com.google.common.base.Throwables;
class ExpectedSizeIterator implements Iterator<byte[]> {
- private BlockingQueue<byte[]> queue;
- private int expectedSize;
+ private final QueryContext queryContext;
+ private final int expectedSize;
+ private final BlockingQueue<byte[]> queue;
+ private final long coprocessorTimeout;
+ private final long deadline;
private int current = 0;
- private long coprocessorTimeout;
- private long deadline;
- private volatile Throwable coprocException;
- public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
+ public ExpectedSizeIterator(QueryContext queryContext, int expectedSize, long coprocessorTimeout) {
+ this.queryContext = queryContext;
this.expectedSize = expectedSize;
this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
@@ -59,14 +61,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
current++;
byte[] ret = null;
- while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+ while (ret == null && deadline > System.currentTimeMillis()) {
+ checkState();
ret = queue.poll(1000, TimeUnit.MILLISECONDS);
}
- if (coprocException != null) {
- throw Throwables.propagate(coprocException);
- }
-
if (ret == null) {
throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
@@ -85,6 +84,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
}
public void append(byte[] data) {
+ checkState();
+
try {
queue.put(data);
} catch (InterruptedException e) {
@@ -93,7 +94,14 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
}
}
- public void notifyCoprocException(Throwable ex) {
- coprocException = ex;
+ private void checkState() {
+ if (queryContext.isStopped()) {
+ Throwable throwable = queryContext.getThrowable();
+ if (throwable != null) {
+ throw Throwables.propagate(throwable);
+ } else {
+ throw new IllegalStateException("the query is stopped: " + queryContext.getStopReason());
+ }
+ }
}
}
[3/3] kylin git commit: KYLIN-2881 code review
Posted by li...@apache.org.
KYLIN-2881 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d969767c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d969767c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d969767c
Branch: refs/heads/KYLIN-2881-review
Commit: d969767c318f023cfdc450de50d853fa35e441b4
Parents: 006485d
Author: lidongsjtu <li...@apache.org>
Authored: Fri Dec 29 14:13:11 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 16:10:21 2018 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/QueryContext.java | 8 +-
.../apache/kylin/common/QueryContextFacade.java | 101 +++++++++++++++++
.../kylin/common/QueryContextManager.java | 109 -------------------
.../gtrecord/GTCubeStorageQueryBase.java | 6 +-
.../gtrecord/SequentialCubeTupleIterator.java | 4 +-
.../apache/kylin/query/ITFailfastQueryTest.java | 4 +-
.../apache/kylin/query/ITKylinQueryTest.java | 4 +-
.../apache/kylin/query/ITMassInQueryTest.java | 4 +-
.../org/apache/kylin/query/KylinTestBase.java | 50 ++-------
.../kylin/query/enumerator/OLAPQuery.java | 4 +-
.../kylin/rest/controller/QueryController.java | 18 +--
.../kylin/rest/metrics/QueryMetricsFacade.java | 5 +-
.../apache/kylin/rest/service/QueryService.java | 20 ++--
server/src/main/resources/kylinSecurity.xml | 4 +
.../kylin/rest/metrics/QueryMetricsTest.java | 4 +-
.../kylin/rest/service/QueryServiceTest.java | 4 +-
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 4 +-
17 files changed, 161 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index d36b332..718f590 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -62,8 +62,12 @@ public class QueryContext {
private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
QueryContext() {
+ this(System.currentTimeMillis());
+ }
+
+ QueryContext(long startMills) {
queryId = UUID.randomUUID().toString();
- queryStartMillis = System.currentTimeMillis();
+ queryStartMillis = startMills;
}
public long getQueryStartMillis() {
@@ -72,7 +76,7 @@ public class QueryContext {
public void setDeadline(long timeoutMillis) {
if (timeoutMillis > 0) {
- deadline = queryStartMillis + timeoutMillis;
+ deadline = queryStartMillis + timeoutMillis;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
new file mode 100644
index 0000000..e1cf54b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class QueryContextFacade {
+
+ private static final Logger logger = LoggerFactory.getLogger(QueryContextFacade.class);
+
+ private static final ConcurrentMap<String, QueryContext> RUNNING_CTX_MAP = Maps.newConcurrentMap();
+ private static final ThreadLocal<QueryContext> CURRENT_CTX = new ThreadLocal<QueryContext>() {
+ @Override
+ protected QueryContext initialValue() {
+ QueryContext queryContext = new QueryContext();
+ RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext);
+ return queryContext;
+ }
+ };
+
+ public static QueryContext current() {
+ return CURRENT_CTX.get();
+ }
+
+ /**
+ * invoked by program
+ */
+ public static void resetCurrent() {
+ QueryContext queryContext = CURRENT_CTX.get();
+ if (queryContext != null) {
+ RUNNING_CTX_MAP.remove(queryContext.getQueryId());
+ CURRENT_CTX.remove();
+ }
+ }
+
+ /**
+ * invoked by user to let query stop early
+ * @link resetCurrent() should be finally invoked
+ */
+ public static void stopQuery(String queryId, String info) {
+ QueryContext queryContext = RUNNING_CTX_MAP.get(queryId);
+ if (queryContext != null) {
+ queryContext.stopEarly(info);
+ } else {
+ logger.info("the query:{} is not existed", queryId);
+ }
+ }
+
+ public static TreeSet<QueryContext> getAllRunningQueries() {
+ TreeSet<QueryContext> runningQueries = Sets.newTreeSet(new Comparator<QueryContext>() {
+ @Override
+ public int compare(QueryContext o1, QueryContext o2) {
+ if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
+ return 1;
+ } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
+ return -1;
+ } else {
+ return o1.getQueryId().compareTo(o2.getQueryId());
+ }
+ }
+ });
+
+ runningQueries.addAll(RUNNING_CTX_MAP.values());
+ return runningQueries;
+ }
+
+ /**
+ * @param runningTime in milliseconds
+ * @return running queries that have run more than specified time
+ */
+ public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) {
+ SortedSet<QueryContext> allRunningQueries = getAllRunningQueries();
+ QueryContext tmpCtx = new QueryContext(runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid
+ return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
deleted file mode 100644
index d08557e..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class QueryContextManager {
-
- private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class);
-
- private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap();
- private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
- @Override
- protected QueryContext initialValue() {
- QueryContext queryContext = new QueryContext();
- idContextMap.put(queryContext.getQueryId(), queryContext);
- return queryContext;
- }
- };
-
- public static QueryContext current() {
- return contexts.get();
- }
-
- /**
- * invoked by program
- */
- public static void resetCurrent() {
- QueryContext queryContext = contexts.get();
- if (queryContext != null) {
- idContextMap.remove(queryContext.getQueryId());
- contexts.remove();
- }
- }
-
- /**
- * invoked by user to let query stop early
- * @link resetCurrent() should be finally invoked
- */
- public static void stopQuery(String queryId, String info) {
- QueryContext queryContext = idContextMap.get(queryId);
- if (queryContext != null) {
- queryContext.stopEarly(info);
- } else {
- logger.info("the query:{} is not existed", queryId);
- }
- }
-
- public static List<QueryContext> getAllRunningQueries() {
- // Sort by descending order
- TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() {
- @Override
- public int compare(QueryContext o1, QueryContext o2) {
- if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
- return 1;
- } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
- return -1;
- } else {
- return 0;
- }
- }
- });
-
- for (QueryContext runningQuery : idContextMap.values()) {
- queriesSet.add(runningQuery);
- }
- return Lists.newArrayList(queriesSet);
- }
-
- /**
- * @param runningTime in milliseconds
- * @return running queries that have run more than specified time
- */
- public static List<QueryContext> getLongRunningQueries(int runningTime) {
- List<QueryContext> allRunningQueries = getAllRunningQueries();
- int i = 0;
- for (; i < allRunningQueries.size(); i++) {
- if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) {
- break;
- }
- }
- return allRunningQueries.subList(0, i);
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 483facd..ae1f64f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
// set whether to aggregate results from multiple partitions
enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
// set and check query deadline
- QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
- QueryContextManager.current().checkMillisBeforeDeadline();
+ QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+ QueryContextFacade.current().checkMillisBeforeDeadline();
// push down having clause filter if possible
TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index f45f02b..72417bf 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
@Override
public ITuple next() {
if (scanCount++ % 100 == 1) {
- QueryContextManager.current().checkMillisBeforeDeadline();
+ QueryContextFacade.current().checkMillisBeforeDeadline();
}
if (++scanCountDelta >= 1000)
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
index e4b8b43..f7f0752 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.routing.Candidate;
@@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase {
@After
public void cleanUp() {
- QueryContextManager.resetCurrent();
+ QueryContextFacade.resetCurrent();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 02a50ce..4edfb3d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
String sql = getTextFromFile(sqlFile);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- execQueryUsingKylin(kylinConn, queryFileName, sql, true);
+ executeQuery(kylinConn, queryFileName, sql, true);
}
@Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
// compare the result
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index 16395fc..cca0be6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
printResult(kylinTable);
}
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
// execute H2
sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index e38bb1a..52acadc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,13 +44,12 @@ import java.util.logging.LogManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
import org.apache.kylin.query.util.PushDownUtil;
@@ -229,19 +228,10 @@ public class KylinTestBase {
// ////////////////////////////////////////////////////////////////////////////////////////
// execute
- private void initExecQueryUsingKylin(String sql) {
- QueryContextManager.resetCurrent();
- QueryContextManager.current();
- }
-
- protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
- throws Exception {
- initExecQueryUsingKylin(sql);
- return executeQuery(dbConn, queryName, sql, needSort);
- }
protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
throws Exception {
+ QueryContextFacade.resetCurrent();
// change join type to match current setting
sql = changeJoinType(sql, joinType);
@@ -262,7 +252,6 @@ public class KylinTestBase {
}
protected int executeQuery(String sql, boolean needDisplay) throws Exception {
- initExecQueryUsingKylin(sql);
// change join type to match current setting
sql = changeJoinType(sql, joinType);
@@ -302,27 +291,8 @@ public class KylinTestBase {
}
}
- protected Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownSelectQuery(String sql) throws Exception {
- SQLException mockException = new SQLException("", new NoRealizationFoundException(""));
-
- return PushDownUtil.tryPushDownSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", mockException,
- BackdoorToggles.getPrepareOnly());
- }
-
- protected Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownNonSelectQuery(String sql,
- boolean isPrepare) throws Exception {
- return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
- }
-
- protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
- List<String> parameters, boolean needSort) throws Exception {
- initExecQueryUsingKylin(sql);
- return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
- }
-
protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
List<String> parameters, boolean needSort) throws Exception {
-
// change join type to match current setting
sql = changeJoinType(sql, joinType);
@@ -400,7 +370,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
// compare the result
if (BackdoorToggles.getPrepareOnly())
@@ -444,7 +414,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
// compare the result
assertTableEquals(expectTable, kylinTable);
@@ -467,7 +437,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -496,7 +466,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + sql);
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
+ ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
try {
// compare the result
@@ -528,7 +498,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -579,7 +549,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
+ ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -619,7 +589,7 @@ public class KylinTestBase {
// execute Kylin
logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")");
IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
- ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
+ ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
// execute H2
logger.info("Query Result from H2 - " + queryName);
@@ -727,7 +697,7 @@ public class KylinTestBase {
//setup cube conn
String project = ProjectInstance.DEFAULT_PROJECT_NAME;
- cubeConnection = QueryDataSource.create(project, config).getConnection();
+ cubeConnection = QueryConnection.getConnection(project);
//setup h2
h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index f0759ab..84ac5cf 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -22,7 +22,7 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.query.relnode.OLAPContext;
import org.slf4j.Logger;
@@ -49,7 +49,7 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
this.type = type;
this.contextId = ctxId;
- QueryContextManager.current().addContext(ctxId, type.toString(),
+ QueryContextFacade.current().addContext(ctxId, type.toString(),
type == EnumeratorTypeEnum.OLAP);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 062fd62..d43e7e2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -25,13 +25,14 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.querymeta.TableMeta;
@@ -181,11 +182,12 @@ public class QueryController extends BasicController {
*/
@RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET)
@ResponseBody
- public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
+ public TreeSet<QueryContext> getRunningQueries(
+ @RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
if (runTimeMoreThan == -1) {
- return QueryContextManager.getAllRunningQueries();
- }else {
- return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000);
+ return QueryContextFacade.getAllRunningQueries();
+ } else {
+ return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000);
}
}
@@ -193,8 +195,8 @@ public class QueryController extends BasicController {
@ResponseBody
public void stopQuery(@PathVariable String queryId) {
final String user = SecurityContextHolder.getContext().getAuthentication().getName();
- logger.info("{} stop the query: {}", new Object[] { user, queryId });
- QueryContextManager.stopQuery(queryId, "stopped by " + user);
+ logger.info("{} tries to stop the query: {}, but not guaranteed to succeed.", user, queryId);
+ QueryContextFacade.stopQuery(queryId, "stopped by " + user);
}
public void setQueryService(QueryService queryService) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 09ccc07..40fc5ef 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.metrics.MetricsManager;
import org.apache.kylin.metrics.lib.impl.RecordEvent;
import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
@@ -100,7 +99,7 @@ public class QueryMetricsFacade {
if (user == null) {
user = "unknown";
}
- for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) {
+ for (QueryContext.RPCStatistics entry : QueryContextFacade.current().getRpcStatisticsList()) {
RecordEvent rpcMetricsEvent = new TimedRecordEvent(
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
setRPCWrapper(rpcMetricsEvent, //
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 71926be..01963eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -58,7 +58,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.htrace.HtraceInit;
@@ -403,7 +403,7 @@ public class QueryService extends BasicService {
if (sqlRequest.getBackdoorToggles() != null)
BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
- final QueryContext queryContext = QueryContextManager.current();
+ final QueryContext queryContext = QueryContextFacade.current();
TraceScope scope = null;
if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
@@ -414,8 +414,6 @@ public class QueryService extends BasicService {
String traceUrl = getTraceUrl(scope);
try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
- long startTime = System.currentTimeMillis();
-
SQLResponse sqlResponse = null;
String sql = sqlRequest.getSql();
String project = sqlRequest.getProject();
@@ -447,7 +445,7 @@ public class QueryService extends BasicService {
// real execution if required
if (sqlResponse == null) {
try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
- sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
+ sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled);
}
} else {
Trace.addTimelineAnnotation("response without real execution");
@@ -457,7 +455,7 @@ public class QueryService extends BasicService {
if (!sqlResponse.getIsException())
checkQueryAuth(sqlResponse, project);
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+ sqlResponse.setDuration(queryContext.getAccumulatedMillis());
sqlResponse.setTraceUrl(traceUrl);
logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
try {
@@ -472,17 +470,17 @@ public class QueryService extends BasicService {
} finally {
BackdoorToggles.cleanToggles();
- QueryContextManager.resetCurrent();
+ QueryContextFacade.resetCurrent();
if (scope != null) {
scope.close();
}
}
}
- private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
+ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCacheEnabled) {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Message msg = MsgPicker.getMsg();
- final QueryContext queryContext = QueryContextManager.current();
+ final QueryContext queryContext = QueryContextFacade.current();
SQLResponse sqlResponse = null;
try {
@@ -620,7 +618,7 @@ public class QueryService extends BasicService {
conn = QueryConnection.getConnection(sqlRequest.getProject());
String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
- QueryContext context = QueryContextManager.current();
+ QueryContext context = QueryContextFacade.current();
context.setUsername(userInfo);
context.setGroups(AclPermissionUtil.getCurrentUserGroups());
final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
@@ -1042,7 +1040,7 @@ public class QueryService extends BasicService {
boolean isPartialResult = false;
StringBuilder cubeSb = new StringBuilder();
StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
- QueryContext queryContext = QueryContextManager.current();
+ QueryContext queryContext = QueryContextFacade.current();
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
String realizationName = "NULL";
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 3d5b686..f9c0d71 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -236,6 +236,8 @@
<scr:http-basic entry-point-ref="unauthorisedEntryPoint"/>
<scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
+ <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
+ <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/**/metrics" access="permitAll"/>
@@ -279,6 +281,8 @@
<scr:http-basic entry-point-ref="unauthorisedEntryPoint"/>
<scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
+ <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
+ <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/**/metrics" access="permitAll"/>
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index d4a16f8..8cd7489 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -26,7 +26,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.service.ServiceTestBase;
@@ -122,7 +122,7 @@ public class QueryMetricsTest extends ServiceTestBase {
sqlRequest.setSql("select * from TEST_KYLIN_FACT");
sqlRequest.setProject("default");
- QueryContext context = QueryContextManager.current();
+ QueryContext context = QueryContextFacade.current();
SQLResponse sqlResponse = new SQLResponse();
sqlResponse.setDuration(10);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 061e622..5c633a3 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import java.sql.SQLException;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.QueryConnection;
@@ -65,7 +65,7 @@ public class QueryServiceTest extends ServiceTestBase {
SQLRequest request = new SQLRequest();
request.setSql("select * from test_table");
request.setAcceptPartial(true);
- QueryContext queryContext = QueryContextManager.current();
+ QueryContext queryContext = QueryContextFacade.current();
SQLResponse response = new SQLResponse();
response.setHitExceptionCache(true);
queryService.logQuery(queryContext.getQueryId(), request, response);
http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index c660cad..1e2fbd6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -77,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
this.cubeSeg = (CubeSegment) segment;
this.cuboid = cuboid;
this.fullGTInfo = fullGTInfo;
- this.queryContext = QueryContextManager.current();
+ this.queryContext = QueryContextFacade.current();
this.storageContext = context;
this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);