You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/02/09 11:35:32 UTC

[asterixdb] branch master updated: [ASTERIXDB-2518][RT] Introduce Request Tracker

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 720d419  [ASTERIXDB-2518][RT] Introduce Request Tracker
720d419 is described below

commit 720d419c0c3162fca0a3553d1b785657985ca306
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Sat Feb 9 05:27:05 2019 +0300

    [ASTERIXDB-2518][RT] Introduce Request Tracker
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Introduce IReceptionist to generate request references.
    - Track all requests by uuid.
    - Add more information to active_requests response.
    - Replace StatementExecutorContext by RequestTracker.
    - Deprecate IStatementExecutorContext (to be removed)
    - Allow extensions to set optional parameters in query service.
    - Return forbidden when a cancellation is attempted on a request
      that is not cancellable.
    
    Change-Id: If08ecd91c55881743b2ecf40a628fa3d4166c554
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3163
    Reviewed-by: Till Westmann <ti...@apache.org>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 asterixdb/asterix-algebra/pom.xml                  |  8 ++
 .../asterix/translator/BaseClientRequest.java      | 62 ++++++++++----
 .../asterix/translator/ClientJobRequest.java       | 58 -------------
 .../apache/asterix/translator/ClientRequest.java   | 84 +++++++++++++++++++
 .../asterix/translator/IRequestParameters.java     | 15 ++++
 .../asterix/translator/IStatementExecutor.java     |  4 +-
 .../translator/IStatementExecutorContext.java      |  3 +-
 .../apache/asterix/translator/Receptionist.java    | 54 ++++++++++++
 .../api/http/ctx/StatementExecutorContext.java     | 51 ------------
 .../api/http/server/AbstractQueryApiServlet.java   |  6 +-
 .../apache/asterix/api/http/server/ApiServlet.java | 10 ++-
 .../http/server/CcQueryCancellationServlet.java    | 13 +--
 .../api/http/server/NCQueryServiceServlet.java     | 15 ++--
 .../api/http/server/QueryServiceServlet.java       | 41 +++++----
 .../asterix/api/http/server/RestApiServlet.java    | 13 +--
 .../apache/asterix/api/java/AsterixJavaClient.java | 15 ++--
 .../asterix/app/message/ActiveRequestsRequest.java |  5 +-
 .../asterix/app/message/CancelQueryRequest.java    | 27 +++---
 .../message/ExecuteStatementRequestMessage.java    | 13 +--
 .../apache/asterix/app/nc/NCAppRuntimeContext.java | 12 ++-
 .../asterix/app/translator/QueryTranslator.java    | 79 ++++++++++--------
 .../asterix/app/translator/RequestParameters.java  | 24 +++++-
 .../asterix/hyracks/bootstrap/CCApplication.java   | 20 ++---
 .../asterix/hyracks/bootstrap/NCApplication.java   |  9 +-
 .../http/servlet/QueryCancellationServletTest.java | 24 ++++--
 .../test/common/CancellationTestExecutor.java      |  2 +-
 .../active_requests.2.pollquery.sqlpp              |  2 +-
 .../misc/active_requests/active_requests.2.regex   |  2 +-
 .../asterix/common/api/IApplicationContext.java    |  2 +
 .../apache/asterix/common/api/IClientRequest.java  | 24 ++++++
 .../asterix/common/api/INcApplicationContext.java  |  4 +-
 .../{IClientRequest.java => IReceptionist.java}    | 29 ++++---
 .../asterix/common/api/IReceptionistFactory.java}  | 16 +++-
 ...{IClientRequest.java => IRequestReference.java} | 24 +++---
 .../apache/asterix/common/api/IRequestTracker.java | 71 ++++++++++++++++
 .../asterix/common/api/RequestReference.java       | 84 +++++++++++++++++++
 .../common/dataflow/ICcApplicationContext.java     |  8 ++
 .../apache/asterix/common/utils/RequestStatus.java |  5 +-
 .../runtime/utils/CcApplicationContext.java        | 21 ++++-
 .../asterix/runtime/utils/RequestTracker.java      | 97 ++++++++++++++++++++++
 .../java/org/apache/hyracks/util/JSONUtil.java     |  8 ++
 41 files changed, 766 insertions(+), 298 deletions(-)

diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 369d93b..ee0bb5e 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -246,5 +246,13 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-http</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index ec44d60..50e6cc2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.translator;
 
 import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -27,14 +29,14 @@ import org.apache.hyracks.util.JSONUtil;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public abstract class BaseClientRequest implements IClientRequest {
-    protected final IStatementExecutorContext ctx;
-    protected final long requestTime = System.currentTimeMillis();
-    protected final String contextId;
+
     private boolean complete;
+    private final IRequestReference requestReference;
+    private boolean cancellable = false;
+    protected volatile String state = "received";
 
-    public BaseClientRequest(IStatementExecutorContext ctx, String contextId) {
-        this.ctx = ctx;
-        this.contextId = contextId;
+    public BaseClientRequest(IRequestReference requestReference) {
+        this.requestReference = requestReference;
     }
 
     @Override
@@ -43,7 +45,6 @@ public abstract class BaseClientRequest implements IClientRequest {
             return;
         }
         complete = true;
-        ctx.remove(contextId);
     }
 
     @Override
@@ -52,24 +53,55 @@ public abstract class BaseClientRequest implements IClientRequest {
             return;
         }
         complete();
-        doCancel(appCtx);
+        if (cancellable) {
+            doCancel(appCtx);
+        }
+    }
+
+    @Override
+    public synchronized void markCancellable() {
+        cancellable = true;
+    }
+
+    @Override
+    public String getId() {
+        // the uuid is generated by the node which received the request
+        // so there is a chance this might not be unique now
+        return requestReference.getUuid();
+    }
+
+    @Override
+    public synchronized boolean isCancellable() {
+        return cancellable;
+    }
+
+    public void setRunning() {
+        state = "running";
     }
 
     @Override
     public String toJson() {
-        try {
-            return JSONUtil.convertNode(asJson());
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
+        return JSONUtil.convertNodeOrThrow(asJson());
     }
 
     protected ObjectNode asJson() {
         ObjectNode json = JSONUtil.createObject();
-        json.put("requestTime", new ADateTime(requestTime).toSimpleString());
-        json.put("clientContextID", contextId);
+        json.put("uuid", requestReference.getUuid());
+        json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
+        json.put("elapsedTime", getElapsedTime());
+        json.put("node", requestReference.getNode());
+        json.put("state", state);
+        json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
+        json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+        json.put("cancellable", cancellable);
         return json;
     }
 
+    private String getElapsedTime() {
+        // this is just an estimation as the request might have been received on a node with a different system time
+        // TODO add dynamic time unit
+        return System.currentTimeMillis() - requestReference.getTime() + "ms";
+    }
+
     protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
deleted file mode 100644
index 81714ca..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.translator;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.util.JSONUtil;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class ClientJobRequest extends BaseClientRequest {
-    private final JobId jobId;
-
-    public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
-        super(ctx, clientCtxId);
-        this.jobId = jobId;
-    }
-
-    @Override
-    protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        try {
-            hcc.cancelJob(jobId);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        ctx.remove(contextId);
-    }
-
-    @Override
-    public String toJson() {
-        final ObjectNode jsonNode = super.asJson();
-        jsonNode.put("jobId", jobId.toString());
-        try {
-            return JSONUtil.convertNode(jsonNode);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
new file mode 100644
index 0000000..014bf3c
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ClientRequest extends BaseClientRequest {
+
+    protected String statement;
+    protected JobId jobId;
+    protected Thread executor;
+    protected String clientContextId;
+
+    public ClientRequest(IRequestReference requestReference, String clientContextId, String statement,
+            Map<String, String> optionalParameters) {
+        super(requestReference);
+        this.clientContextId = clientContextId;
+        this.statement = statement;
+        this.executor = Thread.currentThread();
+    }
+
+    @Override
+    public String getClientContextId() {
+        return clientContextId;
+    }
+
+    public synchronized void setJobId(JobId jobId) {
+        this.jobId = jobId;
+        setRunning();
+    }
+
+    public Thread getExecutor() {
+        return executor;
+    }
+
+    @Override
+    protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
+        // if the request has a job, we abort the job and do not interrupt the thread as it will be notified
+        // that the job has been cancelled. Otherwise, we interrupt the thread
+        if (jobId != null) {
+            IHyracksClientConnection hcc = appCtx.getHcc();
+            try {
+                hcc.cancelJob(jobId);
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } else if (executor != null) {
+            executor.interrupt();
+        }
+    }
+
+    @Override
+    protected ObjectNode asJson() {
+        ObjectNode json = super.asJson();
+        json.put("jobId", jobId != null ? jobId.toString() : null); // jobId is unset until setJobId() is called
+        json.put("statement", statement);
+        json.put("clientContextID", clientContextId);
+        return json;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 58f0997..86ba301 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -20,6 +20,7 @@ package org.apache.asterix.translator;
 
 import java.util.Map;
 
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.api.result.IResultSet;
@@ -67,4 +68,18 @@ public interface IRequestParameters {
      * @return true if the request accepts multiple statements. Otherwise, false.
      */
     boolean isMultiStatement();
+
+    /**
+     * Gets the statement the client provided with the request
+     *
+     * @return the request statement
+     */
+    String getStatement();
+
+    /**
+     * The request reference of this {@link IRequestParameters}
+     *
+     * @return the request reference
+     */
+    IRequestReference getRequestReference();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 9bc86da..93eed8a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -110,12 +110,10 @@ public interface IStatementExecutor {
      * Compiles and executes a list of statements
      *
      * @param hcc
-     * @param ctx
      * @param requestParameters
      * @throws Exception
      */
-    void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx,
-            IRequestParameters requestParameters) throws Exception;
+    void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception;
 
     /**
      * rewrites and compiles query into a hyracks job specifications
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 29e7bda..9648036 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -24,8 +24,9 @@ import java.util.Map;
 import org.apache.asterix.common.api.IClientRequest;
 
 /**
- * The context for statement executors. Maintains ongoing user requests.
+ * @deprecated (use IRequestTracker)
  */
+@Deprecated
 public interface IStatementExecutorContext {
 
     /**
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
new file mode 100644
index 0000000..8d06143
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.RequestReference;
+import org.apache.http.HttpHeaders;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
+
+public class Receptionist implements IReceptionist {
+
+    private final String node;
+
+    public Receptionist(String node) {
+        this.node = node;
+    }
+
+    @Override
+    public IRequestReference welcome(IServletRequest request) {
+        final String uuid = UUID.randomUUID().toString();
+        final RequestReference ref = RequestReference.of(uuid, node, System.currentTimeMillis());
+        ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
+        //TODO set remote address
+        return ref;
+    }
+
+    @Override
+    public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
+            Map<String, String> optionalParameters) throws HyracksDataException {
+        return new ClientRequest(requestRef, clientContextId, statement, optionalParameters);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
deleted file mode 100644
index 136fda7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.ctx;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.common.api.IClientRequest;
-import org.apache.asterix.translator.IStatementExecutorContext;
-
-public class StatementExecutorContext implements IStatementExecutorContext {
-
-    private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>();
-
-    @Override
-    public IClientRequest get(String clientContextId) {
-        return runningQueries.get(clientContextId);
-    }
-
-    @Override
-    public void put(String clientContextId, IClientRequest req) {
-        runningQueries.put(clientContextId, req);
-    }
-
-    @Override
-    public IClientRequest remove(String clientContextId) {
-        return runningQueries.remove(clientContextId);
-    }
-
-    @Override
-    public Map<String, IClientRequest> getRunningRequests() {
-        return Collections.unmodifiableMap(runningQueries);
-    }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index 9844900..b23fa1e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -120,10 +120,8 @@ public class AbstractQueryApiServlet extends AbstractServlet {
         return hcc;
     }
 
-    protected static UUID printRequestId(PrintWriter pw) {
-        UUID requestId = UUID.randomUUID();
-        ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId.toString());
-        return requestId;
+    protected static void printRequestId(PrintWriter pw, String requestId) {
+        ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId);
     }
 
     protected static void printHandle(PrintWriter pw, String handle, boolean comma) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index d206336..ee3eb9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import javax.imageio.ImageIO;
 
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -89,6 +90,7 @@ public class ApiServlet extends AbstractServlet {
 
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
+        final IRequestReference requestReference = appCtx.getReceptionist().welcome(request);
         // Query language
         ILangCompilationProvider compilationProvider = "AQL".equals(request.getParameter("query-language"))
                 ? aqlCompilationProvider : sqlppCompilationProvider;
@@ -149,10 +151,10 @@ public class ApiServlet extends AbstractServlet {
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
-            final IRequestParameters requestParameters =
-                    new RequestParameters(resultSet, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
-                            new IStatementExecutor.Stats(), null, null, null, null, true);
-            translator.compileAndExecute(hcc, null, requestParameters);
+            final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet,
+                    new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(),
+                    null, null, null, null, true);
+            translator.compileAndExecute(hcc, requestParameters);
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
index 5f5692d..d260a0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -23,8 +23,8 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
 import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.AbstractServlet;
@@ -54,17 +54,20 @@ public class CcQueryCancellationServlet extends AbstractServlet {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        IStatementExecutorContext executorCtx =
-                (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
-        IClientRequest req = executorCtx.get(clientContextId);
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        final IClientRequest req = requestTracker.getByClientContextId(clientContextId);
         if (req == null) {
             // response: NOT FOUND
             response.setStatus(HttpResponseStatus.NOT_FOUND);
             return;
         }
+        if (!req.isCancellable()) {
+            response.setStatus(HttpResponseStatus.FORBIDDEN);
+            return;
+        }
         try {
             // Cancels the on-going job.
-            req.cancel(appCtx);
+            requestTracker.cancel(req.getId());
             // response: OK
             response.setStatus(HttpResponseStatus.OK);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 362f924..55f3369 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.api.http.server;
 
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -33,6 +32,7 @@ import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.Duration;
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.ExceptionUtils;
@@ -69,10 +69,10 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
     }
 
     @Override
-    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
-            ResultProperties resultProperties, IStatementExecutor.Stats stats, QueryServiceRequestParameters param,
-            RequestExecutionState execution, Map<String, String> optionalParameters,
-            Map<String, byte[]> statementParameters) throws Exception {
+    protected void executeStatement(IRequestReference requestReference, String statementsText,
+            SessionOutput sessionOutput, ResultProperties resultProperties, IStatementExecutor.Stats stats,
+            QueryServiceRequestParameters param, RequestExecutionState execution,
+            Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -81,9 +81,6 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         final String handleUrl = getHandleUrl(param.getHost(), param.getPath(), delivery);
         try {
-            if (param.getClientContextID() == null) {
-                param.setClientContextID(UUID.randomUUID().toString());
-            }
             long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
             if (param.getTimeout() != null && !param.getTimeout().trim().isEmpty()) {
                 timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.getTimeout()));
@@ -91,7 +88,7 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
             ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
                     responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
                     resultProperties.getNcToCcResultProperties(), param.getClientContextID(), handleUrl,
-                    optionalParameters, statementParameters, param.isMultiStatement());
+                    optionalParameters, statementParameters, param.isMultiStatement(), requestReference);
             execution.start();
             ncMb.sendMessageToPrimaryCC(requestMsg);
             try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 99df372..a34f6b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -43,6 +43,8 @@ import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.api.Duration;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -64,7 +66,6 @@ import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
@@ -93,7 +94,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
     private final ILangCompilationProvider compilationProvider;
     private final IStatementExecutorFactory statementExecutorFactory;
     private final IStorageComponentProvider componentProvider;
-    private final IStatementExecutorContext queryCtx;
+    private final IReceptionist receptionist;
     protected final IServiceContext serviceCtx;
     protected final Function<IServletRequest, Map<String, String>> optionalParamProvider;
     protected String hostName;
@@ -107,7 +108,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         this.compilationProvider = compilationProvider;
         this.statementExecutorFactory = statementExecutorFactory;
         this.componentProvider = componentProvider;
-        this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+        receptionist = appCtx.getReceptionist();
         this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
         this.optionalParamProvider = optionalParamProvider;
         try {
@@ -345,7 +346,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         pw.print("\t}\n");
     }
 
-    private String getOptText(JsonNode node, String fieldName) {
+    protected String getOptText(JsonNode node, String fieldName) {
         final JsonNode value = node.get(fieldName);
         return value != null ? value.asText() : null;
     }
@@ -397,7 +398,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         String contentType = HttpUtil.getContentTypeOnly(request);
         if (HttpUtil.ContentType.APPLICATION_JSON.equals(contentType)) {
             try {
-                setParamFromJSON(request, param);
+                setParamFromJSON(request, param, optionalParams);
             } catch (JsonParseException | JsonMappingException e) {
                 // if the JSON parsing fails, the statement is empty and we get an empty statement error
                 GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
@@ -407,7 +408,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         }
     }
 
-    private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param) throws IOException {
+    private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param,
+            Map<String, String> optionalParameters) throws IOException {
         JsonNode jsonRequest = OBJECT_MAPPER.readTree(HttpUtil.getRequestBody(request));
         param.setFormat(toLower(getOptText(jsonRequest, Parameter.FORMAT.str())));
         param.setPretty(getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false));
@@ -430,6 +432,11 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         if (jsonRequest.has(statementParam)) {
             param.setStatement(jsonRequest.get(statementParam).asText());
         }
+        setJsonOptionalParameters(jsonRequest, optionalParameters);
+    }
+
+    protected void setJsonOptionalParameters(JsonNode jsonRequest, Map<String, String> optionalParameters) {
+        // allows extensions to set extra parameters
     }
 
     private void setParamFromRequest(IServletRequest request, QueryServiceRequestParameters param) throws IOException {
@@ -503,6 +510,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
     }
 
     private void handleRequest(IServletRequest request, IServletResponse response) {
+        final IRequestReference requestRef = receptionist.welcome(request);
         long elapsedStart = System.nanoTime();
         long errorCount = 1;
         Stats stats = new Stats();
@@ -527,7 +535,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
             final ResultProperties resultProperties = param.getMaxResultReads() == null ? new ResultProperties(delivery)
                     : new ResultProperties(delivery, Long.parseLong(param.getMaxResultReads()));
             printAdditionalResultFields(sessionOutput.out());
-            printRequestId(sessionOutput.out());
+            printRequestId(sessionOutput.out(), requestRef.getUuid());
             printClientContextID(sessionOutput.out(), param);
             if (!param.isParseOnly()) {
                 printSignature(sessionOutput.out(), param);
@@ -544,10 +552,9 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
             } else {
                 Map<String, byte[]> statementParams = org.apache.asterix.app.translator.RequestParameters
                         .serializeParameterValues(param.getStatementParams());
-
                 setAccessControlHeaders(request, response);
                 response.setStatus(execution.getHttpStatus());
-                executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution,
+                executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution,
                         optionalParams, statementParams);
                 if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
                     ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
@@ -594,10 +601,10 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         return parseOnlyResult;
     }
 
-    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
-            ResultProperties resultProperties, Stats stats, QueryServiceRequestParameters param,
-            RequestExecutionState execution, Map<String, String> optionalParameters,
-            Map<String, byte[]> statementParameters) throws Exception {
+    protected void executeStatement(IRequestReference requestReference, String statementsText,
+            SessionOutput sessionOutput, ResultProperties resultProperties, Stats stats,
+            QueryServiceRequestParameters param, RequestExecutionState execution,
+            Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception {
         IClusterManagementWork.ClusterState clusterState =
                 ((ICcApplicationContext) appCtx).getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -612,10 +619,10 @@ public class QueryServiceServlet extends AbstractQueryApiServlet {
         execution.start();
         Map<String, IAObject> stmtParams =
                 org.apache.asterix.app.translator.RequestParameters.deserializeParameterValues(statementParameters);
-        IRequestParameters requestParameters =
-                new org.apache.asterix.app.translator.RequestParameters(getResultSet(), resultProperties, stats, null,
-                        param.getClientContextID(), optionalParameters, stmtParams, param.isMultiStatement());
-        translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
+        IRequestParameters requestParameters = new org.apache.asterix.app.translator.RequestParameters(requestReference,
+                statementsText, getResultSet(), resultProperties, stats, null, param.getClientContextID(),
+                optionalParameters, stmtParams, param.isMultiStatement());
+        translator.compileAndExecute(getHyracksClientConnection(), requestParameters);
         execution.end();
         printExecutionPlans(sessionOutput, translator.getExecutionPlans());
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 0aa2211..520f85a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -176,16 +177,16 @@ public abstract class RestApiServlet extends AbstractServlet {
             response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
             SessionOutput sessionOutput = initResponse(request, response);
             QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
-            doHandle(response, query, sessionOutput, resultDelivery);
+            final IRequestReference requestReference = appCtx.getReceptionist().welcome(request);
+            doHandle(requestReference, response, query, sessionOutput, resultDelivery);
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             LOGGER.log(Level.WARN, "Failure handling request", e);
-            return;
         }
     }
 
-    private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
-            ResultDelivery resultDelivery) throws JsonProcessingException {
+    private void doHandle(IRequestReference requestReference, IServletResponse response, String query,
+            SessionOutput sessionOutput, ResultDelivery resultDelivery) throws JsonProcessingException {
         try {
             response.setStatus(HttpResponseStatus.OK);
             IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
@@ -196,9 +197,9 @@ public abstract class RestApiServlet extends AbstractServlet {
             IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, sessionOutput,
                     compilationProvider, componentProvider);
             final IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
-            final IRequestParameters requestParameters = new RequestParameters(resultSet,
+            final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet,
                     new ResultProperties(resultDelivery), new IStatementExecutor.Stats(), null, null, null, null, true);
-            translator.compileAndExecute(hcc, null, requestParameters);
+            translator.compileAndExecute(hcc, requestParameters);
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, pe.getMessage(), pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 71b4b81..a5c8645 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -22,9 +22,11 @@ import java.io.PrintWriter;
 import java.io.Reader;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.utils.Job;
@@ -113,7 +115,8 @@ public class AsterixJavaClient {
         while ((ch = queryText.read()) != -1) {
             builder.append((char) ch);
         }
-        IParser parser = parserFactory.createParser(builder.toString());
+        String statement = builder.toString();
+        IParser parser = parserFactory.createParser(statement);
         List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
@@ -126,10 +129,12 @@ public class AsterixJavaClient {
 
         IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
-        final IRequestParameters requestParameters =
-                new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
-                        new IStatementExecutor.Stats(), null, null, null, statementParams, true);
-        translator.compileAndExecute(hcc, null, requestParameters);
+        final RequestReference requestReference =
+                RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
+        final IRequestParameters requestParameters = new RequestParameters(requestReference, statement, null,
+                new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), null,
+                null, null, statementParams, true);
+        translator.compileAndExecute(hcc, requestParameters);
         writer.flush();
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
index 9d15131..3ae18a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
@@ -46,10 +46,7 @@ public class ActiveRequestsRequest implements ICcAddressedMessage {
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
-        CCApplication application = (CCApplication) ccs.getApplication();
-        IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
-        final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values();
+        final Collection<IClientRequest> runningRequests = appCtx.getRequestTracker().getRunningRequests();
         final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
         ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests);
         CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 943aad3..e4cef7f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -19,14 +19,12 @@
 package org.apache.asterix.app.message;
 
 import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.utils.RequestStatus;
-import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.messaging.CCMessageBroker;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,23 +45,24 @@ public class CancelQueryRequest implements ICcAddressedMessage {
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
-        ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
-        CCApplication application = (CCApplication) ccs.getApplication();
-        IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
-        IClientRequest req = executorsCtx.get(contextId);
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        IClientRequest req = requestTracker.getByClientContextId(contextId);
         RequestStatus status;
 
         if (req == null) {
             LOGGER.log(Level.WARN, "No job found for context id " + contextId);
             status = RequestStatus.NOT_FOUND;
         } else {
-            try {
-                req.cancel(appCtx);
-                executorsCtx.remove(contextId);
-                status = RequestStatus.SUCCESS;
-            } catch (Exception e) {
-                LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
-                status = RequestStatus.FAILED;
+            if (!req.isCancellable()) {
+                status = RequestStatus.REJECTED;
+            } else {
+                try {
+                    requestTracker.cancel(req.getId());
+                    status = RequestStatus.SUCCESS;
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
+                    status = RequestStatus.FAILED;
+                }
             }
         }
         CancelQueryResponse response = new CancelQueryResponse(reqId, status);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 94d63a4..fdc4432 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -30,6 +30,7 @@ import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -80,11 +81,12 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
     private final Map<String, String> optionalParameters;
     private final Map<String, byte[]> statementParameters;
     private final boolean multiStatement;
+    private final IRequestReference requestReference;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
             String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> optionalParameters,
-            Map<String, byte[]> statementParameters, boolean multiStatement) {
+            Map<String, byte[]> statementParameters, boolean multiStatement, IRequestReference requestReference) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
@@ -96,6 +98,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
         this.optionalParameters = optionalParameters;
         this.statementParameters = statementParameters;
         this.multiStatement = multiStatement;
+        this.requestReference = requestReference;
     }
 
     @Override
@@ -113,7 +116,6 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
         ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
         IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
         IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
-        IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
         ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
         try {
             IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
@@ -132,9 +134,10 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
                     compilationProvider, storageComponentProvider);
             final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
             Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
-            final IRequestParameters requestParameters = new RequestParameters(null, resultProperties, stats,
-                    outMetadata, clientContextID, optionalParameters, stmtParams, multiStatement);
-            translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters);
+            final IRequestParameters requestParameters =
+                    new RequestParameters(requestReference, statementsText, null, resultProperties, stats, outMetadata,
+                            clientContextID, optionalParameters, stmtParams, multiStatement);
+            translator.compileAndExecute(ccApp.getHcc(), requestParameters);
             outPrinter.close();
             responseMsg.setResult(outWriter.toString());
             responseMsg.setMetadata(outMetadata);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 3e72b7d..724691c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -36,6 +36,8 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.IDatasetMemoryManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IReceptionistFactory;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -146,6 +148,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private IHyracksClientConnection hcc;
     private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     private IReplicaManager replicaManager;
+    private IReceptionist receptionist;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions,
             IPropertiesFactory propertiesFactory) throws AsterixException, InstantiationException,
@@ -175,7 +178,8 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     }
 
     @Override
-    public void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) throws IOException {
+    public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
+            boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
         threadExecutor =
                 MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -215,6 +219,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
                 this.ncServiceContext);
+        receptionist = receptionistFactory.create();
 
         if (replicationProperties.isReplicationEnabled()) {
             replicationManager = new ReplicationManager(this, replicationProperties);
@@ -533,4 +538,9 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     public IPersistedResourceRegistry getPersistedResourceRegistry() {
         return persistedResourceRegistry;
     }
+
+    @Override
+    public IReceptionist getReceptionist() {
+        return receptionist;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b0f3287..2a1ed4f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -52,7 +52,9 @@ import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -150,7 +152,7 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.ClientJobRequest;
+import org.apache.asterix.translator.ClientRequest;
 import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -160,7 +162,6 @@ import org.apache.asterix.translator.ExecutionPlans;
 import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.NoOpStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
@@ -261,30 +262,24 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     }
 
     @Override
-    public void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx,
-            IRequestParameters requestParameters) throws Exception {
+    public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         if (!requestParameters.isMultiStatement()) {
             validateStatements(statements);
         }
+        trackRequest(requestParameters);
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
         IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
-        /*
-         * Since the system runs a large number of threads, when HTTP requests don't
-         * return, it becomes difficult to find the thread running the request to
-         * determine where it has stopped. Setting the thread name helps make that
-         * easier
-         */
         String threadName = Thread.currentThread().getName();
-        Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+        Thread.currentThread().setName(
+                QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid());
         Map<String, String> config = new HashMap<>();
         final IResultSet resultSet = requestParameters.getResultSet();
         final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
         final long maxResultReads = requestParameters.getResultProperties().getMaxReads();
         final Stats stats = requestParameters.getStats();
         final ResultMetadata outMetadata = requestParameters.getOutMetadata();
-        final String clientContextId = requestParameters.getClientContextId();
         final Map<String, IAObject> stmtParams = requestParameters.getStatementParameters();
         try {
             for (Statement stmt : statements) {
@@ -354,7 +349,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                             metadataProvider.setMaxResultReads(maxResultReads);
                         }
                         handleInsertUpsertStatement(metadataProvider, stmt, hcc, resultSet, resultDelivery, outMetadata,
-                                stats, false, clientContextId, stmtParams, stmtRewriter);
+                                stats, false, requestParameters, stmtParams, stmtRewriter);
                         break;
                     case DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, false, stmtParams, stmtRewriter);
@@ -389,7 +384,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                                 resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
                         metadataProvider.setMaxResultReads(maxResultReads);
                         handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats,
-                                clientContextId, ctx, stmtParams, stmtRewriter);
+                                requestParameters, stmtParams, stmtRewriter);
                         break;
                     case COMPACT:
                         handleCompactStatement(metadataProvider, stmt, hcc);
@@ -406,8 +401,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         // No op
                         break;
                     case EXTENSION:
+                        //TODO remove deprecated statement executor context
                         ((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider,
-                                resultSetIdCounter, ctx);
+                                resultSetIdCounter, NoOpStatementExecutorContext.INSTANCE);
                         break;
                     default:
                         throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
@@ -415,6 +411,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 }
             }
         } finally {
+            // async queries are completed after their job completes
+            if (ResultDelivery.ASYNC != resultDelivery) {
+                appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
+            }
             Thread.currentThread().setName(threadName);
         }
     }
