You are viewing a plain-text version of this content; the canonical link to it is not included in this copy.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/23 13:27:20 UTC
[kylin] 01/02: Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eb65877a0556d0f45bb7876b10a924304053ef97
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Jan 23 11:49:21 2020 +0800
Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"
This reverts commit 61e319b524464ed2cba21fe659d057362a954b6d.
---
.../org/apache/kylin/common/KylinConfigBase.java | 7 +-
.../stream/rpc/HttpStreamDataSearchClient.java | 16 ++--
.../kylin/stream/core/model/DataRequest.java | 9 --
.../core/query/MultiThreadsResultCollector.java | 106 +++++++--------------
.../core/query/StreamingCubeDataSearcher.java | 17 ++--
.../stream/core/query/StreamingSearchContext.java | 11 ---
.../server/rest/controller/DataController.java | 4 +-
7 files changed, 49 insertions(+), 121 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9de676b..d9561fd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
public abstract class KylinConfigBase implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class);
- private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private static final String FALSE = "false";
private static final String TRUE = "true";
@@ -2347,13 +2346,11 @@ public abstract class KylinConfigBase implements Serializable {
}
public int getStreamingReceiverQueryCoreThreads() {
- int def = getStreamingReceiverQueryMaxThreads() - 1;
- return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + ""));
+ return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
}
public int getStreamingReceiverQueryMaxThreads() {
- int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
- return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + ""));
+ return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
}
public int getStreamingReceiverUseThreadsPerQuery() {
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index e1119a8..bc075ce 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -72,11 +72,10 @@ import com.google.common.collect.Sets;
*/
public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
- public static final long WAIT_DURATION = 2 * 60000 ;
private static ExecutorService executorService;
static {
- executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS,
+ executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t"));
}
private AssignmentsCache assignmentsCache;
@@ -96,7 +95,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups,
final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) {
List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName());
- int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
+ int timeout = 120 * 1000; // timeout should be configurable
final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
final QueryContext query = QueryContextFacade.current();
@@ -106,6 +105,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+
logger.info("Query-{}:send request to stream receivers", query.getQueryId());
for (final ReplicaSet rs : replicaSetsOfCube) {
executorService.submit(new Runnable() {
@@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
return foundReceiver;
}
- if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // retry every 2 minutes
+ if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes
return foundReceiver;
}
@@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
String queryId = dataRequest.getQueryId();
+ logger.info("send query to receiver " + receiver + " with query id:" + queryId);
String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
try {
- int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
- int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
- dataRequest.setDeadline(System.currentTimeMillis() + (int)(readTimeout * 1.5));
String content = JsonUtil.writeValueAsString(dataRequest);
Stopwatch sw = new Stopwatch();
sw.start();
+ int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
+ int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
String msg = restService.postRequest(url, content, connTimeout, readTimeout);
- logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
+ logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsedMillis());
if (failedReceivers.containsKey(receiver)) {
failedReceivers.remove(receiver);
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index f32b751..07c9028 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,7 +36,6 @@ public class DataRequest {
private boolean allowStorageAggregation;
private long requestSendTime;
- private long deadline;
private boolean enableDetailProfile;
private String storageBehavior;
@@ -143,12 +142,4 @@ public class DataRequest {
public void setHavingFilter(String havingFilter) {
this.havingFilter = havingFilter;
}
-
- public long getDeadline() {
- return deadline;
- }
-
- public void setDeadline(long deadline) {
- this.deadline = deadline;
- }
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 46f52a5..5ffe5f2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,19 +14,18 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
+*/
package org.apache.kylin.stream.core.query;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.KylinConfig;
@@ -39,39 +38,28 @@ import com.google.common.collect.Lists;
public class MultiThreadsResultCollector extends ResultCollector {
private static Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class);
- private static ThreadPoolExecutor scannerThreadPool;
- private static int MAX_RUNNING_THREAD_COUNT;
-
+ private static ExecutorService executor;
static {
KylinConfig config = KylinConfig.getInstanceFromEnv();
- MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads();
- scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
- MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker"));
+ executor = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+ config.getStreamingReceiverQueryMaxThreads(), 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("query-worker"));
}
- private long deadline;
- private String queryId;
- /**
- * if current query beyond the deadline
- */
- private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+ private int timeout;
private Semaphore workersSemaphore;
+ final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
private AtomicInteger notCompletedWorkers;
- final BlockingQueue<Record> recordCachePool = new LinkedBlockingQueue<>(10000);
-
- public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
+ public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
this.workersSemaphore = new Semaphore(numOfWorkers);
- this.deadline = deadline;
- this.queryId = StreamingQueryProfile.get().getQueryId();
+ this.timeout = timeout;
}
@Override
public Iterator<Record> iterator() {
notCompletedWorkers = new AtomicInteger(searchResults.size());
- Thread masterThread = new Thread(new WorkSubmitter(), "MultiThreadsResultCollector_" + queryId);
- masterThread.start();
+ executor.submit(new WorkSubmitter());
final int batchSize = 100;
final long startTime = System.currentTimeMillis();
@@ -81,41 +69,38 @@ public class MultiThreadsResultCollector extends ResultCollector {
@Override
public boolean hasNext() {
- boolean exits = (internalIT.hasNext() || !recordCachePool.isEmpty());
+ boolean exits = (internalIT.hasNext() || queue.size() > 0);
if (!exits) {
while (notCompletedWorkers.get() > 0) {
Thread.yield();
- if (System.currentTimeMillis() > deadline) {
- masterThread.interrupt(); // notify main thread
- cancelFlag.set(true);
- logger.warn("Beyond the deadline for {}.", queryId);
+ long takeTime = System.currentTimeMillis() - startTime;
+ if (takeTime > timeout) {
throw new RuntimeException("Timeout when iterate search result");
}
- if (internalIT.hasNext() || !recordCachePool.isEmpty()) {
+ if (internalIT.hasNext() || queue.size() > 0) {
return true;
}
}
}
+
return exits;
}
@Override
public Record next() {
try {
- if (System.currentTimeMillis() > deadline) {
+ long takeTime = System.currentTimeMillis() - startTime;
+ if (takeTime > timeout) {
throw new RuntimeException("Timeout when iterate search result");
}
if (!internalIT.hasNext()) {
recordList.clear();
- Record one = recordCachePool.poll(deadline - startTime, TimeUnit.MILLISECONDS);
+ Record one = queue.poll(timeout - takeTime, TimeUnit.MILLISECONDS);
if (one == null) {
- masterThread.interrupt(); // notify main thread
- cancelFlag.set(true);
- logger.warn("Beyond the deadline for {}.", queryId);
throw new RuntimeException("Timeout when iterate search result");
}
recordList.add(one);
- recordCachePool.drainTo(recordList, batchSize - 1);
+ queue.drainTo(recordList, batchSize - 1);
internalIT = recordList.iterator();
}
return internalIT.next();
@@ -143,13 +128,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
try {
result.startRead();
for (Record record : result) {
- recordCachePool.put(record.copy());
+ try {
+ queue.put(record.copy());
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Timeout when visiting streaming segmenent", e);
+ }
}
result.endRead();
- } catch (InterruptedException inter) {
- logger.warn("Cancelled scan streaming segment", inter);
} catch (Exception e) {
- logger.error("Error when iterate search result", e);
+ logger.error("error when iterate search result", e);
} finally {
notCompletedWorkers.decrementAndGet();
workersSemaphore.release();
@@ -160,44 +147,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
private class WorkSubmitter implements Runnable {
@Override
public void run() {
- List<Future> futureList = Lists.newArrayListWithExpectedSize(searchResults.size());
- int cancelTimes = 0;
- try {
- for (final IStreamingSearchResult result : searchResults) {
- Future f = scannerThreadPool.submit(new ResultIterateWorker(result));
- futureList.add(f);
- workersSemaphore.acquire(); // Throw InterruptedException when interrupted
- }
- while (notCompletedWorkers.get() > 0) {
- Thread.sleep(100);
- if (cancelFlag.get() || Thread.currentThread().isInterrupted()) {
- break;
- }
- }
- } catch (InterruptedException inter) {
- logger.warn("Interrupted", inter);
- } finally {
- for (Future f : futureList) {
- if (!f.isCancelled() || !f.isDone()) {
- if (f.cancel(true)) {
- cancelTimes++;
- }
- }
+ for (final IStreamingSearchResult result : searchResults) {
+ executor.submit(new ResultIterateWorker(result));
+ try {
+ workersSemaphore.acquire();
+ } catch (InterruptedException e) {
+ logger.error("interrupted", e);
}
}
- logger.debug("Finish MultiThreadsResultCollector for queryId {}, cancel {}. Current thread pool: {}.",
- queryId, cancelTimes, scannerThreadPool);
}
}
- /**
- * block query if return true
- */
- public static boolean isOccupied() {
- boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT;
- if (occupied) {
- logger.debug("ThreadPool {}", scannerThreadPool);
- }
- return occupied;
- }
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index 1d38b90..f89ddec 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
public class StreamingCubeDataSearcher {
private static Logger logger = LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
+
+ private static int TIMEOUT = Integer.MAX_VALUE;
+
private StreamingSegmentManager streamingSegmentManager;
private String cubeName;
private CubeDesc cubeDesc;
@@ -70,15 +73,7 @@ public class StreamingCubeDataSearcher {
try {
logger.info("query-{}: use cuboid {} to serve the query", queryProfile.getQueryId(),
searchRequest.getHitCuboid());
- ResultCollector resultCollector = getResultCollector(searchRequest);
- if (resultCollector instanceof MultiThreadsResultCollector) {
- while (MultiThreadsResultCollector.isOccupied() && System.currentTimeMillis() < searchRequest.getDeadline()) {
- Thread.sleep(50);
- }
- if (System.currentTimeMillis() >= searchRequest.getDeadline()) {
- throw new RuntimeException("Timeout for " + queryProfile.getQueryId());
- }
- }
+ ResultCollector resultCollector = getResultCollector();
Collection<StreamingCubeSegment> segments = streamingSegmentManager.getAllSegments();
StreamingDataQueryPlanner scanRangePlanner = searchRequest.getQueryPlanner();
for (StreamingCubeSegment queryableSegment : segments) {
@@ -110,10 +105,10 @@ public class StreamingCubeDataSearcher {
}
}
- private ResultCollector getResultCollector(StreamingSearchContext searchRequest) {
+ private ResultCollector getResultCollector() {
int useThreads = cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
if (useThreads > 1) {
- return new MultiThreadsResultCollector(useThreads, searchRequest.getDeadline());
+ return new MultiThreadsResultCollector(useThreads, TIMEOUT);
} else {
return new SingleThreadResultCollector();
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index 307c2be..9da8e5b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,8 +45,6 @@ public class StreamingSearchContext {
private long hitCuboid;
private long basicCuboid;
- private long deadline = Long.MAX_VALUE;
-
public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> dimensions, Set<TblColRef> groups,
Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter) {
this.cubeDesc = cubeDesc;
@@ -58,7 +56,6 @@ public class StreamingSearchContext {
this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter);
this.addedGroups = Sets.newHashSet();
- this.deadline = deadline;
calculateHitCuboid();
}
@@ -161,12 +158,4 @@ public class StreamingSearchContext {
sortedSet.addAll(cubeDesc.getMandatoryCuboids());
return sortedSet;
}
-
- public long getDeadline() {
- return deadline;
- }
-
- public void setDeadline(long deadline) {
- this.deadline = deadline;
- }
}
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 51d0cba..45c6307 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,7 +24,6 @@ import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -80,7 +79,7 @@ public class DataController extends BasicController {
}
StreamingQueryProfile.set(queryProfile);
logger.info("receive query request queryId:{}", queryId);
- try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) {
+ try {
final Stopwatch sw = new Stopwatch();
sw.start();
String cubeName = dataRequest.getCubeName();
@@ -106,7 +105,6 @@ public class DataController extends BasicController {
StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups,
metrics, tupleFilter, havingFilter);
- gtSearchRequest.setDeadline(dataRequest.getDeadline());
searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime,
dataRequest.isAllowStorageAggregation());