You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2017/03/04 07:37:04 UTC

asterixdb git commit: Add job cancellation support in Hyracks.

Repository: asterixdb
Updated Branches:
  refs/heads/master f7100f704 -> 88b576952


Add job cancellation support in Hyracks.

This change also:
- cleans up JobRun;
- adds tests for job cancellation.

Change-Id: Ic26330c19c8642dd3246739b5150c4aa667c359c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1537
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/88b57695
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/88b57695
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/88b57695

Branch: refs/heads/master
Commit: 88b57695259c5fdc94ea7dbc2f7ad91715687fe0
Parents: f7100f7
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Fri Mar 3 17:22:10 2017 -0800
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Fri Mar 3 23:35:58 2017 -0800

----------------------------------------------------------------------
 .../src/test/resources/runtimets/testsuite.xml  |   2 +-
 .../client/HyracksClientInterfaceFunctions.java |  20 ++
 .../HyracksClientInterfaceRemoteProxy.java      |   7 +
 .../hyracks/api/client/HyracksConnection.java   |   5 +
 .../api/client/IHyracksClientConnection.java    |   9 +
 .../api/client/IHyracksClientInterface.java     |   2 +
 .../hyracks/api/exceptions/ErrorCode.java       |   3 +-
 .../api/exceptions/HyracksDataException.java    |   7 +
 .../api/exceptions/HyracksException.java        |   7 +
 .../SuperActivityOperatorNodePushable.java      |   4 +-
 .../src/main/resources/errormsg/en.properties   |   2 +
 .../hyracks/control/cc/ClientInterfaceIPCI.java |   7 +
 .../control/cc/executor/JobExecutor.java        | 106 +++++--
 .../hyracks/control/cc/job/IJobManager.java     |   8 +
 .../hyracks/control/cc/job/JobManager.java      |  73 +++--
 .../apache/hyracks/control/cc/job/JobRun.java   |  27 +-
 .../control/cc/scheduler/FIFOJobQueue.java      |  29 +-
 .../hyracks/control/cc/scheduler/IJobQueue.java |  17 ++
 .../hyracks/control/cc/work/CancelJobWork.java  |  53 ++++
 .../hyracks/control/cc/work/JobStartWork.java   |   6 +-
 .../hyracks/control/cc/job/JobManagerTest.java  |  55 ++++
 .../AbstractMultiNCIntegrationTest.java         |  58 +++-
 .../tests/integration/CancelJobTest.java        | 303 +++++++++++++++++++
 23 files changed, 717 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index a651833..29998fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6758,7 +6758,7 @@
     <test-case FilePath="load">
       <compilation-unit name="duplicate-key-error">
         <output-dir compare="Text">none</output-dir>
-        <expected-error>org.apache.hyracks.api.exceptions.HyracksException</expected-error>
+        <expected-error>Input stream given to BTree bulk load has duplicates</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa292f6..aa9232e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -38,6 +38,7 @@ public class HyracksClientInterfaceFunctions {
         START_JOB,
         DISTRIBUTE_JOB,
         DESTROY_JOB,
+        CANCEL_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
         GET_DATASET_RESULT_LOCATIONS,
@@ -122,6 +123,25 @@ public class HyracksClientInterfaceFunctions {
         }
     }
 
+    public static class CancelJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public CancelJobFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CANCEL_JOB;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class DestroyJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 8e7affb..0142c7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,6 +69,13 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     }
 
     @Override
