You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/01/06 08:21:30 UTC

[1/3] kylin git commit: APACHE-KYLIN-1872: Make query visible and interruptible

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2881-review [created] d969767c3


APACHE-KYLIN-1872: Make query visible and interruptible

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8d35a2be
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8d35a2be
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8d35a2be

Branch: refs/heads/KYLIN-2881-review
Commit: 8d35a2bead2f4bc91eee54e58d8b778fefc6a3ef
Parents: e44f95e
Author: Ma Gang <mg...@163.com>
Authored: Wed Sep 20 19:46:38 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 14:54:01 2018 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/QueryContext.java   |  76 +++++++++----
 .../kylin/common/QueryContextManager.java       | 109 +++++++++++++++++++
 .../gtrecord/GTCubeStorageQueryBase.java        |   6 +-
 .../gtrecord/SequentialCubeTupleIterator.java   |   4 +-
 .../apache/kylin/query/ITFailfastQueryTest.java |   4 +-
 .../kylin/query/enumerator/OLAPQuery.java       |   5 +-
 .../kylin/rest/controller/QueryController.java  |  26 +++++
 .../kylin/rest/metrics/QueryMetricsFacade.java  |   4 +-
 .../apache/kylin/rest/service/QueryService.java |  18 +--
 .../kylin/rest/metrics/QueryMetricsTest.java    |   4 +-
 .../kylin/rest/service/QueryServiceTest.java    |   5 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   3 +-
 12 files changed, 224 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 1af90f4..d36b332 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
@@ -39,37 +40,30 @@ public class QueryContext {
 
     private static final Logger logger = LoggerFactory.getLogger(QueryContext.class);
 
-    private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
-        @Override
-        protected QueryContext initialValue() {
-            return new QueryContext();
-        }
-    };
+    public interface QueryStopListener {
+        void stop(QueryContext query);
+    }
 
     private long queryStartMillis;
     private long deadline = Long.MAX_VALUE;
 
-    private String queryId;
+    private final String queryId;
     private String username;
     private Set<String> groups;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
 
+    private AtomicBoolean isRunning = new AtomicBoolean(true);
+    private volatile Throwable throwable;
+    private String stopReason;
+    private List<QueryStopListener> stopListeners = Lists.newCopyOnWriteArrayList();
+
     private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
     private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
-    private QueryContext() {
-        // use QueryContext.current() instead
-        queryStartMillis = System.currentTimeMillis();
+    QueryContext() {
         queryId = UUID.randomUUID().toString();
-    }
-
-    public static QueryContext current() {
-        return contexts.get();
-    }
-
-    public static void reset() {
-        contexts.remove();
+        queryStartMillis = System.currentTimeMillis();
     }
 
     public long getQueryStartMillis() {
@@ -102,8 +96,8 @@ public class QueryContext {
         return queryId == null ? "" : queryId;
     }
 
-    public void setQueryId(String queryId) {
-        this.queryId = queryId;
+    public long getAccumulatedMillis() {
+        return System.currentTimeMillis() - queryStartMillis;
     }
 
     public String getUsername() {
@@ -138,6 +132,48 @@ public class QueryContext {
         return scannedBytes.addAndGet(deltaBytes);
     }
 
+    public void addQueryStopListener(QueryStopListener listener) {
+        this.stopListeners.add(listener);
+    }
+
+    public boolean isStopped() {
+        return !isRunning.get();
+    }
+
+    public String getStopReason() {
+        return stopReason;
+    }
+
+    /**
+     * stop the whole query and related sub threads
+     */
+    public void stop(Throwable t) {
+        stopQuery(t, t.getMessage());
+    }
+
+    /**
+     * stop the whole query by rest call
+     */
+    public void stopEarly(String reason) {
+        stopQuery(null, reason);
+    }
+
+    private void stopQuery(Throwable t, String reason) {
+        if (isStopped()) {
+            return;
+        }
+        isRunning.set(false);
+        this.throwable = t;
+        this.stopReason = reason;
+        for (QueryStopListener stopListener : stopListeners) {
+            stopListener.stop(this);
+        }
+    }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
     public void addContext(int ctxId, String type, boolean ifCube) {
         Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null;
         if (ifCube) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
new file mode 100644
index 0000000..d08557e
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class QueryContextManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class);
+
+    private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap();
+    private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
+        @Override
+        protected QueryContext initialValue() {
+            QueryContext queryContext = new QueryContext();
+            idContextMap.put(queryContext.getQueryId(), queryContext);
+            return queryContext;
+        }
+    };
+
+    public static QueryContext current() {
+        return contexts.get();
+    }
+
+    /**
+     * invoked by program
+     */
+    public static void resetCurrent() {
+        QueryContext queryContext = contexts.get();
+        if (queryContext != null) {
+            idContextMap.remove(queryContext.getQueryId());
+            contexts.remove();
+        }
+    }
+
+    /**
+     * invoked by user to let query stop early
+     * {@link #resetCurrent()} should be finally invoked
+     */
+    public static void stopQuery(String queryId, String info) {
+        QueryContext queryContext = idContextMap.get(queryId);
+        if (queryContext != null) {
+            queryContext.stopEarly(info);
+        } else {
+            logger.info("the query:{} is not existed", queryId);
+        }
+    }
+
+    public static List<QueryContext> getAllRunningQueries() {
+        // Sort by descending order
+        TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() {
+            @Override
+            public int compare(QueryContext o1, QueryContext o2) {
+                if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
+                    return 1;
+                } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
+                    return -1;
+                } else {
+                    return 0;
+                }
+            }
+        });
+
+        for (QueryContext runningQuery : idContextMap.values()) {
+            queriesSet.add(runningQuery);
+        }
+        return Lists.newArrayList(queriesSet);
+    }
+
+    /**
+     * @param runningTime in milliseconds
+     * @return running queries that have run more than specified time
+     */
+    public static List<QueryContext> getLongRunningQueries(int runningTime) {
+        List<QueryContext> allRunningQueries = getAllRunningQueries();
+        int i = 0;
+        for (; i < allRunningQueries.size(); i++) {
+            if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) {
+                break;
+            }
+        }
+        return allRunningQueries.subList(0, i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 11ad8bb..483facd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         // set whether to aggregate results from multiple partitions
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // set and check query deadline
-        QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
-        QueryContext.current().checkMillisBeforeDeadline();
+        QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+        QueryContextManager.current().checkMillisBeforeDeadline();
 
         // push down having clause filter if possible
         TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index ede5ff9..f45f02b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     @Override
     public ITuple next() {
         if (scanCount++ % 100 == 1) {
-            QueryContext.current().checkMillisBeforeDeadline();
+            QueryContextManager.current().checkMillisBeforeDeadline();
         }
 
         if (++scanCountDelta >= 1000)

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
index 17b804a..e4b8b43 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.routing.Candidate;
@@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase {
 
     @After
     public void cleanUp() {
-        QueryContext.reset();
+        QueryContextManager.resetCurrent();
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index f6bd3f8..f0759ab 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -22,7 +22,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
-import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.slf4j.Logger;
@@ -49,7 +49,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
         this.type = type;
         this.contextId = ctxId;
 
-        QueryContext.current().addContext(ctxId, type.toString(), type == EnumeratorTypeEnum.OLAP);
+        QueryContextManager.current().addContext(ctxId, type.toString(),
+                type == EnumeratorTypeEnum.OLAP);
     }
 
     public OLAPQuery(EnumeratorTypeEnum type, int ctxSeq) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 4f83780..062fd62 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
@@ -54,6 +56,7 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.supercsv.io.CsvListWriter;
 import org.supercsv.io.ICsvListWriter;
@@ -171,6 +174,29 @@ public class QueryController extends BasicController {
         }
     }
 
+    /**
+     *
+     * @param runTimeMoreThan in seconds
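+     * @return all running queries when runTimeMoreThan is -1, otherwise those that have run at least that many seconds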
+     */
+    @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET)
+    @ResponseBody
+    public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
+        if (runTimeMoreThan == -1) {
+            return QueryContextManager.getAllRunningQueries();
+        }else {
+            return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000);
+        }
+    }
+
+    @RequestMapping(value = "/query/{queryId}/stop", method = RequestMethod.PUT)
+    @ResponseBody
+    public void stopQuery(@PathVariable String queryId) {
+        final String user = SecurityContextHolder.getContext().getAuthentication().getName();
+        logger.info("{} stop the query: {}", new Object[] { user, queryId });
+        QueryContextManager.stopQuery(queryId, "stopped by " + user);
+    }
+
     public void setQueryService(QueryService queryService) {
         this.queryService = queryService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index e595804..09ccc07 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
@@ -98,7 +100,7 @@ public class QueryMetricsFacade {
         if (user == null) {
             user = "unknown";
         }
-        for (QueryContext.RPCStatistics entry : QueryContext.current().getRpcStatisticsList()) {
+        for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) {
             RecordEvent rpcMetricsEvent = new TimedRecordEvent(
                     KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
             setRPCWrapper(rpcMetricsEvent, //

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9789b70..31e7336 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -58,6 +58,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.htrace.HtraceInit;
@@ -268,7 +269,7 @@ public class QueryService extends BasicService {
         return queries;
     }
 
-    public void logQuery(final SQLRequest request, final SQLResponse response) {
+    public void logQuery(final String queryId, final SQLRequest request, final SQLResponse response) {
         final String user = aclEvaluate.getCurrentUserName();
         final List<String> realizationNames = new LinkedList<>();
         final Set<Long> cuboidIds = new HashSet<Long>();
@@ -300,7 +301,7 @@ public class QueryService extends BasicService {
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append(newLine);
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
-        stringBuilder.append("Query Id: ").append(QueryContext.current().getQueryId()).append(newLine);
+        stringBuilder.append("Query Id: ").append(queryId).append(newLine);
         stringBuilder.append("SQL: ").append(request.getSql()).append(newLine);
         stringBuilder.append("User: ").append(user).append(newLine);
         stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine);
@@ -402,7 +403,7 @@ public class QueryService extends BasicService {
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
 
-        final QueryContext queryContext = QueryContext.current();
+        final QueryContext queryContext = QueryContextManager.current();
 
         TraceScope scope = null;
         if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
@@ -499,7 +500,7 @@ public class QueryService extends BasicService {
             long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
             long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
             long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
-            sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+            sqlResponse.setDuration(queryContext.getAccumulatedMillis());
             logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
                     String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
                     String.valueOf(sqlResponse.getTotalScanCount()));
@@ -616,7 +617,7 @@ public class QueryService extends BasicService {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
 
             String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
-            QueryContext context = QueryContext.current();
+            QueryContext context = QueryContextManager.current();
             context.setUsername(userInfo);
             context.setGroups(AclPermissionUtil.getCurrentUserGroups());
             final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
@@ -1038,6 +1039,7 @@ public class QueryService extends BasicService {
         boolean isPartialResult = false;
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
+        QueryContext queryContext = QueryContextManager.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
                 if (ctx.realization != null) {
@@ -1048,14 +1050,16 @@ public class QueryService extends BasicService {
                     cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
                 }
+                queryContext.setContextRealization(ctx.id, realizationName, realizationType);
             }
         }
         logger.info(logSb.toString());
 
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult,
                 isPushDown);
-        response.setTotalScanCount(QueryContext.current().getScannedRows());
-        response.setTotalScanBytes(QueryContext.current().getScannedBytes());
+        response.setTotalScanCount(queryContext.getScannedRows());
+        response.setTotalScanBytes(queryContext.getScannedBytes());
+        response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
         return response;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index e23fc20..d4a16f8 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -26,6 +26,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.ServiceTestBase;
@@ -121,6 +122,8 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlRequest.setSql("select * from TEST_KYLIN_FACT");
         sqlRequest.setProject("default");
 
+        QueryContext context = QueryContextManager.current();
+        
         SQLResponse sqlResponse = new SQLResponse();
         sqlResponse.setDuration(10);
         sqlResponse.setCube("test_cube");
@@ -138,7 +141,6 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlResponse.setResults(results);
         sqlResponse.setStorageCacheUsed(true);
 
-        QueryContext context = QueryContext.current();
         int ctxId = 0;
         context.addContext(ctxId, "OLAP", true);
         context.addRPCStatistics(ctxId, "sandbox", "test_cube", "20100101000000_20150101000000", 3L, 3L, 3L, null, 80L,

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 7dc9994..061e622 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -21,6 +21,8 @@ package org.apache.kylin.rest.service;
 import java.io.IOException;
 import java.sql.SQLException;
 
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -63,9 +65,10 @@ public class QueryServiceTest extends ServiceTestBase {
         SQLRequest request = new SQLRequest();
         request.setSql("select * from test_table");
         request.setAcceptPartial(true);
+        QueryContext queryContext = QueryContextManager.current();
         SQLResponse response = new SQLResponse();
         response.setHitExceptionCache(true);
-        queryService.logQuery(request, response);
+        queryService.logQuery(queryContext.getQueryId(), request, response);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kylin/blob/8d35a2be/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 6b4ac32..c660cad 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -76,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         this.cubeSeg = (CubeSegment) segment;
         this.cuboid = cuboid;
         this.fullGTInfo = fullGTInfo;
-        this.queryContext = QueryContext.current();
+        this.queryContext = QueryContextManager.current();
         this.storageContext = context;
 
         this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);


[2/3] kylin git commit: KYLIN-2881 Improve hbase coprocessor exception handling at kylin server side

Posted by li...@apache.org.
KYLIN-2881 Improve hbase coprocessor exception handling at kylin server side

Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/006485d1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/006485d1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/006485d1

Branch: refs/heads/KYLIN-2881-review
Commit: 006485d1abcdc5392055abc726c1ab8c3eca13ca
Parents: 8d35a2b
Author: Zhong <nj...@apache.org>
Authored: Wed Sep 20 09:46:44 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 15:15:30 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/query/ITKylinQueryTest.java    |   4 +-
 .../apache/kylin/query/ITMassInQueryTest.java   |   4 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  34 ++-
 .../apache/kylin/rest/service/QueryService.java |  16 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 252 ++++++++++++-------
 .../hbase/cube/v2/ExpectedSizeIterator.java     |  34 ++-
 6 files changed, 225 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 4edfb3d..02a50ce 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         String sql = getTextFromFile(sqlFile);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
 
-        executeQuery(kylinConn, queryFileName, sql, true);
+        execQueryUsingKylin(kylinConn, queryFileName, sql, true);
     }
 
     @Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         // execute Kylin
         logger.info("Query Result from Kylin - " + queryName);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-        ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+        ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
         String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
 
         // compare the result

http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index cca0be6..16395fc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
             printResult(kylinTable);
 
         }
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");

http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 2c5b556..e38bb1a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,6 +44,7 @@ import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -228,6 +229,16 @@ public class KylinTestBase {
 
     // ////////////////////////////////////////////////////////////////////////////////////////
     // execute
+    private void initExecQueryUsingKylin(String sql) {
+        QueryContextManager.resetCurrent();
+        QueryContextManager.current();
+    }
+
+    protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
+            throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeQuery(dbConn, queryName, sql, needSort);
+    }
 
     protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
             throws Exception {
@@ -251,6 +262,7 @@ public class KylinTestBase {
     }
 
     protected int executeQuery(String sql, boolean needDisplay) throws Exception {
+        initExecQueryUsingKylin(sql);
 
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
@@ -302,6 +314,12 @@ public class KylinTestBase {
         return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
     }
 
+    protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
+            List<String> parameters, boolean needSort) throws Exception {
+        initExecQueryUsingKylin(sql);
+        return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
+    }
+
     protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
             List<String> parameters, boolean needSort) throws Exception {
 
@@ -382,7 +400,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             if (BackdoorToggles.getPrepareOnly())
@@ -426,7 +444,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
 
             // compare the result
             assertTableEquals(expectTable, kylinTable);
@@ -449,7 +467,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -478,7 +496,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + sql);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
 
             try {
                 // compare the result
@@ -510,7 +528,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -561,7 +579,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
+            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -601,7 +619,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
+            ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -709,7 +727,7 @@ public class KylinTestBase {
 
         //setup cube conn
         String project = ProjectInstance.DEFAULT_PROJECT_NAME;
-        cubeConnection = QueryConnection.getConnection(project);
+        cubeConnection = QueryDataSource.create(project, config).getConnection();
 
         //setup h2
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",

http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 31e7336..71926be 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -459,7 +459,7 @@ public class QueryService extends BasicService {
 
             sqlResponse.setDuration(System.currentTimeMillis() - startTime);
             sqlResponse.setTraceUrl(traceUrl);
-            logQuery(sqlRequest, sqlResponse);
+            logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
                 recordMetric(sqlRequest, sqlResponse);
             } catch (Throwable th) {
@@ -472,7 +472,7 @@ public class QueryService extends BasicService {
 
         } finally {
             BackdoorToggles.cleanToggles();
-            QueryContext.reset();
+            QueryContextManager.resetCurrent();
             if (scope != null) {
                 scope.close();
             }
@@ -482,7 +482,8 @@ public class QueryService extends BasicService {
     private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Message msg = MsgPicker.getMsg();
-        
+        final QueryContext queryContext = QueryContextManager.current();
+
         SQLResponse sqlResponse = null;
         try {
             final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
@@ -525,13 +526,15 @@ public class QueryService extends BasicService {
             Trace.addTimelineAnnotation("response from execution");
 
         } catch (Throwable e) { // calcite may throw AssertError
+            queryContext.stop(e);
+
             logger.error("Exception while executing query", e);
             String errMsg = makeErrorMsgUserFriendly(e);
 
             sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-            QueryContext queryContext = QueryContext.current();
             sqlResponse.setTotalScanCount(queryContext.getScannedRows());
             sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+            sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
 
             if (queryCacheEnabled && e.getCause() != null
                     && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
@@ -1042,6 +1045,8 @@ public class QueryService extends BasicService {
         QueryContext queryContext = QueryContextManager.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
+                String realizationName = "NULL";
+                int realizationType = -1;
                 if (ctx.realization != null) {
                     isPartialResult |= ctx.storageContext.isPartialResultReturned();
                     if (cubeSb.length() > 0) {
@@ -1049,6 +1054,9 @@ public class QueryService extends BasicService {
                     }
                     cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
+
+                    realizationName = ctx.realization.getName();
+                    realizationType = ctx.realization.getStorageType();
                 }
                 queryContext.setContextRealization(ctx.id, realizationName, realizationType);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 26ab039..ddf62b7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -19,20 +19,23 @@
 package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.util.Bytes;
@@ -52,7 +55,6 @@ import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
@@ -103,6 +105,16 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
     }
 
+    static Field channelRowField = null;
+    static {
+        try {
+            channelRowField = RegionCoprocessorRpcChannel.class.getDeclaredField("row");
+            channelRowField.setAccessible(true);
+        } catch (Throwable t) {
+            logger.warn("error when get row field from RegionCoprocessorRpcChannel class", t);
+        }
+    }
+
     @SuppressWarnings("checkstyle:methodlength")
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
@@ -135,7 +147,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         scanRequest.clearScanRanges();//since raw scans are sent to coprocessor, we don't need to duplicate sending it
         scanRequestByteString = serializeGTScanReq(scanRequest);
 
-        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum, coprocessorTimeout);
+        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(queryContext, shardNum, coprocessorTimeout);
 
         logger.info("Serialized scanRequestBytes {} bytes, rawScanBytesString {} bytes", scanRequestByteString.size(), rawScanByteString.size());
 
@@ -165,97 +177,14 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
         builder.setIsExactAggregate(storageContext.isExactAggregation());
 
+        final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryContext.getQueryId(),
+                Integer.toHexString(System.identityHashCode(scanRequest)));
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
                 @Override
                 public void run() {
-
-                    final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest)));
-                    final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>();
-
-                    try {
-                        Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()), HBaseConnection.getCoprocessorPool());
-
-                        final CubeVisitRequest request = builder.build();
-                        final byte[] startKey = epRange.getFirst();
-                        final byte[] endKey = epRange.getSecond();
-
-                        table.coprocessorService(CubeVisitService.class, startKey, endKey, //
-                                new Batch.Call<CubeVisitService, CubeVisitResponse>() {
-                                    public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
-                                        ServerRpcController controller = new ServerRpcController();
-                                        BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
-                                        rowsService.visitCube(controller, request, rpcCallback);
-                                        CubeVisitResponse response = rpcCallback.get();
-                                        if (controller.failedOnException()) {
-                                            throw controller.getFailedOn();
-                                        }
-                                        return response;
-                                    }
-                                }, new Batch.Callback<CubeVisitResponse>() {
-                                    @Override
-                                    public void update(byte[] region, byte[] row, CubeVisitResponse result) {
-                                        if (region == null) {
-                                            return;
-                                        }
-
-                                        logger.info(logHeader + getStatsString(region, result));
-
-                                        Stats stats = result.getStats();
-                                        queryContext.addAndGetScannedRows(stats.getScannedRowCount());
-                                        queryContext.addAndGetScannedBytes(stats.getScannedBytes());
-
-                                        RuntimeException rpcException = null;
-                                        if (result.getStats().getNormalComplete() != 1) {
-                                            rpcException = getCoprocessorException(result);
-                                        }
-                                        queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
-                                                cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
-                                                cuboid.getId(), storageContext.getFilterMask(), rpcException,
-                                                stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
-                                                stats.getScannedRowCount(),
-                                                stats.getScannedRowCount() - stats.getAggregatedRowCount()
-                                                        - stats.getFilteredRowCount(),
-                                                stats.getAggregatedRowCount(), stats.getScannedBytes());
-
-                                        // if any other region has responded with error, skip further processing
-                                        if (regionErrorHolder.get() != null) {
-                                            return;
-                                        }
-
-                                        // record coprocessor error if happened
-                                        if (rpcException != null) {
-                                            regionErrorHolder.compareAndSet(null, rpcException);
-                                            return;
-                                        }
-
-                                        if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
-                                            throw new ResourceLimitExceededException("Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold " + cubeSeg.getConfig().getQueryMaxScanBytes());
-                                        }
-
-                                        try {
-                                            if (compressionResult) {
-                                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
-                                            } else {
-                                                epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
-                                            }
-                                        } catch (IOException | DataFormatException e) {
-                                            throw new RuntimeException(logHeader + "Error when decompressing", e);
-                                        }
-                                    }
-                                });
-
-                    } catch (Throwable ex) {
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(ex);
-                        return;
-                    }
-
-                    if (regionErrorHolder.get() != null) {
-                        RuntimeException exception = regionErrorHolder.get();
-                        logger.error(logHeader + "Error when visiting cubes by endpoint", exception); // double log coz the query thread may already timeout
-                        epResultItr.notifyCoprocException(exception);
-                    }
+                    runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
+                            epRange.getSecond(), epResultItr);
                 }
             });
         }
@@ -263,6 +192,149 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
     }
 
+    private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
+            final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
+            final ExpectedSizeIterator epResultItr) {
+
+        final String queryId = queryContext.getQueryId();
+
+        try {
+            final Table table = conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()),
+                    HBaseConnection.getCoprocessorPool());
+
+            table.coprocessorService(CubeVisitService.class, startKey, endKey, //
+                    new Batch.Call<CubeVisitService, CubeVisitResponse>() {
+                        public CubeVisitResponse call(CubeVisitService rowsService) throws IOException {
+                            if (queryContext.isStopped()) {
+                                logger.warn(
+                                        "Query-{}: the query has been stopped, not send request to region server any more.",
+                                        queryId);
+                                return null;
+                            }
+
+                            HRegionLocation regionLocation = getStartRegionLocation(rowsService);
+                            String regionServerName = regionLocation == null ? "UNKNOWN" : regionLocation.getHostname();
+                            logger.info("Query-{}: send request to the init region server {} on table {} ", queryId,
+                                    regionServerName, table.getName());
+
+                            queryContext.addQueryStopListener(new QueryContext.QueryStopListener() {
+                                private Thread hConnThread = Thread.currentThread();
+
+                                @Override
+                                public void stop(QueryContext query) {
+                                    try {
+                                        hConnThread.interrupt();
+                                    } catch (Exception e) {
+                                        logger.warn("Exception happens during interrupt thread {} due to {}",
+                                                hConnThread.getName(), e);
+                                    }
+                                }
+                            });
+
+                            ServerRpcController controller = new ServerRpcController();
+                            BlockingRpcCallback<CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+                            try {
+                                rowsService.visitCube(controller, request, rpcCallback);
+                                CubeVisitResponse response = rpcCallback.get();
+                                if (controller.failedOnException()) {
+                                    throw controller.getFailedOn();
+                                }
+                                return response;
+                            } catch (Exception e) {
+                                throw e;
+                            } finally {
+                                // Reset the interrupted state
+                                Thread.interrupted();
+                            }
+                        }
+
+                        private HRegionLocation getStartRegionLocation(CubeVisitProtos.CubeVisitService rowsService) {
+                            try {
+                                CubeVisitProtos.CubeVisitService.Stub rowsServiceStub = (CubeVisitProtos.CubeVisitService.Stub) rowsService;
+                                RegionCoprocessorRpcChannel channel = (RegionCoprocessorRpcChannel) rowsServiceStub
+                                        .getChannel();
+                                byte[] row = (byte[]) channelRowField.get(channel);
+                                return conn.getRegionLocator(table.getName()).getRegionLocation(row, false);
+                            } catch (Throwable throwable) {
+                                logger.warn("error when get region server name", throwable);
+                            }
+                            return null;
+                        }
+                    }, new Batch.Callback<CubeVisitResponse>() {
+                        @Override
+                        public void update(byte[] region, byte[] row, CubeVisitResponse result) {
+                            if (result == null) {
+                                return;
+                            }
+                            if (region == null) {
+                                return;
+                            }
+
+                            // if the query is stopped, skip further processing
+                            // this may be caused by
+                            //      * Any other region has responded with error
+                            //      * ServerRpcController.failedOnException
+                            //      * ResourceLimitExceededException
+                            //      * Exception happened during CompressionUtils.decompress()
+                            //      * Outside exceptions, like KylinTimeoutException in SequentialCubeTupleIterator
+                            if (queryContext.isStopped()) {
+                                return;
+                            }
+
+                            logger.info(logHeader + getStatsString(region, result));
+
+                            Stats stats = result.getStats();
+                            queryContext.addAndGetScannedRows(stats.getScannedRowCount());
+                            queryContext.addAndGetScannedBytes(stats.getScannedBytes());
+
+                            RuntimeException rpcException = null;
+                            if (result.getStats().getNormalComplete() != 1) {
+                                // record coprocessor error if happened
+                                rpcException = getCoprocessorException(result);
+                            }
+                            queryContext.addRPCStatistics(storageContext.ctxId, stats.getHostname(),
+                                    cubeSeg.getCubeDesc().getName(), cubeSeg.getName(), cuboid.getInputID(),
+                                    cuboid.getId(), storageContext.getFilterMask(), rpcException,
+                                    stats.getServiceEndTime() - stats.getServiceStartTime(), 0,
+                                    stats.getScannedRowCount(),
+                                    stats.getScannedRowCount() - stats.getAggregatedRowCount()
+                                            - stats.getFilteredRowCount(),
+                                    stats.getAggregatedRowCount(), stats.getScannedBytes());
+
+                            if (queryContext.getScannedBytes() > cubeSeg.getConfig().getQueryMaxScanBytes()) {
+                                rpcException = new ResourceLimitExceededException(
+                                        "Query scanned " + queryContext.getScannedBytes() + " bytes exceeds threshold "
+                                                + cubeSeg.getConfig().getQueryMaxScanBytes());
+                            }
+
+                            if (rpcException != null) {
+                                queryContext.stop(rpcException);
+                                return;
+                            }
+
+                            try {
+                                if (compressionResult) {
+                                    epResultItr.append(CompressionUtils.decompress(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                } else {
+                                    epResultItr.append(
+                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                }
+                            } catch (IOException | DataFormatException e) {
+                                throw new RuntimeException(logHeader + "Error when decompressing", e);
+                            }
+                        }
+                    });
+
+        } catch (Throwable ex) {
+            queryContext.stop(ex);
+        }
+
+        if (queryContext.isStopped()) {
+            logger.error(logHeader + "Error when visiting cubes by endpoint", queryContext.getThrowable()); // double log coz the query thread may already timeout
+        }
+    }
+
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {
         ByteString scanRequestByteString;
         int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;

http://git-wip-us.apache.org/repos/asf/kylin/blob/006485d1/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
index 60d85b4..2cb0c7f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -24,19 +24,21 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.gridtable.GTScanRequest;
 
 import com.google.common.base.Throwables;
 
 class ExpectedSizeIterator implements Iterator<byte[]> {
-    private BlockingQueue<byte[]> queue;
-    private int expectedSize;
+    private final QueryContext queryContext;
+    private final int expectedSize;
+    private final BlockingQueue<byte[]> queue;
+    private final long coprocessorTimeout;
+    private final long deadline;
     private int current = 0;
-    private long coprocessorTimeout;
-    private long deadline;
-    private volatile Throwable coprocException;
 
-    public ExpectedSizeIterator(int expectedSize, long coprocessorTimeout) {
+    public ExpectedSizeIterator(QueryContext queryContext, int expectedSize, long coprocessorTimeout) {
+        this.queryContext = queryContext;
         this.expectedSize = expectedSize;
         this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
 
@@ -59,14 +61,11 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
             current++;
             byte[] ret = null;
 
-            while (ret == null && coprocException == null && deadline > System.currentTimeMillis()) {
+            while (ret == null && deadline > System.currentTimeMillis()) {
+                checkState();
                 ret = queue.poll(1000, TimeUnit.MILLISECONDS);
             }
 
-            if (coprocException != null) {
-                throw Throwables.propagate(coprocException);
-            }
-
             if (ret == null) {
                 throw new RuntimeException("Timeout visiting cube! Check why coprocessor exception is not sent back? In coprocessor Self-termination is checked every " + //
                         GTScanRequest.terminateCheckInterval + " scanned rows, the configured timeout(" + coprocessorTimeout + ") cannot support this many scans?");
@@ -85,6 +84,8 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
     }
 
     public void append(byte[] data) {
+        checkState();
+
         try {
             queue.put(data);
         } catch (InterruptedException e) {
@@ -93,7 +94,14 @@ class ExpectedSizeIterator implements Iterator<byte[]> {
         }
     }
 
-    public void notifyCoprocException(Throwable ex) {
-        coprocException = ex;
+    private void checkState() {
+        if (queryContext.isStopped()) {
+            Throwable throwable = queryContext.getThrowable();
+            if (throwable != null) {
+                throw Throwables.propagate(throwable);
+            } else {
+                throw new IllegalStateException("the query is stopped: " + queryContext.getStopReason());
+            }
+        }
     }
 }


[3/3] kylin git commit: KYLIN-2881 code review

Posted by li...@apache.org.
KYLIN-2881 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d969767c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d969767c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d969767c

Branch: refs/heads/KYLIN-2881-review
Commit: d969767c318f023cfdc450de50d853fa35e441b4
Parents: 006485d
Author: lidongsjtu <li...@apache.org>
Authored: Fri Dec 29 14:13:11 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Sat Jan 6 16:10:21 2018 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/QueryContext.java   |   8 +-
 .../apache/kylin/common/QueryContextFacade.java | 101 +++++++++++++++++
 .../kylin/common/QueryContextManager.java       | 109 -------------------
 .../gtrecord/GTCubeStorageQueryBase.java        |   6 +-
 .../gtrecord/SequentialCubeTupleIterator.java   |   4 +-
 .../apache/kylin/query/ITFailfastQueryTest.java |   4 +-
 .../apache/kylin/query/ITKylinQueryTest.java    |   4 +-
 .../apache/kylin/query/ITMassInQueryTest.java   |   4 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  50 ++-------
 .../kylin/query/enumerator/OLAPQuery.java       |   4 +-
 .../kylin/rest/controller/QueryController.java  |  18 +--
 .../kylin/rest/metrics/QueryMetricsFacade.java  |   5 +-
 .../apache/kylin/rest/service/QueryService.java |  20 ++--
 server/src/main/resources/kylinSecurity.xml     |   4 +
 .../kylin/rest/metrics/QueryMetricsTest.java    |   4 +-
 .../kylin/rest/service/QueryServiceTest.java    |   4 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   4 +-
 17 files changed, 161 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index d36b332..718f590 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -62,8 +62,12 @@ public class QueryContext {
     private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
     QueryContext() {
+        this(System.currentTimeMillis());
+    }
+
+    QueryContext(long startMills) {
         queryId = UUID.randomUUID().toString();
-        queryStartMillis = System.currentTimeMillis();
+        queryStartMillis = startMills;
     }
 
     public long getQueryStartMillis() {
@@ -72,7 +76,7 @@ public class QueryContext {
 
     public void setDeadline(long timeoutMillis) {
         if (timeoutMillis > 0) {
-            deadline  = queryStartMillis + timeoutMillis;
+            deadline = queryStartMillis + timeoutMillis;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
new file mode 100644
index 0000000..e1cf54b
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Static access point for {@link QueryContext} instances.
+ *
+ * Each thread lazily gets its own context through a {@link ThreadLocal}; every context
+ * created that way is also registered in a shared map keyed by query id, so that other
+ * threads can list the registered queries or ask one of them to stop.
+ */
+public class QueryContextFacade {
+
+    private static final Logger logger = LoggerFactory.getLogger(QueryContextFacade.class);
+
+    // query id -> context, for every context created through CURRENT_CTX and not yet reset
+    private static final ConcurrentMap<String, QueryContext> RUNNING_CTX_MAP = Maps.newConcurrentMap();
+    private static final ThreadLocal<QueryContext> CURRENT_CTX = new ThreadLocal<QueryContext>() {
+        @Override
+        protected QueryContext initialValue() {
+            // first access from a thread: create the context and make it visible to other threads
+            QueryContext queryContext = new QueryContext();
+            RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext);
+            return queryContext;
+        }
+    };
+
+    /**
+     * @return the context bound to the calling thread; it is created and registered
+     *         on the first call made by that thread (or the first call after a reset)
+     */
+    public static QueryContext current() {
+        return CURRENT_CTX.get();
+    }
+
+    /**
+     * invoked by program
+     *
+     * Unregisters the calling thread's context and clears the thread-local slot, so the
+     * next {@link #current()} call creates a fresh context.
+     */
+    public static void resetCurrent() {
+        // NOTE: get() goes through initialValue() when the thread has no context yet, so a
+        // context may be created here only to be discarded right away; the null check is
+        // kept purely as a safeguard.
+        QueryContext queryContext = CURRENT_CTX.get();
+        if (queryContext != null) {
+            RUNNING_CTX_MAP.remove(queryContext.getQueryId());
+            CURRENT_CTX.remove();
+        }
+    }
+
+    /**
+     * invoked by user to let query stop early
+     * @link resetCurrent() should be finally invoked
+     *
+     * @param queryId id of the query to stop; an unknown id is only logged
+     * @param info    free-text reason handed to {@link QueryContext#stopEarly(String)}
+     */
+    public static void stopQuery(String queryId, String info) {
+        QueryContext queryContext = RUNNING_CTX_MAP.get(queryId);
+        if (queryContext != null) {
+            queryContext.stopEarly(info);
+        } else {
+            logger.info("the query:{} is not existed", queryId);
+        }
+    }
+
+    /**
+     * @return a new set (a snapshot the caller may modify) holding every registered context,
+     *         ordered by accumulated milliseconds descending; ties are broken by query id
+     */
+    public static TreeSet<QueryContext> getAllRunningQueries() {
+        TreeSet<QueryContext> runningQueries = Sets.newTreeSet(new Comparator<QueryContext>() {
+            @Override
+            public int compare(QueryContext o1, QueryContext o2) {
+                if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
+                    return 1;
+                } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
+                    return -1;
+                } else {
+                    // never report two distinct queries as equal, or the set would drop one of them
+                    return o1.getQueryId().compareTo(o2.getQueryId());
+                }
+            }
+        });
+
+        runningQueries.addAll(RUNNING_CTX_MAP.values());
+        return runningQueries;
+    }
+
+    /**
+     * @param runningTime in milliseconds
+     * @return running queries whose accumulated time is at least the specified time,
+     *         ordered like {@link #getAllRunningQueries()}
+     */
+    public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) {
+        // The previous implementation built a probe with new QueryContext(runningTime + 1L) and
+        // took headSet(probe). That constructor argument is the query START timestamp, not an
+        // elapsed duration, so the probe looked like a query started near the epoch and sorted
+        // ahead of every real query, leaving the head set empty.
+        // NOTE(review): assumes getAccumulatedMillis() is the elapsed time since the start
+        // timestamp - confirm against QueryContext.
+        //
+        // getAllRunningQueries() returns a fresh set, so trimming it in place is safe.
+        TreeSet<QueryContext> longRunning = getAllRunningQueries();
+        // The set is ordered longest-running first: discard entries from the short-running
+        // end until the shortest remaining one reaches the threshold.
+        while (!longRunning.isEmpty() && longRunning.last().getAccumulatedMillis() < runningTime) {
+            longRunning.pollLast();
+        }
+        return longRunning;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
deleted file mode 100644
index d08557e..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class QueryContextManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class);
-
-    private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap();
-    private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
-        @Override
-        protected QueryContext initialValue() {
-            QueryContext queryContext = new QueryContext();
-            idContextMap.put(queryContext.getQueryId(), queryContext);
-            return queryContext;
-        }
-    };
-
-    public static QueryContext current() {
-        return contexts.get();
-    }
-
-    /**
-     * invoked by program
-     */
-    public static void resetCurrent() {
-        QueryContext queryContext = contexts.get();
-        if (queryContext != null) {
-            idContextMap.remove(queryContext.getQueryId());
-            contexts.remove();
-        }
-    }
-
-    /**
-     * invoked by user to let query stop early
-     * @link resetCurrent() should be finally invoked
-     */
-    public static void stopQuery(String queryId, String info) {
-        QueryContext queryContext = idContextMap.get(queryId);
-        if (queryContext != null) {
-            queryContext.stopEarly(info);
-        } else {
-            logger.info("the query:{} is not existed", queryId);
-        }
-    }
-
-    public static List<QueryContext> getAllRunningQueries() {
-        // Sort by descending order
-        TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() {
-            @Override
-            public int compare(QueryContext o1, QueryContext o2) {
-                if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) {
-                    return 1;
-                } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) {
-                    return -1;
-                } else {
-                    return 0;
-                }
-            }
-        });
-
-        for (QueryContext runningQuery : idContextMap.values()) {
-            queriesSet.add(runningQuery);
-        }
-        return Lists.newArrayList(queriesSet);
-    }
-
-    /**
-     * @param runningTime in milliseconds
-     * @return running queries that have run more than specified time
-     */
-    public static List<QueryContext> getLongRunningQueries(int runningTime) {
-        List<QueryContext> allRunningQueries = getAllRunningQueries();
-        int i = 0;
-        for (; i < allRunningQueries.size(); i++) {
-            if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) {
-                break;
-            }
-        }
-        return allRunningQueries.subList(0, i);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 483facd..ae1f64f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         // set whether to aggregate results from multiple partitions
         enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // set and check query deadline
-        QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
-        QueryContextManager.current().checkMillisBeforeDeadline();
+        QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000);
+        QueryContextFacade.current().checkMillisBeforeDeadline();
 
         // push down having clause filter if possible
         TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations,

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index f45f02b..72417bf 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     @Override
     public ITuple next() {
         if (scanCount++ % 100 == 1) {
-            QueryContextManager.current().checkMillisBeforeDeadline();
+            QueryContextFacade.current().checkMillisBeforeDeadline();
         }
 
         if (++scanCountDelta >= 1000)

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
index e4b8b43..f7f0752 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.routing.Candidate;
@@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase {
 
     @After
     public void cleanUp() {
-        QueryContextManager.resetCurrent();
+        QueryContextFacade.resetCurrent();
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 02a50ce..4edfb3d 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         String sql = getTextFromFile(sqlFile);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
 
-        execQueryUsingKylin(kylinConn, queryFileName, sql, true);
+        executeQuery(kylinConn, queryFileName, sql, true);
     }
 
     @Ignore
@@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase {
         // execute Kylin
         logger.info("Query Result from Kylin - " + queryName);
         IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-        ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+        ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
         String queriedVersion = String.valueOf(kylinTable.getValue(0, "version"));
 
         // compare the result

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
index 16395fc..cca0be6 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java
@@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
             printResult(kylinTable);
 
         }
@@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
 
             // execute H2
             sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")");

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index e38bb1a..52acadc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -44,13 +44,12 @@ import java.util.logging.LogManager;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.query.util.PushDownUtil;
@@ -229,19 +228,10 @@ public class KylinTestBase {
 
     // ////////////////////////////////////////////////////////////////////////////////////////
     // execute
-    private void initExecQueryUsingKylin(String sql) {
-        QueryContextManager.resetCurrent();
-        QueryContextManager.current();
-    }
-
-    protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
-            throws Exception {
-        initExecQueryUsingKylin(sql);
-        return executeQuery(dbConn, queryName, sql, needSort);
-    }
 
     protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort)
             throws Exception {
+        QueryContextFacade.resetCurrent();
 
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
@@ -262,7 +252,6 @@ public class KylinTestBase {
     }
 
     protected int executeQuery(String sql, boolean needDisplay) throws Exception {
-        initExecQueryUsingKylin(sql);
 
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
@@ -302,27 +291,8 @@ public class KylinTestBase {
         }
     }
 
-    protected Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownSelectQuery(String sql) throws Exception {
-        SQLException mockException = new SQLException("", new NoRealizationFoundException(""));
-
-        return PushDownUtil.tryPushDownSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", mockException,
-                BackdoorToggles.getPrepareOnly());
-    }
-
-    protected Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownNonSelectQuery(String sql,
-            boolean isPrepare) throws Exception {
-        return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare);
-    }
-
-    protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql,
-            List<String> parameters, boolean needSort) throws Exception {
-        initExecQueryUsingKylin(sql);
-        return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort);
-    }
-
     protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql,
             List<String> parameters, boolean needSort) throws Exception {
-
         // change join type to match current setting
         sql = changeJoinType(sql, joinType);
 
@@ -400,7 +370,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
 
             // compare the result
             if (BackdoorToggles.getPrepareOnly())
@@ -444,7 +414,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, false);
 
             // compare the result
             assertTableEquals(expectTable, kylinTable);
@@ -467,7 +437,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -496,7 +466,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + sql);
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false);
+            ITable kylinTable = executeQuery(kylinConn, sql, sql, false);
 
             try {
                 // compare the result
@@ -528,7 +498,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -579,7 +549,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -619,7 +589,7 @@ public class KylinTestBase {
             // execute Kylin
             logger.info("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
             IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
-            ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort);
+            ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort);
 
             // execute H2
             logger.info("Query Result from H2 - " + queryName);
@@ -727,7 +697,7 @@ public class KylinTestBase {
 
         //setup cube conn
         String project = ProjectInstance.DEFAULT_PROJECT_NAME;
-        cubeConnection = QueryDataSource.create(project, config).getConnection();
+        cubeConnection = QueryConnection.getConnection(project);
 
         //setup h2
         h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index f0759ab..84ac5cf 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -22,7 +22,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.slf4j.Logger;
@@ -49,7 +49,7 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
         this.type = type;
         this.contextId = ctxId;
 
-        QueryContextManager.current().addContext(ctxId, type.toString(),
+        QueryContextFacade.current().addContext(ctxId, type.toString(),
                 type == EnumeratorTypeEnum.OLAP);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index 062fd62..d43e7e2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -25,13 +25,14 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.metadata.querymeta.TableMeta;
@@ -181,11 +182,12 @@ public class QueryController extends BasicController {
      */
     @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET)
     @ResponseBody
-    public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
+    public TreeSet<QueryContext> getRunningQueries(
+            @RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) {
         if (runTimeMoreThan == -1) {
-            return QueryContextManager.getAllRunningQueries();
-        }else {
-            return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000);
+            return QueryContextFacade.getAllRunningQueries();
+        } else {
+            return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000);
         }
     }
 
@@ -193,8 +195,8 @@ public class QueryController extends BasicController {
     @ResponseBody
     public void stopQuery(@PathVariable String queryId) {
         final String user = SecurityContextHolder.getContext().getAuthentication().getName();
-        logger.info("{} stop the query: {}", new Object[] { user, queryId });
-        QueryContextManager.stopQuery(queryId, "stopped by " + user);
+        logger.info("{} tries to stop the query: {}, but not guaranteed to succeed.", user, queryId);
+        QueryContextFacade.stopQuery(queryId, "stopped by " + user);
     }
 
     public void setQueryService(QueryService queryService) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 09ccc07..40fc5ef 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
-import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
@@ -100,7 +99,7 @@ public class QueryMetricsFacade {
         if (user == null) {
             user = "unknown";
         }
-        for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) {
+        for (QueryContext.RPCStatistics entry : QueryContextFacade.current().getRpcStatisticsList()) {
             RecordEvent rpcMetricsEvent = new TimedRecordEvent(
                     KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
             setRPCWrapper(rpcMetricsEvent, //

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 71926be..01963eb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -58,7 +58,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.htrace.HtraceInit;
@@ -403,7 +403,7 @@ public class QueryService extends BasicService {
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
 
-        final QueryContext queryContext = QueryContextManager.current();
+        final QueryContext queryContext = QueryContextFacade.current();
 
         TraceScope scope = null;
         if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
@@ -414,8 +414,6 @@ public class QueryService extends BasicService {
         String traceUrl = getTraceUrl(scope);
 
         try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
-            long startTime = System.currentTimeMillis();
-            
             SQLResponse sqlResponse = null;
             String sql = sqlRequest.getSql();
             String project = sqlRequest.getProject();
@@ -447,7 +445,7 @@ public class QueryService extends BasicService {
             // real execution if required
             if (sqlResponse == null) {
                 try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
-                    sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
+                    sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled);
                 }
             } else {
                 Trace.addTimelineAnnotation("response without real execution");
@@ -457,7 +455,7 @@ public class QueryService extends BasicService {
             if (!sqlResponse.getIsException())
                 checkQueryAuth(sqlResponse, project);
 
-            sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+            sqlResponse.setDuration(queryContext.getAccumulatedMillis());
             sqlResponse.setTraceUrl(traceUrl);
             logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
@@ -472,17 +470,17 @@ public class QueryService extends BasicService {
 
         } finally {
             BackdoorToggles.cleanToggles();
-            QueryContextManager.resetCurrent();
+            QueryContextFacade.resetCurrent();
             if (scope != null) {
                 scope.close();
             }
         }
     }
 
-    private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
+    private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCacheEnabled) {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Message msg = MsgPicker.getMsg();
-        final QueryContext queryContext = QueryContextManager.current();
+        final QueryContext queryContext = QueryContextFacade.current();
 
         SQLResponse sqlResponse = null;
         try {
@@ -620,7 +618,7 @@ public class QueryService extends BasicService {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
 
             String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
-            QueryContext context = QueryContextManager.current();
+            QueryContext context = QueryContextFacade.current();
             context.setUsername(userInfo);
             context.setGroups(AclPermissionUtil.getCurrentUserGroups());
             final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
@@ -1042,7 +1040,7 @@ public class QueryService extends BasicService {
         boolean isPartialResult = false;
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
-        QueryContext queryContext = QueryContextManager.current();
+        QueryContext queryContext = QueryContextFacade.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
             for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
                 String realizationName = "NULL";

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 3d5b686..f9c0d71 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -236,6 +236,8 @@
             <scr:http-basic entry-point-ref="unauthorisedEntryPoint"/>
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
+            <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/**/metrics" access="permitAll"/>
@@ -279,6 +281,8 @@
             <scr:http-basic entry-point-ref="unauthorisedEntryPoint"/>
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
+            <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/**/metrics" access="permitAll"/>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index d4a16f8..8cd7489 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -26,7 +26,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.ServiceTestBase;
@@ -122,7 +122,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlRequest.setSql("select * from TEST_KYLIN_FACT");
         sqlRequest.setProject("default");
 
-        QueryContext context = QueryContextManager.current();
+        QueryContext context = QueryContextFacade.current();
         
         SQLResponse sqlResponse = new SQLResponse();
         sqlResponse.setDuration(10);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 061e622..5c633a3 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.sql.SQLException;
 
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.query.QueryConnection;
@@ -65,7 +65,7 @@ public class QueryServiceTest extends ServiceTestBase {
         SQLRequest request = new SQLRequest();
         request.setSql("select * from test_table");
         request.setAcceptPartial(true);
-        QueryContext queryContext = QueryContextManager.current();
+        QueryContext queryContext = QueryContextFacade.current();
         SQLResponse response = new SQLResponse();
         response.setHitExceptionCache(true);
         queryService.logQuery(queryContext.getQueryId(), request, response);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d969767c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index c660cad..1e2fbd6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextManager;
+import org.apache.kylin.common.QueryContextFacade;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -77,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage {
         this.cubeSeg = (CubeSegment) segment;
         this.cuboid = cuboid;
         this.fullGTInfo = fullGTInfo;
-        this.queryContext = QueryContextManager.current();
+        this.queryContext = QueryContextFacade.current();
         this.storageContext = context;
 
         this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);