You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/02/03 10:04:21 UTC

[asterixdb] branch master updated: [ASTERIXDB-2513][FUN] Add Active_Requests Function

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1041389  [ASTERIXDB-2513][FUN] Add Active_Requests Function
1041389 is described below

commit 1041389b66a7473ee69001489253a25a9affe352
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Sat Feb 2 01:40:24 2019 +0300

    [ASTERIXDB-2513][FUN] Add Active_Requests Function
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Add a datasource function (active_requests) which
      returns the active jobs that the user specified
      client_context_id for.
    - This function runs on a single NC and uses messaging
      to get the currently running jobs from CC.
    - Currently, the function returns the following fields:
      -- clientContextId: the user specified clientContextId.
      -- requestTime: a timestamp at which the request reference
                      was created.
      -- jobId: optionally, the job id that belongs to this request.
    - The function may be improved later to return all jobs and it may
      return additional fields such as (request uuid, statement,
      executionTime, elapsedTime, nodeAddress, userAgent, etc..)
    - Add test case.
    - Do not allow cancellation test to cancel queries with
      clientContextId to avoid intermittent failures.
    
    Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3136
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 asterixdb/asterix-algebra/pom.xml                  |  4 ++
 .../asterix/translator/BaseClientRequest.java      | 21 +++++++
 .../asterix/translator/ClientJobRequest.java       | 14 +++++
 .../translator/IStatementExecutorContext.java      |  7 +++
 .../translator/NoOpStatementExecutorContext.java   |  7 +++
 .../api/http/ctx/StatementExecutorContext.java     |  6 ++
 .../app/function/ActiveRequestsDatasource.java     | 45 +++++++++++++++
 .../app/function/ActiveRequestsFunction.java       | 67 ++++++++++++++++++++++
 .../app/function/ActiveRequestsReader.java}        | 41 +++++++------
 .../app/function/ActiveRequestsRewriter.java       | 43 ++++++++++++++
 .../asterix/app/message/ActiveRequestsRequest.java | 62 ++++++++++++++++++++
 .../app/message/ActiveRequestsResponse.java        | 50 ++++++++++++++++
 .../asterix/util/MetadataBuiltinFunctions.java     |  6 ++
 .../test/common/CancellationTestExecutor.java      |  1 +
 .../apache/asterix/test/common/TestExecutor.java   |  6 ++
 .../active_requests/active_requests.1.async.sqlpp} | 22 +------
 .../active_requests.2.pollquery.sqlpp}             | 24 ++------
 .../active_requests.3.pollquery.sqlpp}             | 24 ++------
 .../misc/active_requests/active_requests.1.ignore  |  0
 .../misc/active_requests/active_requests.2.regex   |  1 +
 .../misc/active_requests/active_requests.3.adm     |  0
 .../test/resources/runtimets/testsuite_sqlpp.xml   |  5 ++
 .../apache/asterix/common/api/IClientRequest.java  |  5 ++
 .../java/org/apache/asterix/om/base/ADateTime.java | 12 ++--
 .../AlgebricksAbsolutePartitionConstraint.java     |  6 ++
 25 files changed, 398 insertions(+), 81 deletions(-)

diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index f9f5677..369d93b 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -242,5 +242,9 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-util</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index a9bd856..ec44d60 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,10 +20,15 @@ package org.apache.asterix.translator;
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public abstract class BaseClientRequest implements IClientRequest {
     protected final IStatementExecutorContext ctx;
+    protected final long requestTime = System.currentTimeMillis();
     protected final String contextId;
     private boolean complete;
 
@@ -50,5 +55,21 @@ public abstract class BaseClientRequest implements IClientRequest {
         doCancel(appCtx);
     }
 
+    @Override
+    public String toJson() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    protected ObjectNode asJson() {
+        ObjectNode json = JSONUtil.createObject();
+        json.put("requestTime", new ADateTime(requestTime).toSimpleString());
+        json.put("clientContextID", contextId);
+        return json;
+    }
+
     protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
index 520ce03..81714ca 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -22,6 +22,9 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class ClientJobRequest extends BaseClientRequest {
     private final JobId jobId;
@@ -41,4 +44,15 @@ public class ClientJobRequest extends BaseClientRequest {
         }
         ctx.remove(contextId);
     }
+
+    @Override
+    public String toJson() {
+        final ObjectNode jsonNode = super.asJson();
+        jsonNode.put("jobId", jobId.toString());
+        try {
+            return JSONUtil.convertNode(jsonNode);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 78080f3..29e7bda 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,6 +19,8 @@
 
 package org.apache.asterix.translator;
 
+import java.util.Map;
+
 import org.apache.asterix.common.api.IClientRequest;
 
 /**
@@ -52,4 +54,9 @@ public interface IStatementExecutorContext {
      *            a user provided client context id.
      */
     IClientRequest remove(String clientContextId);
+
+    /**
+     * @return The currently running requests
+     */
+    Map<String, IClientRequest> getRunningRequests();
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index c4e2859..a2a7906 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.asterix.common.api.IClientRequest;
 
 public class NoOpStatementExecutorContext implements IStatementExecutorContext {
@@ -42,4 +45,8 @@ public class NoOpStatementExecutorContext implements IStatementExecutorContext {
         return null;
     }
 
+    @Override
+    public Map<String, IClientRequest> getRunningRequests() {
+        return Collections.emptyMap();
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index a4da189..136fda7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.api.http.ctx;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -42,4 +43,9 @@ public class StatementExecutorContext implements IStatementExecutorContext {
     public IClientRequest remove(String clientContextId) {
         return runningQueries.remove(clientContextId);
     }
+
+    @Override
+    public Map<String, IClientRequest> getRunningRequests() {
+        return Collections.unmodifiableMap(runningQueries);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
new file mode 100644
index 0000000..6d32763
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class ActiveRequestsDatasource extends FunctionDataSource {
+
+    private static final DataSourceId ACTIVE_REQUESTS_DATASOURCE_ID = new DataSourceId(
+            ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(), ActiveRequestsRewriter.ACTIVE_REQUESTS.getName());
+
+    public ActiveRequestsDatasource(INodeDomain domain) throws AlgebricksException {
+        super(ACTIVE_REQUESTS_DATASOURCE_ID, domain);
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        AlgebricksAbsolutePartitionConstraint randomLocation =
+                AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
+        return new ActiveRequestsFunction(randomLocation);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
new file mode 100644
index 0000000..7c621f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.ActiveRequestsRequest;
+import org.apache.asterix.app.message.ActiveRequestsResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsFunction extends AbstractDatasourceFunction {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+
+    public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint locations) {
+        super(locations);
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+        INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker();
+        MessageFuture messageFuture = messageBroker.registerMessageFuture();
+        long futureId = messageFuture.getFutureId();
+        ActiveRequestsRequest request = new ActiveRequestsRequest(serviceCtx.getNodeId(), futureId);
+        try {
+            messageBroker.sendMessageToPrimaryCC(request);
+            ActiveRequestsResponse response =
+                    (ActiveRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            return new ActiveRequestsReader(response.getRequests());
+        } catch (Exception e) {
+            LOGGER.warn("Could not retrieve active requests", e);
+            throw HyracksDataException.create(e);
+        } finally {
+            messageBroker.deregisterMessageFuture(futureId);
+        }
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
similarity index 50%
copy from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
copy to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
index 520ce03..9c9ebd9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
@@ -16,29 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.translator;
+package org.apache.asterix.app.function;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
+import java.io.IOException;
 
-public class ClientJobRequest extends BaseClientRequest {
-    private final JobId jobId;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
 
-    public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
-        super(ctx, clientCtxId);
-        this.jobId = jobId;
+public class ActiveRequestsReader extends FunctionReader {
+
+    private final String[] activeRequests;
+    private CharArrayRecord record;
+    private int recordIndex;
+
+    public ActiveRequestsReader(String[] activeRequests) {
+        this.activeRequests = activeRequests;
+        record = new CharArrayRecord();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return recordIndex < activeRequests.length;
     }
 
     @Override
-    protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        try {
-            hcc.cancelJob(jobId);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        ctx.remove(contextId);
+    public IRawRecord<char[]> next() throws IOException {
+        record.reset();
+        record.append((activeRequests[recordIndex++]).toCharArray());
+        record.endRecord();
+        return record;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
new file mode 100644
index 0000000..b0daabb
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ActiveRequestsRewriter extends FunctionRewriter {
+
+    public static final FunctionIdentifier ACTIVE_REQUESTS =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "active-requests", 0);
+    public static final ActiveRequestsRewriter INSTANCE = new ActiveRequestsRewriter(ACTIVE_REQUESTS);
+
+    private ActiveRequestsRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        return new ActiveRequestsDatasource(context.getComputationNodeDomain());
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
new file mode 100644
index 0000000..9d15131
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+
+    public ActiveRequestsRequest(String nodeId, long reqId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+        CCApplication application = (CCApplication) ccs.getApplication();
+        IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
+        final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values();
+        final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
+        ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests);
+        CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
new file mode 100644
index 0000000..0bf7976
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ActiveRequestsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String[] requests;
+
+    public ActiveRequestsResponse(long reqId, String[] requests) {
+        this.reqId = reqId;
+        this.requests = requests;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String[] getRequests() {
+        return requests;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 83ceec7..3407d59 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.util;
 
+import org.apache.asterix.app.function.ActiveRequestsRewriter;
 import org.apache.asterix.app.function.DatasetResourcesRewriter;
 import org.apache.asterix.app.function.DatasetRewriter;
 import org.apache.asterix.app.function.FeedRewriter;
@@ -54,6 +55,11 @@ public class MetadataBuiltinFunctions {
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
         BuiltinFunctions.addUnnestFun(PingRewriter.PING, true);
         BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, PingRewriter.INSTANCE);
+        // Active requests function
+        BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+        BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true);
+        BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE);
     }
 
     private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index e85fedf..48d11ee 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -51,6 +51,7 @@ public class CancellationTestExecutor extends TestExecutor {
     public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
             List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded,
             Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
+        cancellable = cancellable && !containsClientContextID(str);
         String clientContextId = UUID.randomUUID().toString();
         final List<TestCase.CompilationUnit.Parameter> newParams = cancellable
                 ? upsertParam(params, "client_context_id", ParameterTypeEnum.STRING, clientContextId) : params;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 4766639..282d83d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1999,6 +1999,12 @@ public class TestExecutor {
         }
     }
 
+    protected static boolean containsClientContextID(String statement) {
+        List<Parameter> httpParams = extractParameters(statement);
+        return httpParams.stream().map(Parameter::getName)
+                .anyMatch(QueryServiceServlet.Parameter.CLIENT_ID.str()::equals);
+    }
+
     private static boolean isCancellable(String type) {
         return !NON_CANCELLABLE.contains(type);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
similarity index 62%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
index 30759de..ec96a51 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.api;
+-- param client_context_id=sleep_async_query
+-- handlevariable=status
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IClientRequest {
-
-    /**
-     * Mark the request as complete, non-cancellable anymore
-     */
-    void complete();
-
-    /**
-     * Cancel a request
-     *
-     * @param appCtx
-     * @throws HyracksDataException
-     */
-    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
-}
+select value sleep("result", 10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
similarity index 62%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 30759de..3a29ef2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.api;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IClientRequest {
-
-    /**
-     * Mark the request as complete, non-cancellable anymore
-     */
-    void complete();
-
-    /**
-     * Cancel a request
-     *
-     * @param appCtx
-     * @throws HyracksDataException
-     */
-    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
-}
+ -- param client_context_id=ensure_running_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
similarity index 62%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
index 30759de..1881f1a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.api;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IClientRequest {
-
-    /**
-     * Mark the request as complete, non-cancellable anymore
-     */
-    void complete();
-
-    /**
-     * Cancel a request
-     *
-     * @param appCtx
-     * @throws HyracksDataException
-     */
-    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
-}
+ -- param client_context_id=ensure_completed_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
new file mode 100644
index 0000000..e69de29
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
new file mode 100644
index 0000000..92f4746
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -0,0 +1 @@
+/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
new file mode 100644
index 0000000..e69de29
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8156244..ceee5f9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4982,6 +4982,11 @@
         <output-dir compare="Text">p_sort_num_samples</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="active_requests">
+        <output-dir compare="Text">active_requests</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index">
     <test-group name="index/validations">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 30759de..53771d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -35,4 +35,9 @@ public interface IClientRequest {
      * @throws HyracksDataException
      */
     void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+
+    /**
+     * @return A json representation of this request
+     */
+    String toJson();
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
index efcb828..62e5c87 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
@@ -118,11 +118,15 @@ public class ADateTime implements IAObject {
         return sbder.toString();
     }
 
-    public String toSimpleString() throws IOException {
+    public String toSimpleString() {
         StringBuilder sbder = new StringBuilder();
-        GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
-                GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
-        return sbder.toString();
+        try {
+            GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
+                    GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
+            return sbder.toString();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
     }
 
     public long getChrononTime() {
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index fa4a707..b53f4d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.common.constraints;
 
 import java.util.Arrays;
+import java.util.Random;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -33,6 +34,11 @@ public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionCo
         Arrays.sort(sortedLocations);
     }
 
+    public static AlgebricksAbsolutePartitionConstraint randomLocation(String[] locations) {
+        int randomIndex = new Random().nextInt(locations.length);
+        return new AlgebricksAbsolutePartitionConstraint(new String[] { locations[randomIndex] });
+    }
+
     @Override
     public PartitionConstraintType getPartitionConstraintType() {
         return PartitionConstraintType.ABSOLUTE;