+    public void cancelJob(JobId jobId) throws Exception {
+        HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction(
+                jobId);
+        rpci.call(ipcHandle, cjf);
+    }
+
+    @Override
     public JobId startJob(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
                 new HyracksClientInterfaceFunctions.StartJobFunction(jobId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 5da1f34..4b3aff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -91,6 +91,11 @@ public final class HyracksConnection implements IHyracksClientConnection {
     }
 
     @Override
+    public void cancelJob(JobId jobId) throws Exception {
+        hci.cancelJob(jobId);
+    }
+
+    @Override
     public JobId startJob(JobSpecification jobSpec) throws Exception {
         return startJob(jobSpec, EnumSet.noneOf(JobFlag.class));
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index e65cacd..0956d85 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -57,6 +57,15 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
     public JobInfo getJobInfo(JobId jobId) throws Exception;
 
     /**
+     * Cancel the job that has the given job id.
+     *
+     * @param jobId
+     *            the JobId of the Job
+     * @throws Exception
+     */
+    public void cancelJob(JobId jobId) throws Exception;
+
+    /**
      * Start the specified Job.
      *
      * @param jobSpec

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index f7995d7..1afbe9e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -40,6 +40,8 @@ public interface IHyracksClientInterface {
 
     public JobId startJob(JobId jobId) throws Exception;
 
+    public void cancelJob(JobId jobId) throws Exception;
+
     public JobId distributeJob(byte[] acggfBytes) throws Exception;
 
     public JobId destroyJob(JobId jobId) throws Exception;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 1b97a60..333b1df 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.api.exceptions;
 
-import java.io.File;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -59,6 +58,8 @@ public class ErrorCode {
     public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
     public static final int DISTRIBUTED_JOB_FAILURE = 23;
     public static final int NO_RESULTSET = 24;
+    public static final int JOB_CANCELED = 25;
+    public static final int NODE_FAILED = 26;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 404104d..6c581f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,6 +28,13 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
  */
 public class HyracksDataException extends HyracksException {
 
+    public static HyracksDataException create(Throwable cause) {
+        if (cause instanceof HyracksDataException) {
+            return (HyracksDataException) cause;
+        }
+        return new HyracksDataException(cause);
+    }
+
     public static HyracksDataException create(int code, Serializable... params) {
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 559468d..1f2c7a5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -33,6 +33,13 @@ public class HyracksException extends IOException {
     private final String nodeId;
     private transient volatile String msgCache;
 
+    public static HyracksException create(Throwable cause) {
+        if (cause instanceof HyracksException) {
+            return (HyracksException) cause;
+        }
+        return new HyracksException(cause);
+    }
+
     public static HyracksException create(int code, Serializable... params) {
         return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 2ac392b..1c4f916 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -221,11 +221,11 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
             for (Future<Void> initializationTask : initializationTasks) {
                 initializationTask.get();
             }
-        } catch (Throwable th) {
+        } catch (Exception e) {
             for (Future<Void> initializationTask : initializationTasks) {
                 initializationTask.cancel(true);
             }
-            throw new HyracksDataException(th);
+            throw new HyracksDataException(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 3bf5a9a..12601fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -43,5 +43,7 @@
 22 = The distributed job %1$s already exists
 23 = The distributed work failed for %1$s at %2$s
 24 = No result set for job %1$s
+25 = Job %1$s has been cancelled by a user
+26 = Node %1$s failed
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 265d3ef..ced3d67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.control.cc.work.CancelJobWork;
 import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
@@ -94,6 +95,12 @@ class ClientInterfaceIPCI implements IIPCI {
                 ccs.getWorkQueue()
                         .schedule(new DestroyJobWork(ccs, dsjf.getJobId(), new IPCResponder<JobId>(handle, mid)));
                 break;
+            case CANCEL_JOB:
+                HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+                        (HyracksClientInterfaceFunctions.CancelJobFunction) fn;
+                ccs.getWorkQueue().schedule(
+                        new CancelJobWork(ccs.getJobManager(), cjf.getJobId(), new IPCResponder<Void>(handle, mid)));
+                break;
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 8f7b0cb..084bd1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
@@ -83,6 +84,8 @@ public class JobExecutor {
 
     private final Random random;
 
+    private boolean cancelled = false;
+
     public JobExecutor(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints,
             boolean predistributed) {
         this.ccs = ccs;
@@ -112,6 +115,19 @@ public class JobExecutor {
         ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
     }
 
+    public void cancelJob() throws HyracksException {
+        // If the job is already terminated or failed, do nothing here.
+        if (jobRun.getPendingStatus() != null) {
+            return;
+        }
+        // Sets the cancelled flag.
+        cancelled = true;
+        // Aborts ongoing task clusters.
+        abortOngoingTaskClusters(ta -> false, ta -> null);
+        // Aborts the whole job.
+        abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())));
+    }
+
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
             throws HyracksException {
         for (ActivityCluster root : roots) {
@@ -661,7 +677,7 @@ public class JobExecutor {
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
                 abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
+                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
                     abortJob(exceptions);
                     return;
                 }
@@ -691,42 +707,70 @@ public class JobExecutor {
                 jobManager.finalComplete(jobRun);
                 return;
             }
-            for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
-                if (!isPlanned(ac)) {
-                    continue;
-                }
-                TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
-                if (taskClusters == null) {
+            abortOngoingTaskClusters(ta -> deadNodes.contains(ta.getNodeId()),
+                    ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
+            startRunnableActivityClusters();
+        } catch (Exception e) {
+            abortJob(Collections.singletonList(e));
+        }
+    }
+
+    private interface ITaskFilter {
+        boolean directlyMarkAsFailed(TaskAttempt ta);
+    }
+
+    private interface IExceptionGenerator {
+        HyracksException getException(TaskAttempt ta);
+    }
+
+    /**
+     * Aborts ongoing task clusters.
+     *
+     * @param taskFilter
+     *            selects tasks that should be directly marked as failed without doing the aborting RPC.
+     * @param exceptionGenerator
+     *            generates an exception for tasks that are directly marked as failed.
+     */
+    private void abortOngoingTaskClusters(ITaskFilter taskFilter, IExceptionGenerator exceptionGenerator)
+            throws HyracksException {
+        for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
+            if (!isPlanned(ac)) {
+                continue;
+            }
+            TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
+            if (taskClusters == null) {
+                continue;
+            }
+            for (TaskCluster tc : taskClusters) {
+                TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
+                if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt
+                        .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                        || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
                     continue;
                 }
-                for (TaskCluster tc : taskClusters) {
-                    TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                    if (lastTaskClusterAttempt == null || !(lastTaskClusterAttempt
-                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
-                            || lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
-                        continue;
-                    }
-                    boolean abort = false;
-                    for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                        assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED
-                                || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING;
-                        if (deadNodes.contains(ta.getNodeId())) {
-                            ta.setStatus(TaskAttempt.TaskStatus.FAILED,
-                                    Collections.singletonList(new Exception("Node " + ta.getNodeId() + " failed")));
-                            ta.setEndTime(System.currentTimeMillis());
-                            abort = true;
-                        }
-                    }
-                    if (abort) {
-                        abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                boolean abort = false;
+                for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
+                    assert ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED
+                            || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING;
+                    if (taskFilter.directlyMarkAsFailed(ta)) {
+                        // Directly mark it as failed, without further aborting.
+                        ta.setStatus(TaskAttempt.TaskStatus.FAILED,
+                                Collections.singletonList(exceptionGenerator.getException(ta)));
+                        ta.setEndTime(System.currentTimeMillis());
+                        abort = true;
                     }
                 }
-                abortDoomedTaskClusters();
+                if (abort) {
+                    abortTaskCluster(lastTaskClusterAttempt, TaskClusterAttempt.TaskClusterStatus.ABORTED);
+                }
             }
-            startRunnableActivityClusters();
-        } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortDoomedTaskClusters();
         }
     }
 
+    // Returns whether the job has been cancelled.
+    private boolean isCancelled() {
+        return cancelled;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 180e850..21fc08f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -45,6 +45,14 @@ public interface IJobManager {
     void add(JobRun jobRun) throws HyracksException;
 
     /**
+     * Cancel a job with a given job id.
+     *
+     * @param jobId
+     *            the id of the job.
+     */
+    void cancel(JobId jobId) throws HyracksException;
+
+    /**
      * This method is called when the master process decides to complete job.
      * The implementation of this method should instruct all involved worker processes to clean the state of each
      * individual parallel partition up.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 741e3db..031303b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -47,7 +47,6 @@ import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.work.IResultCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -108,8 +107,7 @@ public class JobManager implements IJobManager {
         IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
         switch (status) {
             case QUEUE:
-                jobRun.setStatus(JobStatus.PENDING, null);
-                jobQueue.add(jobRun);
+                queueJob(jobRun);
                 break;
             case EXECUTE:
                 executeJob(jobRun);
@@ -118,6 +116,32 @@ public class JobManager implements IJobManager {
     }
 
     @Override
+    public void cancel(JobId jobId) throws HyracksException {
+        if (jobId == null) {
+            return;
+        }
+        // Cancels a running job.
+        if (activeRunMap.containsKey(jobId)) {
+            JobRun jobRun = activeRunMap.get(jobId);
+            // The following call will abort all ongoing tasks and then consequently
+            // trigger JobCleanupWork and JobCleanupNotificationWork, which will update the lifecycle of the job.
+            // Therefore, we do not remove the job from activeRunMap here.
+            jobRun.getExecutor().cancelJob();
+            return;
+        }
+        // Removes a pending job.
+        JobRun jobRun = jobQueue.remove(jobId);
+        if (jobRun != null) {
+            List<Exception> exceptions = Collections
+                    .singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
+            // Since the job has not been executed, we only need to update its status and lifecycle here.
+            jobRun.setStatus(JobStatus.FAILURE, exceptions);
+            runMapArchive.put(jobId, jobRun);
+            runMapHistory.put(jobId, exceptions);
+        }
+    }
+
+    @Override
     public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
         checkJob(run);
         if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
@@ -244,9 +268,12 @@ public class JobManager implements IJobManager {
 
     @Override
     public JobRun get(JobId jobId) {
-        JobRun jobRun = activeRunMap.get(jobId);
+        JobRun jobRun = activeRunMap.get(jobId); // Running job.
+        if (jobRun == null) {
+            jobRun = jobQueue.get(jobId); // Pending job.
+        }
         if (jobRun == null) {
-            jobRun = runMapArchive.get(jobId);
+            jobRun = runMapArchive.get(jobId); // Completed job.
         }
         return jobRun;
     }
@@ -256,7 +283,7 @@ public class JobManager implements IJobManager {
         return runMapHistory.get(jobId);
     }
 
-    private void pickJobsToRun() {
+    private void pickJobsToRun() throws HyracksException {
         List<JobRun> selectedRuns = jobQueue.pull();
         for (JobRun run : selectedRuns) {
             executeJob(run);
@@ -264,24 +291,24 @@ public class JobManager implements IJobManager {
     }
 
     // Executes a job when the required capacity for the job is met.
-    private void executeJob(JobRun run) {
-        IResultCallback<JobId> callback = run.getCallback();
-        try {
-            run.setStartTime(System.currentTimeMillis());
-            JobId jobId = run.getJobId();
-            activeRunMap.put(jobId, run);
-
-            CCApplicationContext appCtx = ccs.getApplicationContext();
-            JobSpecification spec = run.getJobSpecification();
-            if (!run.getExecutor().isPredistributed()) {
-                appCtx.notifyJobCreation(jobId, spec);
-            }
-            run.setStatus(JobStatus.RUNNING, null);
-            executeJobInternal(run);
-            callback.setValue(jobId);
-        } catch (Exception e) {
-            callback.setException(e);
+    private void executeJob(JobRun run) throws HyracksException {
+        run.setStartTime(System.currentTimeMillis());
+        JobId jobId = run.getJobId();
+        activeRunMap.put(jobId, run);
+
+        CCApplicationContext appCtx = ccs.getApplicationContext();
+        JobSpecification spec = run.getJobSpecification();
+        if (!run.getExecutor().isPredistributed()) {
+            appCtx.notifyJobCreation(jobId, spec);
         }
+        run.setStatus(JobStatus.RUNNING, null);
+        executeJobInternal(run);
+    }
+
+    // Queue a job when the required capacity for the job is not met.
+    private void queueJob(JobRun jobRun) throws HyracksException {
+        jobRun.setStatus(JobStatus.PENDING, null);
+        jobQueue.add(jobRun);
     }
 
     private void executeJobInternal(JobRun run) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 3aa9043..55a7a82 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -52,7 +52,6 @@ import org.apache.hyracks.control.cc.executor.JobExecutor;
 import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.ExceptionUtils;
-import org.apache.hyracks.control.common.work.IResultCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -99,9 +98,7 @@ public class JobRun implements IJobStatusConditionVariable {
 
     private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
-    private final IResultCallback<JobId> callback;
-
-    private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, IResultCallback<JobId> callback,
+    private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
             JobSpecification spec, ActivityClusterGraph acg) {
         this.deploymentId = deploymentId;
         this.jobId = jobId;
@@ -116,14 +113,13 @@ public class JobRun implements IJobStatusConditionVariable {
         connectorPolicyMap = new HashMap<>();
         operatorLocations = new HashMap<>();
         createTime = System.currentTimeMillis();
-        this.callback = callback;
     }
 
     //Run a Pre-distributed job by passing the JobId
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, IResultCallback<JobId> callback,
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             PreDistributedJobDescriptor distributedJobDescriptor)
             throws HyracksException {
-        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class), callback,
+        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
                 distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
         Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
         this.scheduler = new JobExecutor(ccs, this, constaints, true);
@@ -131,9 +127,8 @@ public class JobRun implements IJobStatusConditionVariable {
 
     //Run a new job by creating an ActivityClusterGraph
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
-            IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags,
-            IResultCallback<JobId> callback) {
-        this(deploymentId, jobId, jobFlags, callback, acggf.getJobSpecification(), acgg.initialize());
+            IActivityClusterGraphGeneratorFactory acggf, IActivityClusterGraphGenerator acgg, Set<JobFlag> jobFlags) {
+        this(deploymentId, jobId, jobFlags, acggf.getJobSpecification(), acgg.initialize());
         this.scheduler = new JobExecutor(ccs, this, acgg.getConstraints(), false);
     }
 
@@ -196,10 +191,6 @@ public class JobRun implements IJobStatusConditionVariable {
         return createTime;
     }
 
-    public IResultCallback<JobId> getCallback() {
-        return callback;
-    }
-
     public long getStartTime() {
         return startTime;
     }
@@ -231,13 +222,7 @@ public class JobRun implements IJobStatusConditionVariable {
             wait();
         }
         if (exceptions != null && !exceptions.isEmpty()) {
-            StringBuilder buffer = new StringBuilder();
-            buffer.append("Job failed on account of:\n");
-            for (Exception e : exceptions) {
-                buffer.append(e.getMessage()).append('\n');
-            }
-            HyracksException he;
-            he = new HyracksException(buffer.toString(), exceptions.get(0));
+            HyracksException he = HyracksException.create(exceptions.get(0));
             for (int i = 1; i < exceptions.size(); ++i) {
                 he.addSuppressed(exceptions.get(i));
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 6cf75bb..0377692 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -23,13 +23,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -42,9 +44,9 @@ import org.apache.hyracks.control.cc.job.JobRun;
 public class FIFOJobQueue implements IJobQueue {
 
     private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName());
-
     private static final int CAPACITY = 4096;
-    private final List<JobRun> jobQueue = new LinkedList<>();
+
+    private final Map<JobId, JobRun> jobListMap = new LinkedHashMap<>();
     private final IJobManager jobManager;
     private final IJobCapacityController jobCapacityController;
 
@@ -55,17 +57,27 @@ public class FIFOJobQueue implements IJobQueue {
 
     @Override
     public void add(JobRun run) throws HyracksException {
-        int size = jobQueue.size();
+        int size = jobListMap.size();
         if (size >= CAPACITY) {
-            throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, new Integer(CAPACITY));
+            throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, CAPACITY);
         }
-        jobQueue.add(run);
+        jobListMap.put(run.getJobId(), run);
+    }
+
+    @Override
+    public JobRun remove(JobId jobId) {
+        return jobListMap.remove(jobId);
+    }
+
+    @Override
+    public JobRun get(JobId jobId) {
+        return jobListMap.get(jobId);
     }
 
     @Override
     public List<JobRun> pull() {
         List<JobRun> jobRuns = new ArrayList<>();
-        Iterator<JobRun> runIterator = jobQueue.iterator();
+        Iterator<JobRun> runIterator = jobListMap.values().iterator();
         while (runIterator.hasNext()) {
             JobRun run = runIterator.next();
             JobSpecification job = run.getJobSpecification();
@@ -89,7 +101,6 @@ public class FIFOJobQueue implements IJobQueue {
                 } catch (HyracksException e) {
                     LOGGER.log(Level.SEVERE, e.getMessage(), e);
                 }
-                continue;
             }
         }
         return jobRuns;
@@ -97,7 +108,7 @@ public class FIFOJobQueue implements IJobQueue {
 
     @Override
     public Collection<JobRun> jobs() {
-        return Collections.unmodifiableCollection(jobQueue);
+        return Collections.unmodifiableCollection(jobListMap.values());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index 2c26799..e666224 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.job.JobRun;
 
 /**
@@ -41,6 +42,22 @@ public interface IJobQueue {
     void add(JobRun run) throws HyracksException;
 
     /**
+     * Removes a job with a given jobId from the job queue.
+     *
+     * @param jobId
+     *            the job id of the job to be removed.
+     */
+    JobRun remove(JobId jobId);
+
+    /**
+     * Retrieves a job with a given jobId from the job queue.
+     *
+     * @param jobId
+     *            the job id of the job to be retrieved.
+     */
+    JobRun get(JobId jobId);
+
+    /**
      * Pull a list of jobs from the job queque, when more cluster capacity becomes available.
      *
      * @return a list of jobs whose capacity requirements can all be met at the same time.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
new file mode 100644
index 0000000..f3b67c9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+
+/**
+ * This work cancels a job with the given job id.
+ * It is triggered by the cancel call with a job id from the client.
+ */
+public class CancelJobWork extends SynchronizableWork {
+    private final IJobManager jobManager;
+    private final JobId jobId;
+    private final IResultCallback<Void> callback;
+
+    public CancelJobWork(IJobManager jobManager, JobId jobId, IResultCallback<Void> callback) {
+        this.jobId = jobId;
+        this.jobManager = jobManager;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            if (jobId != null) {
+                jobManager.cancel(jobId);
+            }
+            callback.setValue(null);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index c608712..1253cf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -65,14 +65,14 @@ public class JobStartWork extends SynchronizableWork {
                         .deserialize(acggfBytes, deploymentId, appCtx);
                 IActivityClusterGraphGenerator acgg =
                         acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-                run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags, callback);
+                run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
                 //ActivityClusterGraph has already been distributed
-                run = new JobRun(ccs, deploymentId, jobId, callback,
+                run = new JobRun(ccs, deploymentId, jobId,
                         ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
             }
             jobManager.add(run);
-
+            callback.setValue(jobId);
         } catch (Exception e) {
             callback.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 88b8939..3bb08bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -199,6 +199,61 @@ public class JobManagerTest {
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
     }
 
+    @Test
+    public void testCancel() throws HyracksException {
+        CCConfig ccConfig = new CCConfig();
+        IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
+        IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
+
+        // Submits runnable jobs.
+        List<JobRun> acceptedRuns = new ArrayList<>();
+        for (int id = 0; id < 4096; ++id) {
+            // Mocks an immediately executable job.
+            JobRun run = mockJobRun(id);
+            JobSpecification job = mock(JobSpecification.class);
+            when(run.getJobSpecification()).thenReturn(job);
+            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+            // Submits the job.
+            acceptedRuns.add(run);
+            jobManager.add(run);
+            Assert.assertTrue(jobManager.getRunningJobs().size() == id + 1);
+            Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        }
+
+        // Submits jobs that will be deferred due to the capacity limitation.
+        List<JobRun> deferredRuns = new ArrayList<>();
+        for (int id = 4096; id < 8192; ++id) {
+            // Mocks a deferred job.
+            JobRun run = mockJobRun(id);
+            JobSpecification job = mock(JobSpecification.class);
+            when(run.getJobSpecification()).thenReturn(job);
+            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+
+            // Submits the job.
+            deferredRuns.add(run);
+            jobManager.add(run);
+            Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
+            Assert.assertTrue(jobManager.getPendingJobs().size() == id + 1 - 4096);
+        }
+
+        // Cancels deferred jobs.
+        for (JobRun run : deferredRuns) {
+            jobManager.cancel(run.getJobId());
+        }
+
+        // Cancels runnable jobs.
+        for (JobRun run : acceptedRuns) {
+            jobManager.cancel(run.getJobId());
+        }
+
+        Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        verify(jobManager, times(0)).prepareComplete(any(), any(), any());
+        verify(jobManager, times(0)).finalComplete(any());
+    }
+
     private JobRun mockJobRun(long id) {
         JobRun run = mock(JobRun.class, Mockito.RETURNS_DEEP_STUBS);
         when(run.getExceptions()).thenReturn(Collections.emptyList());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 4163e46..3d6ac00 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,8 +26,9 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -35,9 +36,11 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.IHyracksDatasetReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -46,12 +49,14 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
 import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
 public abstract class AbstractMultiNCIntegrationTest {
 
     private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
@@ -88,6 +93,7 @@ public abstract class AbstractMultiNCIntegrationTest {
         ccRoot.delete();
         ccRoot.mkdir();
         ccConfig.ccRoot = ccRoot.getAbsolutePath();
+        ccConfig.appCCMainClass = DummyApplicationEntryPoint.class.getName();
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
@@ -122,6 +128,18 @@ public abstract class AbstractMultiNCIntegrationTest {
         cc.stop();
     }
 
+    protected JobId startJob(JobSpecification spec) throws Exception {
+        return hcc.startJob(spec);
+    }
+
+    protected void waitForCompletion(JobId jobId) throws Exception {
+        hcc.waitForCompletion(jobId);
+    }
+
+    protected void cancelJob(JobId jobId) throws Exception {
+        hcc.cancelJob(jobId);
+    }
+
     protected void runTest(JobSpecification spec) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().asText());
@@ -201,4 +219,40 @@ public abstract class AbstractMultiNCIntegrationTest {
         return tempFile;
     }
 
+    public static class DummyApplicationEntryPoint implements ICCApplicationEntryPoint {
+
+        @Override
+        public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+
+        }
+
+        @Override
+        public void stop() throws Exception {
+
+        }
+
+        @Override
+        public void startupCompleted() throws Exception {
+
+        }
+
+        @Override
+        public IJobCapacityController getJobCapacityController() {
+            return new IJobCapacityController() {
+                private long maxRAM = Runtime.getRuntime().maxMemory();
+
+                @Override
+                public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+                    return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
+                            ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
+                }
+
+                @Override
+                public void release(JobSpecification job) {
+
+                }
+            };
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/88b57695/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
new file mode 100644
index 0000000..7c3b66f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CancelJobTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.integration;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.ManagedFileSplit;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CancelJobTest extends AbstractMultiNCIntegrationTest {
+
+    @Test
+    public void cancelExecutingJobAfterWaitForCompletion() throws Exception {
+        //Cancels executing jobs after waitForCompletion() is called.
+        for (JobSpecification spec : testJobs()) {
+            cancelAfterWaitForCompletion(spec);
+        }
+    }
+
+    @Test
+    public void cancelExecutingJobBeforeWaitForCompletion() throws Exception {
+        //Cancels executing jobs before waitForCompletion is called.
+        for (JobSpecification spec : testJobs()) {
+            cancelBeforeWaitForCompletion(spec);
+        }
+    }
+
+    @Test
+    public void cancelExecutingJobWithoutWaitForCompletion() throws Exception {
+        //Cancels executing jobs without calling waitForCompletion.
+        for (JobSpecification spec : testJobs()) {
+            cancelWithoutWait(spec);
+        }
+    }
+
+    @Test
+    public void cancelPendingJobAfterWaitForCompletion() throws Exception {
+        //Cancels pending jobs after waitForCompletion() is called.
+        for (JobSpecification spec : testJobs()) {
+            setJobCapacity(spec);
+            cancelAfterWaitForCompletion(spec);
+        }
+    }
+
+    @Test
+    public void cancelPendingJobBeforeWaitForCompletion() throws Exception {
+        //Cancels pending jobs before waitForCompletion is called.
+        for (JobSpecification spec : testJobs()) {
+            setJobCapacity(spec);
+            cancelBeforeWaitForCompletion(spec);
+        }
+    }
+
+    @Test
+    public void cancelPendingJobWithoutWaitForCompletion() throws Exception {
+        //Cancels pending jobs without calling waitForCompletion.
+        for (JobSpecification spec : testJobs()) {
+            setJobCapacity(spec);
+            cancelWithoutWait(spec);
+        }
+    }
+
+    private JobSpecification[] testJobs() {
+        return new JobSpecification[] { jobWithSleepSourceOp(), jobWithSleepOp() };
+    }
+
+    private void setJobCapacity(JobSpecification spec) {
+        IClusterCapacity reqCapacity = new ClusterCapacity();
+        reqCapacity.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+        spec.setRequiredClusterCapacity(reqCapacity);
+    }
+
+    private void cancelAfterWaitForCompletion(JobSpecification spec) throws Exception {
+        JobId jobId = startJob(spec);
+        // A thread for canceling the job.
+        Thread thread = new Thread(() -> {
+            try {
+                synchronized (this) {
+                    this.wait(500); // Make sure waitForCompletion is called first.
+                }
+                cancelJob(jobId);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // Cancels the job.
+        thread.start();
+
+        // Checks the resulting Exception.
+        boolean exceptionMatched = false;
+        try {
+            waitForCompletion(jobId);
+        } catch (Exception e) {
+            exceptionMatched = true;
+            Assert.assertTrue(e instanceof HyracksException);
+            HyracksException hyracksException = (HyracksException) e;
+            Assert.assertTrue(hyracksException.getErrorCode() == ErrorCode.JOB_CANCELED);
+        } finally {
+            Assert.assertTrue(exceptionMatched);
+        }
+        thread.join();
+    }
+
+    /**
+     * Starts {@code spec}, cancels it immediately, and only then calls waitForCompletion(), verifying that a
+     * {@link HyracksException} carrying {@code ErrorCode.JOB_CANCELED} is raised.
+     *
+     * @param spec the job to start and cancel.
+     * @throws Exception any non-Hyracks failure from starting, canceling or waiting, propagated unchanged.
+     */
+    private void cancelBeforeWaitForCompletion(JobSpecification spec) throws Exception {
+        try {
+            JobId jobId = startJob(spec);
+            cancelJob(jobId);
+            waitForCompletion(jobId);
+            // Reaching this line means no exception was raised. Failing here rather than in a finally block
+            // keeps an unrelated exception from being replaced by an AssertionError.
+            Assert.fail("Expected a HyracksException with error code JOB_CANCELED");
+        } catch (HyracksException e) {
+            Assert.assertTrue("unexpected error code: " + e.getErrorCode(),
+                    e.getErrorCode() == ErrorCode.JOB_CANCELED);
+        }
+    }
+
+    /**
+     * Starts {@code spec} and cancels it right away, never waiting for the job to complete.
+     *
+     * @param spec the job to start and cancel.
+     */
+    private void cancelWithoutWait(JobSpecification spec) throws Exception {
+        cancelJob(startJob(spec));
+    }
+
+    private JobSpecification jobWithSleepSourceOp() {
+        JobSpecification spec = new JobSpecification();
+        SleepSourceOperatorDescriptor sourceOpDesc = new SleepSourceOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+        SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+        IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+        spec.addRoot(sinkOpDesc);
+        return spec;
+    }
+
+    private JobSpecification jobWithSleepOp() {
+        JobSpecification spec = new JobSpecification();
+        FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0],
+                "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl") };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor recordDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
+
+        // File scan operator.
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+                recordDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, ASTERIX_IDS[0]);
+
+        // Sleep operator.
+        SleepOperatorDescriptor sleepOp = new SleepOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sleepOp, ASTERIX_IDS);
+
+        // Sink operator.
+        SinkOperatorDescriptor sinkOp = new SinkOperatorDescriptor(spec, 1);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOp, ASTERIX_IDS);
+
+        // Hash-repartitioning connector.
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, scanOp, 0, sleepOp, 0);
+
+        // One-to-one connector.
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, sleepOp, 0, sinkOp, 0);
+        return spec;
+    }
+
+}
+
+/**
+ * Test-only source operator (no inputs, one output) whose runtime opens its writer and then blocks in
+ * {@code wait()} indefinitely; it only leaves the loop when an exception (for example an interrupt) is raised.
+ */
+class SleepSourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SleepSourceOperatorDescriptor(JobSpecification spec) {
+        super(spec, 0, 1);
+    }
+
+    /**
+     * Creates the blocking source runtime. None of the arguments are used by the returned pushable.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            /**
+             * Opens the writer and blocks forever. Any exception marks the writer as failed and is then
+             * rethrown; the writer is closed in every case.
+             */
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    writer.open();
+                    while (true) {
+                        synchronized (this) {
+                            wait();
+                        }
+                    }
+                } catch (Exception e) {
+                    writer.fail();
+                    // Propagate the failure instead of swallowing it, so the task does not appear to have
+                    // finished normally; this matches the handling in SleepOperatorDescriptor.
+                    throw HyracksDataException.create(e);
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
+}
+
+/**
+ * Test-only pass-through operator (one input, one output) that never forwards data: the first call to
+ * {@code nextFrame} blocks in {@code wait()} until an exception (for example an interrupt) is raised.
+ */
+class SleepOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SleepOperatorDescriptor(JobSpecification spec) {
+        super(spec, 1, 1);
+    }
+
+    /**
+     * Creates the blocking runtime. None of the arguments are used by the returned pushable.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            /** Opens the downstream writer. */
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+            }
+
+            /**
+             * Ignores the frame and blocks indefinitely; any exception that ends the wait is converted into
+             * a HyracksDataException.
+             */
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                try {
+                    for (;;) {
+                        synchronized (this) {
+                            this.wait();
+                        }
+                    }
+                } catch (Exception cause) {
+                    throw HyracksDataException.create(cause);
+                }
+            }
+
+            /** Forwards the failure notification to the downstream writer. */
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            /** Closes the downstream writer. */
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+            }
+        };
+    }
+}
\ No newline at end of file