You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/23 13:27:19 UTC

[kylin] branch master updated (c6d4c2b -> 7019774)

This is an automated email from the ASF dual-hosted git repository.

nic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from c6d4c2b  Validate uuid to prevent sql injection
     new eb65877  Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"
     new 7019774  Set repo of spring-snapshots with url https

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +-
 server-base/pom.xml                                |   4 +-
 server/pom.xml                                     |   4 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |  16 ++--
 .../kylin/stream/core/model/DataRequest.java       |   9 --
 .../core/query/MultiThreadsResultCollector.java    | 106 +++++++--------------
 .../core/query/StreamingCubeDataSearcher.java      |  17 ++--
 .../stream/core/query/StreamingSearchContext.java  |  11 ---
 stream-receiver/pom.xml                            |   4 +-
 .../server/rest/controller/DataController.java     |   4 +-
 10 files changed, 55 insertions(+), 127 deletions(-)


[kylin] 02/02: Set repo of spring-snapshots with url https

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7019774375ba822eb8dc46d1c18faa339a4a551e
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Jan 23 15:07:38 2020 +0800

    Set repo of spring-snapshots with url https
---
 server-base/pom.xml     | 4 ++--
 server/pom.xml          | 4 ++--
 stream-receiver/pom.xml | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/server-base/pom.xml b/server-base/pom.xml
index 7ec9b73..addbd3b 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -263,7 +263,7 @@
     <repositories>
         <repository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
@@ -272,7 +272,7 @@
     <pluginRepositories>
         <pluginRepository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
diff --git a/server/pom.xml b/server/pom.xml
index 65daaca..7c789b6 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -350,7 +350,7 @@
     <repositories>
         <repository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
@@ -359,7 +359,7 @@
     <pluginRepositories>
         <pluginRepository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index cc2ef92..12a5068 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -290,7 +290,7 @@
     <repositories>
         <repository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>
@@ -299,7 +299,7 @@
     <pluginRepositories>
         <pluginRepository>
             <id>spring-snapshots</id>
-            <url>http://repo.spring.io/libs-snapshot</url>
+            <url>https://repo.spring.io/libs-snapshot</url>
             <snapshots>
                 <enabled>true</enabled>
             </snapshots>


[kylin] 01/02: Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eb65877a0556d0f45bb7876b10a924304053ef97
Author: nichunen <ni...@apache.org>
AuthorDate: Thu Jan 23 11:49:21 2020 +0800

    Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query"
    
    This reverts commit 61e319b524464ed2cba21fe659d057362a954b6d.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |  16 ++--
 .../kylin/stream/core/model/DataRequest.java       |   9 --
 .../core/query/MultiThreadsResultCollector.java    | 106 +++++++--------------
 .../core/query/StreamingCubeDataSearcher.java      |  17 ++--
 .../stream/core/query/StreamingSearchContext.java  |  11 ---
 .../server/rest/controller/DataController.java     |   4 +-
 7 files changed, 49 insertions(+), 121 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9de676b..d9561fd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
 public abstract class KylinConfigBase implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class);
-    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
 
     private static final String FALSE = "false";
     private static final String TRUE = "true";
@@ -2347,13 +2346,11 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getStreamingReceiverQueryCoreThreads() {
-        int def = getStreamingReceiverQueryMaxThreads() - 1;
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + ""));
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
     }
 
     public int getStreamingReceiverQueryMaxThreads() {
-        int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + ""));
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
     }
 
     public int getStreamingReceiverUseThreadsPerQuery() {
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index e1119a8..bc075ce 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -72,11 +72,10 @@ import com.google.common.collect.Sets;
  */
 public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
-    public static final long WAIT_DURATION = 2 * 60000 ;
 
     private static ExecutorService executorService;
     static {
-        executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS,
+        executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t"));
     }
     private AssignmentsCache assignmentsCache;
@@ -96,7 +95,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
             final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups,
             final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) {
         List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName());
-        int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
+        int timeout = 120 * 1000; // timeout should be configurable
         final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
         final QueryContext query = QueryContextFacade.current();
 
