You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/10/18 09:19:33 UTC
[kylin] 01/02: KYLIN-4771 Clear the recordCachePool when the
deadline has reached; add timeout for the recordCachePool offer method.
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6096f0869f2293699d4ad8fe6499c1042416d65a
Author: ggke <gu...@autohome.com.cn>
AuthorDate: Fri Oct 9 18:54:08 2020 +0800
KYLIN-4771 Clear the recordCachePool when the deadline has reached; add timeout for the recordCachePool offer method.
---
.../kylin/stream/core/query/MultiThreadsResultCollector.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 0ca08e4..f0c91fd 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -89,6 +89,7 @@ public class MultiThreadsResultCollector extends ResultCollector {
if (System.currentTimeMillis() > deadline) {
masterThread.interrupt(); // notify main thread
cancelFlag.set(true);
+ recordCachePool.clear();
logger.warn("Beyond the deadline for {}.", queryId);
throw new RuntimeException("Timeout when iterate search result");
}
@@ -112,6 +113,7 @@ public class MultiThreadsResultCollector extends ResultCollector {
if (one == null) {
masterThread.interrupt(); // notify main thread
cancelFlag.set(true);
+ recordCachePool.clear();
logger.debug("Exceeded the deadline for {}.", queryId);
throw new RuntimeException("Timeout when iterate search result");
}
@@ -141,10 +143,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
@Override
public void run() {
+ long offserTimeout = 0L;
try {
result.startRead();
for (Record record : result) {
- recordCachePool.put(record.copy());
+ offserTimeout = deadline - System.currentTimeMillis();
+ if (!recordCachePool.offer(record, offserTimeout, TimeUnit.MILLISECONDS)) {
+ logger.warn("Timeout when offer to recordCachePool, deadline: {}, offser Timeout: {}", deadline, offserTimeout);
+ break;
+ }
}
result.endRead();
} catch (InterruptedException inter) {