@@ -1856,7 +1856,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId,
+            ResultMetadata outMetadata, Stats stats, boolean compileOnly, IRequestParameters requestParameters,
             Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
@@ -1901,7 +1901,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    clientContextId, NoOpStatementExecutorContext.INSTANCE);
+                    requestParameters, false);
         } else {
             locker.lock();
             try {
@@ -2454,8 +2454,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
             IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
-            String clientContextId, IStatementExecutorContext ctx, Map<String, IAObject> stmtParams,
-            IStatementRewriter stmtRewriter) throws Exception {
+            IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
+            throws Exception {
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
@@ -2488,19 +2488,19 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                clientContextId, ctx);
+                requestParameters, true);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
             throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        clientContextId, ctx, resultSetId, printed));
+                        requestParameters, cancellable, resultSetId, printed));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -2515,7 +2515,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
-                }, clientContextId, ctx);
+                }, requestParameters, cancellable, appCtx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -2525,7 +2525,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         outMetadata.getResultSets()
                                 .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
                     }
-                }, clientContextId, ctx);
+                }, requestParameters, cancellable, appCtx);
                 break;
             default:
                 break;
@@ -2552,7 +2552,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     }
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
-            ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx,
+            ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
             ResultSetId resultSetId, MutableBoolean printed) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
