You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/18 09:19:33 UTC

[kylin] 01/02: KYLIN-4771 Clear the recordCachePool when the deadline has reached; add timeout for the recordCachePool offer method.

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6096f0869f2293699d4ad8fe6499c1042416d65a
Author: ggke <gu...@autohome.com.cn>
AuthorDate: Fri Oct 9 18:54:08 2020 +0800

    KYLIN-4771 Clear the recordCachePool when the deadline has reached;add timeout for the recordCachePool offer method.
---
 .../kylin/stream/core/query/MultiThreadsResultCollector.java     | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 0ca08e4..f0c91fd 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -89,6 +89,7 @@ public class MultiThreadsResultCollector extends ResultCollector {
                         if (System.currentTimeMillis() > deadline) {
                             masterThread.interrupt(); // notify main thread
                             cancelFlag.set(true);
+                            recordCachePool.clear();
                             logger.warn("Beyond the deadline for {}.", queryId);
                             throw new RuntimeException("Timeout when iterate search result");
                         }
@@ -112,6 +113,7 @@ public class MultiThreadsResultCollector extends ResultCollector {
                         if (one == null) {
                             masterThread.interrupt(); // notify main thread
                             cancelFlag.set(true);
+                            recordCachePool.clear();
                             logger.debug("Exceeded the deadline for {}.", queryId);
                             throw new RuntimeException("Timeout when iterate search result");
                         }
@@ -141,10 +143,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
 
         @Override
         public void run() {
+            long offserTimeout = 0L;
             try {
                 result.startRead();
                 for (Record record : result) {
-                    recordCachePool.put(record.copy());
+                    offserTimeout = deadline - System.currentTimeMillis();
+                    if (!recordCachePool.offer(record, offserTimeout, TimeUnit.MILLISECONDS)) {
+                        logger.warn("Timeout when offer to recordCachePool, deadline: {}, offser Timeout: {}", deadline, offserTimeout);
+                        break;
+                    }
                 }
                 result.endRead();
             } catch (InterruptedException inter) {