@@ -106,6 +105,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
         final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
         final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
                 tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+
         logger.info("Query-{}:send request to stream receivers", query.getQueryId());
         for (final ReplicaSet rs : replicaSetsOfCube) {
             executorService.submit(new Runnable() {
@@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
             return foundReceiver;
         }
 
-        if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // retry every 2 minutes
+        if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes
             return foundReceiver;
         }
 
@@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
             RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
+        logger.info("send query to receiver " + receiver + " with query id:" + queryId);
         String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
 
         try {
-            int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
-            int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
-            dataRequest.setDeadline(System.currentTimeMillis() + (int)(readTimeout * 1.5));
             String content = JsonUtil.writeValueAsString(dataRequest);
             Stopwatch sw = new Stopwatch();
             sw.start();
+            int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
+            int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
             String msg = restService.postRequest(url, content, connTimeout, readTimeout);
 
-            logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
+            logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsedMillis());
             if (failedReceivers.containsKey(receiver)) {
                 failedReceivers.remove(receiver);
             }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index f32b751..07c9028 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,7 +36,6 @@ public class DataRequest {
     private boolean allowStorageAggregation;
 
     private long requestSendTime;
-    private long deadline;
     private boolean enableDetailProfile;
     private String storageBehavior;
 
@@ -143,12 +142,4 @@ public class DataRequest {
     public void setHavingFilter(String havingFilter) {
         this.havingFilter = havingFilter;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline = deadline;
-    }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 46f52a5..5ffe5f2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,19 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.stream.core.query;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.KylinConfig;
@@ -39,39 +38,28 @@ import com.google.common.collect.Lists;
 
 public class MultiThreadsResultCollector extends ResultCollector {
     private static Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class);
-    private static ThreadPoolExecutor scannerThreadPool;
-    private static int MAX_RUNNING_THREAD_COUNT;
-
+    private static ExecutorService executor;
     static {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads();
-        scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
-                MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker"));
+        executor = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+                config.getStreamingReceiverQueryMaxThreads(), 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("query-worker"));
     }
 
-    private long deadline;
-    private String queryId;
-    /**
-     * if current query beyond the deadline
-     */
-    private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+    private int timeout;
     private Semaphore workersSemaphore;
+    final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
     private AtomicInteger notCompletedWorkers;
 
-    final BlockingQueue<Record> recordCachePool = new LinkedBlockingQueue<>(10000);
-
-    public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
+    public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
         this.workersSemaphore = new Semaphore(numOfWorkers);
-        this.deadline = deadline;
-        this.queryId = StreamingQueryProfile.get().getQueryId();
+        this.timeout = timeout;
     }
 
     @Override
     public Iterator<Record> iterator() {
         notCompletedWorkers = new AtomicInteger(searchResults.size());
-        Thread masterThread = new Thread(new WorkSubmitter(), "MultiThreadsResultCollector_" + queryId);
-        masterThread.start();
+        executor.submit(new WorkSubmitter());
 
         final int batchSize = 100;
         final long startTime = System.currentTimeMillis();
@@ -81,41 +69,38 @@ public class MultiThreadsResultCollector extends ResultCollector {
 
             @Override
             public boolean hasNext() {
-                boolean exits = (internalIT.hasNext() || !recordCachePool.isEmpty());
+                boolean exits = (internalIT.hasNext() || queue.size() > 0);
                 if (!exits) {
                     while (notCompletedWorkers.get() > 0) {
                         Thread.yield();
-                        if (System.currentTimeMillis() > deadline) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", queryId);
+                        long takeTime = System.currentTimeMillis() - startTime;
+                        if (takeTime > timeout) {
                             throw new RuntimeException("Timeout when iterate search result");
                         }
-                        if (internalIT.hasNext() || !recordCachePool.isEmpty()) {
+                        if (internalIT.hasNext() || queue.size() > 0) {
                             return true;
                         }
                     }
                 }
+
                 return exits;
             }
 
             @Override
             public Record next() {
                 try {
-                    if (System.currentTimeMillis() > deadline) {
+                    long takeTime = System.currentTimeMillis() - startTime;
+                    if (takeTime > timeout) {
                         throw new RuntimeException("Timeout when iterate search result");
                     }
                     if (!internalIT.hasNext()) {
                         recordList.clear();
-                        Record one = recordCachePool.poll(deadline - startTime, TimeUnit.MILLISECONDS);
+                        Record one = queue.poll(timeout - takeTime, TimeUnit.MILLISECONDS);
                         if (one == null) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", queryId);
                             throw new RuntimeException("Timeout when iterate search result");
                         }
                         recordList.add(one);
-                        recordCachePool.drainTo(recordList, batchSize - 1);
+                        queue.drainTo(recordList, batchSize - 1);
                         internalIT = recordList.iterator();
                     }
                     return internalIT.next();
@@ -143,13 +128,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
             try {
                 result.startRead();
                 for (Record record : result) {
-                    recordCachePool.put(record.copy());
+                    try {
+                        queue.put(record.copy());
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Timeout when visiting streaming segment", e);
+                    }
                 }
                 result.endRead();
-            } catch (InterruptedException inter) {
-                logger.warn("Cancelled scan streaming segment", inter);
             } catch (Exception e) {
-                logger.error("Error when iterate search result", e);
+                logger.error("error when iterate search result", e);
             } finally {
                 notCompletedWorkers.decrementAndGet();
                 workersSemaphore.release();
@@ -160,44 +147,15 @@ public class MultiThreadsResultCollector extends ResultCollector {
     private class WorkSubmitter implements Runnable {
         @Override
         public void run() {
-            List<Future> futureList = Lists.newArrayListWithExpectedSize(searchResults.size());
-            int cancelTimes = 0;
-            try {
-                for (final IStreamingSearchResult result : searchResults) {
-                    Future f = scannerThreadPool.submit(new ResultIterateWorker(result));
-                    futureList.add(f);
-                    workersSemaphore.acquire(); // Throw InterruptedException when interrupted
-                }
-                while (notCompletedWorkers.get() > 0) {
-                    Thread.sleep(100);
-                    if (cancelFlag.get() || Thread.currentThread().isInterrupted()) {
-                        break;
-                    }
-                }
-            } catch (InterruptedException inter) {
-                logger.warn("Interrupted", inter);
-            } finally {
-                for (Future f : futureList) {
-                    if (!f.isCancelled() || !f.isDone()) {
-                        if (f.cancel(true)) {
-                            cancelTimes++;
-                        }
-                    }
+            for (final IStreamingSearchResult result : searchResults) {
+                executor.submit(new ResultIterateWorker(result));
+                try {
+                    workersSemaphore.acquire();
+                } catch (InterruptedException e) {
+                    logger.error("interrupted", e);
                 }
             }
-            logger.debug("Finish MultiThreadsResultCollector for queryId {}, cancel {}. Current thread pool: {}.",
-                    queryId, cancelTimes, scannerThreadPool);
         }
     }
 
-    /**
-     * block query if return true
-     */
-    public static boolean isOccupied() {
-        boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT;
-        if (occupied) {
-            logger.debug("ThreadPool {}", scannerThreadPool);
-        }
-        return occupied;
-    }
 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index 1d38b90..f89ddec 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
 
 public class StreamingCubeDataSearcher {
     private static Logger logger = LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
+
+    private static int TIMEOUT = Integer.MAX_VALUE;
+
     private StreamingSegmentManager streamingSegmentManager;
     private String cubeName;
     private CubeDesc cubeDesc;
@@ -70,15 +73,7 @@ public class StreamingCubeDataSearcher {
         try {
             logger.info("query-{}: use cuboid {} to serve the query", queryProfile.getQueryId(),
                     searchRequest.getHitCuboid());
-            ResultCollector resultCollector = getResultCollector(searchRequest);
-            if (resultCollector instanceof MultiThreadsResultCollector) {
-                while (MultiThreadsResultCollector.isOccupied() && System.currentTimeMillis() < searchRequest.getDeadline()) {
-                    Thread.sleep(50);
-                }
-                if (System.currentTimeMillis() >= searchRequest.getDeadline()) {
-                    throw new RuntimeException("Timeout for " + queryProfile.getQueryId());
-                }
-            }
+            ResultCollector resultCollector = getResultCollector();
             Collection<StreamingCubeSegment> segments = streamingSegmentManager.getAllSegments();
             StreamingDataQueryPlanner scanRangePlanner = searchRequest.getQueryPlanner();
             for (StreamingCubeSegment queryableSegment : segments) {
@@ -110,10 +105,10 @@ public class StreamingCubeDataSearcher {
         }
     }
 
-    private ResultCollector getResultCollector(StreamingSearchContext searchRequest) {
+    private ResultCollector getResultCollector() {
         int useThreads = cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
         if (useThreads > 1) {
-            return new MultiThreadsResultCollector(useThreads, searchRequest.getDeadline());
+            return new MultiThreadsResultCollector(useThreads, TIMEOUT);
         } else {
             return new SingleThreadResultCollector();
         }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index 307c2be..9da8e5b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,8 +45,6 @@ public class StreamingSearchContext {
     private long hitCuboid;
     private long basicCuboid;
 
-    private long deadline = Long.MAX_VALUE;
-
     public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> dimensions, Set<TblColRef> groups,
                                   Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter) {
         this.cubeDesc = cubeDesc;
@@ -58,7 +56,6 @@ public class StreamingSearchContext {
         this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
         this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter);
         this.addedGroups = Sets.newHashSet();
-        this.deadline = deadline;
         calculateHitCuboid();
     }
 
@@ -161,12 +158,4 @@ public class StreamingSearchContext {
         sortedSet.addAll(cubeDesc.getMandatoryCuboids());
         return sortedSet;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline =  deadline;
-    }
 }
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 51d0cba..45c6307 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -80,7 +79,7 @@ public class DataController extends BasicController {
         }
         StreamingQueryProfile.set(queryProfile);
         logger.info("receive query request queryId:{}", queryId);
-        try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) {
+        try {
             final Stopwatch sw = new Stopwatch();
             sw.start();
             String cubeName = dataRequest.getCubeName();
@@ -106,7 +105,6 @@ public class DataController extends BasicController {
 
             StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups,
                     metrics, tupleFilter, havingFilter);
-            gtSearchRequest.setDeadline(dataRequest.getDeadline());
             searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime,
                     dataRequest.isAllowStorageAggregation());