@@ -2564,7 +2564,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     printed.setTrue();
                     printed.notify();
                 }
-            }, clientContextId, ctx);
+            }, requestParameters, cancellable, appCtx);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -2595,8 +2595,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
     private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
-            String clientContextId, IStatementExecutorContext ctx) throws Exception {
-        ClientJobRequest req = null;
+            IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception {
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        final ClientRequest clientRequest =
+                (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
@@ -2604,9 +2606,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 return;
             }
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-            if (ctx != null && clientContextId != null) {
-                req = new ClientJobRequest(ctx, clientContextId, jobId);
-                ctx.put(clientContextId, req); // Adds the running job into the context.
+            clientRequest.setJobId(jobId);
+            if (cancellable) {
+                clientRequest.markCancellable();
             }
             if (jId != null) {
                 jId.setValue(jobId);
@@ -2619,11 +2621,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 printer.print(jobId);
             }
         } finally {
-            locker.unlock();
-            // No matter the job succeeds or fails, removes it into the context.
-            if (req != null) {
-                req.complete();
+            // complete async requests here, after their job completes
+            if (ResultDelivery.ASYNC == resultDelivery) {
+                requestTracker.complete(clientRequest.getId());
             }
+            locker.unlock();
         }
     }
 
@@ -2941,6 +2943,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
+    protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(
+                requestParameters.getRequestReference(), requestParameters.getClientContextId(),
+                requestParameters.getStatement(), requestParameters.getOptionalParameters());
+        appCtx.getRequestTracker().track(clientRequest);
+    }
+
     public static void validateStatements(List<Statement> statements) throws CompilationException {
         if (statements.stream().filter(QueryTranslator::isNotAllowedMultiStatement).count() > 1) {
             throw new CompilationException(ErrorCode.UNSUPPORTED_MULTIPLE_STATEMENTS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index d0adcda..eda8a4a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.external.parser.JSONDataParser;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.IAObject;
@@ -41,6 +42,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 public class RequestParameters implements IRequestParameters {
 
+    private final IRequestReference requestReference;
     private final IResultSet resultSet;
     private final ResultProperties resultProperties;
     private final Stats stats;
@@ -49,10 +51,14 @@ public class RequestParameters implements IRequestParameters {
     private final String clientContextId;
     private final Map<String, IAObject> statementParameters;
     private final boolean multiStatement;
-
-    public RequestParameters(IResultSet resultSet, ResultProperties resultProperties, Stats stats,
-            IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
-            Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement) {
+    private final String statement;
+
+    public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
+            ResultProperties resultProperties, Stats stats, IStatementExecutor.ResultMetadata outMetadata,
+            String clientContextId, Map<String, String> optionalParameters, Map<String, IAObject> statementParameters,
+            boolean multiStatement) {
+        this.requestReference = requestReference;
+        this.statement = statement;
         this.resultSet = resultSet;
         this.resultProperties = resultProperties;
         this.stats = stats;
@@ -103,6 +109,16 @@ public class RequestParameters implements IRequestParameters {
         return statementParameters;
     }
 
+    @Override
+    public String getStatement() {
+        return statement;
+    }
+
+    @Override
+    public IRequestReference getRequestReference() {
+        return requestReference;
+    }
+
     public static Map<String, byte[]> serializeParameterValues(Map<String, JsonNode> inParams)
             throws HyracksDataException {
         if (inParams == null || inParams.isEmpty()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 24a1463..ce98a03 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -33,7 +33,6 @@ import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.IQueryWebServerRegistrant;
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
@@ -55,6 +54,8 @@ import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.NcLifecycleCoordinator;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.translator.Receptionist;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExtensionProperties;
@@ -77,7 +78,6 @@ import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -110,7 +110,6 @@ public class CCApplication extends BaseCCApplication {
     protected ICCServiceContext ccServiceCtx;
     protected CCExtensionManager ccExtensionManager;
     protected IStorageComponentProvider componentProvider;
-    protected StatementExecutorContext statementExecutorCtx;
     protected WebManager webManager;
     protected ICcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
@@ -154,8 +153,8 @@ public class CCApplication extends BaseCCApplication {
         extensions.addAll(getExtensions());
         ccExtensionManager = new CCExtensionManager(extensions);
         IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
-        statementExecutorCtx = new StatementExecutorContext();
-        appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator);
+        appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
+                () -> new Receptionist("CC"));
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -182,11 +181,11 @@ public class CCApplication extends BaseCCApplication {
     }
 
     protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
-            IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator)
-            throws AlgebricksException, IOException {
+            IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
+            IReceptionistFactory receptionistFactory) throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
                 globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager());
+                new MetadataLockManager(), receptionistFactory);
     }
 
     protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
@@ -243,7 +242,6 @@ public class CCApplication extends BaseCCApplication {
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
                 ccServiceCtx.getControllerService().getExecutor());
-        jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx);
         jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx);
 
         // Other APIs.
@@ -322,10 +320,6 @@ public class CCApplication extends BaseCCApplication {
         return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
     }
 
-    public IStatementExecutorContext getStatementExecutorContext() {
-        return statementExecutorCtx;
-    }
-
     @Override
     public IJobCapacityController getJobCapacityController() {
         return jobCapacityController;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 3857ac5..1112507 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -34,6 +34,8 @@ import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessag
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.translator.Receptionist;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -128,7 +130,8 @@ public class NCApplication extends BaseNCApplication {
             }
             updateOnNodeJoin();
         }
-        runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun());
+        runtimeContext.initialize(getRecoveryManagerFactory(), getReceptionistFactory(),
+                runtimeContext.getNodeProperties().isInitialRun());
         MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
         NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
@@ -155,6 +158,10 @@ public class NCApplication extends BaseNCApplication {
         return RecoveryManager::new;
     }
 
+    protected IReceptionistFactory getReceptionistFactory() {
+        return () -> new Receptionist(nodeId);
+    }
+
     @Override
     protected void configureLoggingLevel(Level level) {
         super.configureLoggingLevel(level);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index eae82af..1a526b2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -25,14 +25,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.translator.ClientJobRequest;
-import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.runtime.utils.RequestTracker;
+import org.apache.asterix.translator.ClientRequest;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -49,15 +50,14 @@ public class QueryCancellationServletTest {
     @Test
     public void testDelete() throws Exception {
         ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
+        RequestTracker tracker = new RequestTracker(appCtx);
+        Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker);
         // Creates a query cancellation servlet.
         CcQueryCancellationServlet cancellationServlet =
                 new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
         // Adds mocked Hyracks client connection into the servlet context.
         IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
         cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
-        // Adds a query context into the servlet context.
-        IStatementExecutorContext queryCtx = new StatementExecutorContext();
-        cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
         Mockito.when(appCtx.getHcc()).thenReturn(mockHcc);
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
@@ -65,8 +65,12 @@ public class QueryCancellationServletTest {
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
+        final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
+        ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>());
+        request.setJobId(new JobId(1));
+        request.markCancellable();
+        tracker.track(request);
         // Tests the case that query is in the map.
-        queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1)));
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
 
@@ -76,7 +80,11 @@ public class QueryCancellationServletTest {
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
 
         // Tests the case that the job cancellation hit some exception from Hyracks.
-        queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2)));
+        final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
+        ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>());
+        request2.setJobId(new JobId(2));
+        request2.markCancellable();
+        tracker.track(request2);
         Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
         mockRequest = mockRequest("2");
         cancellationServlet.handle(mockRequest, mockResponse);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 48d11ee..2a945e9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -70,7 +70,7 @@ public class CancellationTestExecutor extends TestExecutor {
                 Thread.sleep(10);
                 // Cancels the query request while the query is executing.
                 int rc = cancelQuery(getEndpoint(Servlets.RUNNING_REQUESTS), newParams);
-                Assert.assertTrue(rc == 200 || rc == 404);
+                Assert.assertTrue(rc == 200 || rc == 404 || rc == 403);
                 if (rc == 200) {
                     break;
                 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 3a29ef2..7eee42e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
  */
  -- param client_context_id=ensure_running_query
  -- polltimeoutsecs=15
-SELECT VALUE rqst FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
 WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index 92f4746..e31fe3b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 6b5b472..9e5801f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -76,4 +76,6 @@ public interface IApplicationContext {
      * @return the cluster coordination service.
      */
     ICoordinationService getCoordinationService();
+
+    IReceptionist getReceptionist();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 53771d5..430cd2a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -24,11 +24,35 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public interface IClientRequest {
 
     /**
+     * A system wide unique id representing this {@link IClientRequest}
+     *
+     * @return the system request id
+     */
+    String getId();
+
+    /**
+     * A user supplied id representing this {@link IClientRequest}
+     *
+     * @return the client supplied request id
+     */
+    String getClientContextId();
+
+    /**
      * Mark the request as complete, non-cancellable anymore
      */
     void complete();
 
     /**
+     * Mark the request as cancellable
+     */
+    void markCancellable();
+
+    /**
+     * @return true if the request can be cancelled. Otherwise false.
+     */
+    boolean isCancellable();
+
+    /**
      * Cancel a request
      *
      * @param appCtx
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8648c5b..c6e7439 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -67,8 +67,8 @@ public interface INcApplicationContext extends IApplicationContext {
 
     IResourceIdFactory getResourceIdFactory();
 
-    void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun)
-            throws IOException, AlgebricksException;
+    void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
+            boolean initialRun) throws IOException, AlgebricksException;
 
     void setShuttingdown(boolean b);
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
similarity index 57%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 53771d5..51df306 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,26 +18,31 @@
  */
 package org.apache.asterix.common.api;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import java.util.Map;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
 
-public interface IClientRequest {
+public interface IReceptionist {
 
     /**
-     * Mark the request as complete, non-cancellable anymore
+     * Generates a request reference based on {@code request}
+     *
+     * @param request
+     * @return a request reference representing the request
      */
-    void complete();
+    IRequestReference welcome(IServletRequest request);
 
     /**
-     * Cancel a request
+     * Generates an {@link IClientRequest} based on the request's parameters
      *
-     * @param appCtx
+     * @param requestRef
+     * @param clientContextId
+     * @param statement
+     * @param getOptionalParameters
+     * @return A client request
      * @throws HyracksDataException
      */
-    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
-
-    /**
-     * @return A json representation of this request
-     */
-    String toJson();
+    IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
+            Map<String, String> getOptionalParameters) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java
similarity index 78%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java
index 3a29ef2..6784f26 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java
@@ -16,7 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
- -- param client_context_id=ensure_running_query
- -- polltimeoutsecs=15
-SELECT VALUE rqst FROM active_requests() rqst
-WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
+package org.apache.asterix.common.api;
+
+@FunctionalInterface
+public interface IReceptionistFactory {
+
+    /**
+     * Creates a {@link IReceptionist}
+     *
+     * @return a receptionist
+     */
+    IReceptionist create();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java
similarity index 65%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java
index 53771d5..8a25ed2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java
@@ -18,26 +18,28 @@
  */
 package org.apache.asterix.common.api;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import java.io.Serializable;
 
-public interface IClientRequest {
+public interface IRequestReference extends Serializable {
 
     /**
-     * Mark the request as complete, non-cancellable anymore
+     * Gets the system wide unique request id.
+     *
+     * @return the request id.
      */
-    void complete();
+    String getUuid();
 
     /**
-     * Cancel a request
+     * Gets the name of the node which received this request.
      *
-     * @param appCtx
-     * @throws HyracksDataException
+     * @return the node name
      */
-    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+    String getNode();
 
     /**
-     * @return A json representation of this request
+     * Gets the system time at which the request was received.
+     *
+     * @return the time at which the request was received.
      */
-    String toJson();
+    long getTime();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
new file mode 100644
index 0000000..01bcf82
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IRequestTracker {
+
+    /**
+     * Starts tracking {@code request}
+     *
+     * @param request the request to track
+     */
+    void track(IClientRequest request);
+
+    /**
+     * Gets a client request by {@code requestId}
+     *
+     * @param requestId the id of the request to look up
+     * @return the client request if found. Otherwise null.
+     */
+    IClientRequest get(String requestId);
+
+    /**
+     * Gets a client request by {@code clientContextId}
+     *
+     * @param clientContextId the client context id of the request to look up
+     * @return the client request if found. Otherwise null.
+     */
+    IClientRequest getByClientContextId(String clientContextId);
+
+    /**
+     * Cancels the client request with id {@code requestId} if found.
+     *
+     * @param requestId the id of the request to cancel
+     * @throws HyracksDataException if the cancellation fails
+     */
+    void cancel(String requestId) throws HyracksDataException;
+
+    /**
+     * Completes the request with id {@code requestId}
+     *
+     * @param requestId the id of the request to complete
+     */
+    void complete(String requestId);
+
+    /**
+     * Gets the currently running requests
+     *
+     * @return the currently running requests
+     */
+    Collection<IClientRequest> getRunningRequests();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java
new file mode 100644
index 0000000..eb08c09
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * {@link IRequestReference} implementation that additionally carries the optional user agent and
+ * remote address of the client; both stay {@code null} until their setters are called.
+ */
+public class RequestReference implements IRequestReference {
+
+    private static final long serialVersionUID = 1L;
+    private final String uuid;
+    private final String node;
+    private final long time;
+    private String userAgent;
+    private String remoteAddr;
+
+    private RequestReference(String uuid, String node, long time) {
+        this.uuid = uuid;
+        this.node = node;
+        this.time = time;
+    }
+
+    /**
+     * Creates a request reference.
+     *
+     * @param uuid the request id
+     * @param node the name of the node which received the request
+     * @param time the time at which the request was received
+     * @return a new request reference with no user agent or remote address set
+     */
+    public static RequestReference of(String uuid, String node, long time) {
+        return new RequestReference(uuid, node, time);
+    }
+
+    @Override
+    public String getUuid() {
+        return uuid;
+    }
+
+    @Override
+    public long getTime() {
+        return time;
+    }
+
+    @Override
+    public String getNode() {
+        return node;
+    }
+
+    public String getUserAgent() {
+        return userAgent;
+    }
+
+    public void setUserAgent(String userAgent) {
+        this.userAgent = userAgent;
+    }
+
+    public void setRemoteAddr(String remoteAddr) {
+        this.remoteAddr = remoteAddr;
+    }
+
+    public String getRemoteAddr() {
+        return remoteAddr;
+    }
+
+    /**
+     * Renders this reference as JSON.
+     *
+     * @return the JSON text holding uuid, node, time, userAgent and remoteAddr
+     * @throws IllegalStateException if the JSON conversion fails
+     */
+    @Override
+    public String toString() {
+        final ObjectNode object = JSONUtil.createObject();
+        object.put("uuid", uuid);
+        object.put("node", node);
+        object.put("time", time);
+        object.put("userAgent", userAgent);
+        object.put("remoteAddr", remoteAddr);
+        return JSONUtil.convertNodeOrThrow(object);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 81ae3e1..e4b70f6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.dataflow;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ExtensionProperties;
@@ -133,4 +134,11 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the compression manager
      */
     ICompressionManager getCompressionManager();
+
+    /**
+     * Gets the request tracker.
+     *
+     * @return the request tracker.
+     */
+    IRequestTracker getRequestTracker();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
index 52dfe90..741af83 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
@@ -23,7 +23,8 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 public enum RequestStatus {
     SUCCESS,
     FAILED,
-    NOT_FOUND;
+    NOT_FOUND,
+    REJECTED;
 
     public HttpResponseStatus toHttpResponse() {
         switch (this) {
@@ -33,6 +34,8 @@ public enum RequestStatus {
                 return HttpResponseStatus.INTERNAL_SERVER_ERROR;
             case NOT_FOUND:
                 return HttpResponseStatus.NOT_FOUND;
+            case REJECTED:
+                return HttpResponseStatus.FORBIDDEN;
             default:
                 throw new IllegalStateException("Unrecognized status: " + this);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 48463e8..b92a15e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,6 +24,9 @@ import java.util.function.Supplier;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.ActiveProperties;
@@ -90,12 +93,15 @@ public class CcApplicationContext implements ICcApplicationContext {
     private final INodeJobTracker nodeJobTracker;
     private final ITxnIdFactory txnIdFactory;
     private final ICompressionManager compressionManager;
+    private final IReceptionist receptionist;
+    private final IRequestTracker requestTracker;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
             IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
             IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
-            IMetadataLockManager mdLockManager) throws AlgebricksException, IOException {
+            IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory)
+            throws AlgebricksException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
@@ -125,7 +131,8 @@ public class CcApplicationContext implements ICcApplicationContext {
         nodeJobTracker = new NodeJobTracker();
         txnIdFactory = new BulkTxnIdFactory();
         compressionManager = new CompressionManager(storageProperties);
-
+        receptionist = receptionistFactory.create();
+        requestTracker = new RequestTracker(this);
     }
 
     @Override
@@ -283,4 +290,14 @@ public class CcApplicationContext implements ICcApplicationContext {
     public ICompressionManager getCompressionManager() {
         return compressionManager;
     }
+
+    @Override
+    public IReceptionist getReceptionist() {
+        return receptionist;
+    }
+
+    @Override
+    public IRequestTracker getRequestTracker() {
+        return requestTracker;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
new file mode 100644
index 0000000..f651eb3
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IRequestTracker} that keeps the tracked requests in two concurrent maps: one keyed by
+ * request id and one keyed by client context id (only for requests that have one).
+ */
+public class RequestTracker implements IRequestTracker {
+
+    private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
+    private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
+    private final ICcApplicationContext ccAppCtx;
+
+    public RequestTracker(ICcApplicationContext ccAppCtx) {
+        this.ccAppCtx = ccAppCtx;
+    }
+
+    @Override
+    public IClientRequest get(String requestId) {
+        return runningRequests.get(requestId);
+    }
+
+    @Override
+    public IClientRequest getByClientContextId(String clientContextId) {
+        return clientIdRequests.get(clientContextId);
+    }
+
+    @Override
+    public void track(IClientRequest request) {
+        runningRequests.put(request.getId(), request);
+        final String clientContextId = request.getClientContextId();
+        if (clientContextId != null) {
+            clientIdRequests.put(clientContextId, request);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Does nothing if no request with id {@code requestId} is tracked.
+     *
+     * @throws IllegalStateException if the request is tracked but is not cancellable
+     */
+    @Override
+    public void cancel(String requestId) throws HyracksDataException {
+        final IClientRequest request = runningRequests.get(requestId);
+        if (request == null) {
+            return;
+        }
+        if (!request.isCancellable()) {
+            throw new IllegalStateException("Request " + request.getId() + " cannot be cancelled");
+        }
+        cancel(request);
+    }
+
+    @Override
+    public void complete(String requestId) {
+        final IClientRequest request = runningRequests.get(requestId);
+        if (request != null) {
+            request.complete();
+            untrack(request);
+        }
+    }
+
+    @Override
+    public Collection<IClientRequest> getRunningRequests() {
+        // no lock needed: nothing else in this class synchronizes on the tracker, and the
+        // returned collection is a read-only view over the concurrent map's values
+        return Collections.unmodifiableCollection(runningRequests.values());
+    }
+
+    // cancels the request, then stops tracking it
+    private void cancel(IClientRequest request) throws HyracksDataException {
+        request.cancel(ccAppCtx);
+        untrack(request);
+    }
+
+    // removes the request from both maps
+    private void untrack(IClientRequest request) {
+        runningRequests.remove(request.getId());
+        final String clientContextId = request.getClientContextId();
+        if (clientContextId != null) {
+            clientIdRequests.remove(clientContextId);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
index 006659b..7decbe0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
@@ -63,6 +63,14 @@ public class JSONUtil {
         return PRETTY_SORTED_WRITER.writeValueAsString(SORTED_MAPPER.treeToValue(node, Object.class));
     }
 
+    public static String convertNodeOrThrow(final JsonNode node) {
+        try {
+            return convertNode(node);
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public static void writeNode(final Writer writer, final JsonNode node) throws IOException {
         PRETTY_SORTED_WRITER.writeValue(writer, SORTED_MAPPER.treeToValue(node, Object.class));
     }