You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/23 13:27:20 UTC

[kylin] 01/02: Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eb65877a0556d0f45bb7876b10a924304053ef97
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Jan 23 11:49:21 2020 +0800

    Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"
    
    This reverts commit 61e319b524464ed2cba21fe659d057362a954b6d.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |  16 ++--
 .../kylin/stream/core/model/DataRequest.java       |   9 --
 .../core/query/MultiThreadsResultCollector.java    | 106 +++++++--------------
 .../core/query/StreamingCubeDataSearcher.java      |  17 ++--
 .../stream/core/query/StreamingSearchContext.java  |  11 ---
 .../server/rest/controller/DataController.java     |   4 +-
 7 files changed, 49 insertions(+), 121 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9de676b..d9561fd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
 public abstract class KylinConfigBase implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class);
-    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
 
     private static final String FALSE = "false";
     private static final String TRUE = "true";
@@ -2347,13 +2346,11 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getStreamingReceiverQueryCoreThreads() {
-        int def = getStreamingReceiverQueryMaxThreads() - 1;
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + ""));
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
     }
 
     public int getStreamingReceiverQueryMaxThreads() {
-        int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + ""));
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
     }
 
     public int getStreamingReceiverUseThreadsPerQuery() {
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index e1119a8..bc075ce 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -72,11 +72,10 @@ import com.google.common.collect.Sets;
  */
 public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
-    public static final long WAIT_DURATION = 2 * 60000 ;
 
     private static ExecutorService executorService;
     static {
-        executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS,
+        executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t"));
     }
     private AssignmentsCache assignmentsCache;
@@ -96,7 +95,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
             final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups,
             final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) {
         List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName());
-        int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
+        int timeout = 120 * 1000; // timeout should be configurable
         final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
         final QueryContext query = QueryContextFacade.current();
 
@@ -106,6 +105,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
         final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
         final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
                 tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+
         logger.info("Query-{}:send request to stream receivers", query.getQueryId());
         for (final ReplicaSet rs : replicaSetsOfCube) {
             executorService.submit(new Runnable() {
@@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
             return foundReceiver;
         }
 
-        if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // retry every 2 minutes
+        if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes
             return foundReceiver;
         }
 
@@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
             RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
+        logger.info("send query to receiver " + receiver + " with query id:" + queryId);
         String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
 
         try {
-            int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
-            int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
-            dataRequest.setDeadline(System.currentTimeMillis() + (int)(readTimeout * 1.5));
             String content = JsonUtil.writeValueAsString(dataRequest);
             Stopwatch sw = new Stopwatch();
             sw.start();
+            int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
+            int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
             String msg = restService.postRequest(url, content, connTimeout, readTimeout);
 
-            logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
+            logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsedMillis());
             if (failedReceivers.containsKey(receiver)) {
                 failedReceivers.remove(receiver);
             }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index f32b751..07c9028 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,7 +36,6 @@ public class DataRequest {
     private boolean allowStorageAggregation;
 
     private long requestSendTime;
-    private long deadline;
     private boolean enableDetailProfile;
     private String storageBehavior;
 
@@ -143,12 +142,4 @@ public class DataRequest {
     public void setHavingFilter(String havingFilter) {
         this.havingFilter = havingFilter;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline = deadline;
-    }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 46f52a5..5ffe5f2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,19 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.stream.core.query;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.KylinConfig;
@@ -39,39 +38,28 @@ import com.google.common.collect.Lists;
 
 public class MultiThreadsResultCollector extends ResultCollector {
     private static Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class);
-    private static ThreadPoolExecutor scannerThreadPool;
-    private static int MAX_RUNNING_THREAD_COUNT;
-
+    private static ExecutorService executor;
     static {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads();
-        scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
-                MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker"));
+        executor = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+                config.getStreamingReceiverQueryMaxThreads(), 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("query-worker"));
     }
 
-    private long deadline;
-    private String queryId;
-    /**
-     * if current query beyond the deadline
-     */
-    private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+    private int timeout;
     private Semaphore workersSemaphore;
+    final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
     private AtomicInteger notCompletedWorkers;
 
-    final BlockingQueue<Record> recordCachePool = new LinkedBlockingQueue<>(10000);
-
-    public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
+    public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
         this.workersSemaphore = new Semaphore(numOfWorkers);
-        this.deadline = deadline;
-        this.queryId = StreamingQueryProfile.get().getQueryId();
+        this.timeout = timeout;
     }
 
     @Override
     public Iterator<Record> iterator() {
         notCompletedWorkers = new AtomicInteger(searchResults.size());
-        Thread masterThread = new Thread(new WorkSubmitter(), "MultiThreadsResultCollector_" + queryId);
-        masterThread.start();
+        executor.submit(new WorkSubmitter());
 
         final int batchSize = 100;
         final long startTime = System.currentTimeMillis();
@@ -81,41 +69,38 @@ public class MultiThreadsResultCollector extends ResultCollector {
 
             @Override
             public boolean hasNext() {
-                boolean exits = (internalIT.hasNext() || !recordCachePool.isEmpty());
+                boolean exits = (internalIT.hasNext() || queue.size() > 0);
                 if (!exits) {
                     while (notCompletedWorkers.get() > 0) {
                         Thread.yield();
-                        if (System.currentTimeMillis() > deadline) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", queryId);
+                        long takeTime = System.currentTimeMillis() - startTime;
+                        if (takeTime > timeout) {
                             throw new RuntimeException("Timeout when iterate search result");
                         }
-                        if (internalIT.hasNext() || !recordCachePool.isEmpty()) {
+                        if (internalIT.hasNext() || queue.size() > 0) {
                             return true;
                         }
                     }
                 }
+
                 return exits;
             }
 
             @Override
             public Record next() {
                 try {
-                    if (System.currentTimeMillis() > deadline) {
+                    long takeTime = System.currentTimeMillis() - startTime;
+                    if (takeTime > timeout) {
                         throw new RuntimeException("Timeout when iterate search result");
                     }
                     if (!internalIT.hasNext()) {
                         recordList.clear();
-                        Record one = recordCachePool.poll(deadline - startTime, TimeUnit.MILLISECONDS);
+                        Record one = queue.poll(timeout - takeTime, TimeUnit.MILLISECONDS);
                         if (one == null) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", queryId);
                             throw new RuntimeException("Timeout when iterate search result");
                         }
                         recordList.add(one);
-                        recordCachePool.drainTo(recordList, batchSize - 1);
+                        queue.drainTo(recordList, batchSize - 1);
                         internalIT = recordList.iterator();
                     }
                     return internalIT.next();
@@ -143,13 +128,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
             try {
                 result.startRead();
                 for (Record record : result) {
-                    recordCachePool.put(record.copy());
+                    try {
+                        queue.put(record.copy());
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Timeout when visiting streaming segmenent", e);
+                    }
                 }
                 result.endRead();
-            } catch (InterruptedException inter) {
-                logger.warn("Cancelled scan streaming segment", inter);
             } catch (Exception e) {
-                logger.error("Error when iterate search result", e);
+                logger.error("error when iterate search result", e);
             } finally {
                 notCompletedWorkers.decrementAndGet();
                 workersSemaphore.release();
@@ -160,44 +147,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
     private class WorkSubmitter implements Runnable {
         @Override
         public void run() {
-            List<Future> futureList = Lists.newArrayListWithExpectedSize(searchResults.size());
-            int cancelTimes = 0;
-            try {
-                for (final IStreamingSearchResult result : searchResults) {
-                    Future f = scannerThreadPool.submit(new ResultIterateWorker(result));
-                    futureList.add(f);
-                    workersSemaphore.acquire(); // Throw InterruptedException when interrupted
-                }
-                while (notCompletedWorkers.get() > 0) {
-                    Thread.sleep(100);
-                    if (cancelFlag.get() || Thread.currentThread().isInterrupted()) {
-                        break;
-                    }
-                }
-            } catch (InterruptedException inter) {
-                logger.warn("Interrupted", inter);
-            } finally {
-                for (Future f : futureList) {
-                    if (!f.isCancelled() || !f.isDone()) {
-                        if (f.cancel(true)) {
-                            cancelTimes++;
-                        }
-                    }
+            for (final IStreamingSearchResult result : searchResults) {
+                executor.submit(new ResultIterateWorker(result));
+                try {
+                    workersSemaphore.acquire();
+                } catch (InterruptedException e) {
+                    logger.error("interrupted", e);
                 }
             }
-            logger.debug("Finish MultiThreadsResultCollector for queryId {}, cancel {}. Current thread pool: {}.",
-                    queryId, cancelTimes, scannerThreadPool);
         }
     }
 
-    /**
-     * block query if return true
-     */
-    public static boolean isOccupied() {
-        boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT;
-        if (occupied) {
-            logger.debug("ThreadPool {}", scannerThreadPool);
-        }
-        return occupied;
-    }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index 1d38b90..f89ddec 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
 
 public class StreamingCubeDataSearcher {
     private static Logger logger = LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
+
+    private static int TIMEOUT = Integer.MAX_VALUE;
+
     private StreamingSegmentManager streamingSegmentManager;
     private String cubeName;
     private CubeDesc cubeDesc;
@@ -70,15 +73,7 @@ public class StreamingCubeDataSearcher {
         try {
             logger.info("query-{}: use cuboid {} to serve the query", queryProfile.getQueryId(),
                     searchRequest.getHitCuboid());
-            ResultCollector resultCollector = getResultCollector(searchRequest);
-            if (resultCollector instanceof MultiThreadsResultCollector) {
-                while (MultiThreadsResultCollector.isOccupied() && System.currentTimeMillis() < searchRequest.getDeadline()) {
-                    Thread.sleep(50);
-                }
-                if (System.currentTimeMillis() >= searchRequest.getDeadline()) {
-                    throw new RuntimeException("Timeout for " + queryProfile.getQueryId());
-                }
-            }
+            ResultCollector resultCollector = getResultCollector();
             Collection<StreamingCubeSegment> segments = streamingSegmentManager.getAllSegments();
             StreamingDataQueryPlanner scanRangePlanner = searchRequest.getQueryPlanner();
             for (StreamingCubeSegment queryableSegment : segments) {
@@ -110,10 +105,10 @@ public class StreamingCubeDataSearcher {
         }
     }
 
-    private ResultCollector getResultCollector(StreamingSearchContext searchRequest) {
+    private ResultCollector getResultCollector() {
         int useThreads = cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
         if (useThreads > 1) {
-            return new MultiThreadsResultCollector(useThreads, searchRequest.getDeadline());
+            return new MultiThreadsResultCollector(useThreads, TIMEOUT);
         } else {
             return new SingleThreadResultCollector();
         }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index 307c2be..9da8e5b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,8 +45,6 @@ public class StreamingSearchContext {
     private long hitCuboid;
     private long basicCuboid;
 
-    private long deadline = Long.MAX_VALUE;
-
     public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> dimensions, Set<TblColRef> groups,
                                   Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter) {
         this.cubeDesc = cubeDesc;
@@ -58,7 +56,6 @@ public class StreamingSearchContext {
         this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
         this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter);
         this.addedGroups = Sets.newHashSet();
-        this.deadline = deadline;
         calculateHitCuboid();
     }
 
@@ -161,12 +158,4 @@ public class StreamingSearchContext {
         sortedSet.addAll(cubeDesc.getMandatoryCuboids());
         return sortedSet;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline =  deadline;
-    }
 }
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 51d0cba..45c6307 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -80,7 +79,7 @@ public class DataController extends BasicController {
         }
         StreamingQueryProfile.set(queryProfile);
         logger.info("receive query request queryId:{}", queryId);
-        try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) {
+        try {
             final Stopwatch sw = new Stopwatch();
             sw.start();
             String cubeName = dataRequest.getCubeName();
@@ -106,7 +105,6 @@ public class DataController extends BasicController {
 
             StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups,
                     metrics, tupleFilter, havingFilter);
-            gtSearchRequest.setDeadline(dataRequest.getDeadline());
             searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime,
                     dataRequest.isAllowStorageAggregation());