You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/09/13 18:27:33 UTC

asterixdb git commit: [NO ISSUE][HYR] Fix wait for completion work

Repository: asterixdb
Updated Branches:
  refs/heads/master 3166f43a1 -> 41b4519ec


[NO ISSUE][HYR] Fix wait for completion work

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- When wait for completion is called on a job that has not been created
  yet, an exception (JOB_HAS_NOT_BEEN_CREATED_YET) is returned.
- When wait for completion is called on a job that has been cleared
  from the job archive, its outcome (success, or the first recorded
  exception) is retrieved correctly from the job history.
- When wait for completion is called on a job that has been cleared
  from the job history, an exception (JOB_HAS_BEEN_CLEARED_FROM_HISTORY)
  is returned.
- Test cases that fail before the fix have been added.

Change-Id: I9e50f6ce1df9f27517d7ec3a3f8a5d38246f71ff
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1999
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/41b4519e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/41b4519e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/41b4519e

Branch: refs/heads/master
Commit: 41b4519ecc0c0fd664464e7a51cd56f099b66fd1
Parents: 3166f43
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat Sep 9 19:26:28 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Sep 13 11:27:18 2017 -0700

----------------------------------------------------------------------
 .../hyracks/api/exceptions/ErrorCode.java       |  2 +
 .../apache/hyracks/api/job/JobIdFactory.java    |  4 ++
 .../src/main/resources/errormsg/en.properties   |  2 +
 .../hyracks/control/cc/ClientInterfaceIPCI.java | 55 ++++++++------------
 .../hyracks/control/cc/job/JobManager.java      |  7 +--
 .../control/cc/work/DistributeJobWork.java      |  8 +--
 .../hyracks/control/cc/work/JobStartWork.java   | 16 +++---
 .../cc/work/WaitForJobCompletionWork.java       | 43 +++++++++------
 .../AbstractMultiNCIntegrationTest.java         | 25 +++++----
 .../tests/integration/JobFailureTest.java       | 21 +++++++-
 10 files changed, 112 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index ff98efa..cf83bca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -125,6 +125,8 @@ public class ErrorCode {
     public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
     public static final int ILLEGAL_MEMORY_BUDGET = 90;
     public static final int TIMEOUT = 91;
+    public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
+    public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index dd63786..eea6b52 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -27,6 +27,10 @@ public class JobIdFactory {
         return new JobId(id.getAndIncrement());
     }
 
+    public long maxJobId() {
+        return id.get();
+    }
+
     public void ensureMinimumId(long id) {
         this.id.updateAndGet(current -> Math.max(current, id));
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 6d4ccdb..56abee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -108,5 +108,7 @@
 89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes)
 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
 91 = Operation timed out
+92 = Job %1$s has been cleared from job history
+93 = Job %1$s has not been created yet
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index b3e65ad..327c422 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -60,8 +60,7 @@ class ClientInterfaceIPCI implements IIPCI {
     }
 
     @Override
-    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
-            Exception exception) {
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
         HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
         switch (fn.getFunctionId()) {
             case GET_CLUSTER_CONTROLLER_INFO:
@@ -86,7 +85,7 @@ class ClientInterfaceIPCI implements IIPCI {
             case DISTRIBUTE_JOB:
                 HyracksClientInterfaceFunctions.DistributeJobFunction djf =
                         (HyracksClientInterfaceFunctions.DistributeJobFunction) fn;
-                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(),
+                ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory,
                         new IPCResponder<JobId>(handle, mid)));
                 break;
             case DESTROY_JOB:
@@ -104,42 +103,30 @@ class ClientInterfaceIPCI implements IIPCI {
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                JobId jobId = sjf.getJobId();
-                byte[] acggfBytes = null;
-                boolean predistributed = false;
-                if (jobId == null) {
-                    //The job is new
-                    jobId = jobIdFactory.create();
-                    acggfBytes = sjf.getACGGFBytes();
-                } else {
-                    //The job has been predistributed. We don't need to send an ActivityClusterGraph
-                    predistributed = true;
-                }
-                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
-                        jobId, new IPCResponder<JobId>(handle, mid), predistributed));
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(),
+                        sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
-                ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
-                        new IPCResponder<NetworkAddress>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new GetDatasetDirectoryServiceInfoWork(ccs, new IPCResponder<NetworkAddress>(handle, mid)));
                 break;
             case GET_DATASET_RESULT_STATUS:
                 HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf =
                         (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
-                ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(),
-                        gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
+                ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), gdrsf.getResultSetId(),
+                        new IPCResponder<Status>(handle, mid)));
                 break;
             case GET_DATASET_RESULT_LOCATIONS:
                 HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
                         (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
-                ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs,
-                        gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
-                        new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(),
+                        gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid)));
                 break;
             case WAIT_FOR_COMPLETION:
                 HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
                         (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
-                ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(),
-                        new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue()
+                        .schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(), new IPCResponder<>(handle, mid)));
                 break;
             case GET_NODE_CONTROLLERS_INFO:
                 ccs.getWorkQueue().schedule(
@@ -155,33 +142,33 @@ class ClientInterfaceIPCI implements IIPCI {
             case CLI_DEPLOY_BINARY:
                 HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
                         (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
-                ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(),
-                        dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(),
+                        new IPCResponder<>(handle, mid)));
                 break;
             case CLI_UNDEPLOY_BINARY:
                 HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
                         (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
-                ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(),
-                        new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
                 break;
             case CLUSTER_SHUTDOWN:
                 HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
                         (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
-                ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
-                        csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
+                ccs.getWorkQueue().schedule(
+                        new ClusterShutdownWork(ccs, csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
                 break;
             case GET_NODE_DETAILS_JSON:
                 HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
                         (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
                 ccs.getWorkQueue()
                         .schedule(new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), gndjf.getNodeId(),
-                        gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
+                                gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
                 break;
             case THREAD_DUMP:
                 HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                         (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
-                ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(),
-                        new IPCResponder<String>(handle, mid)));
+                ccs.getWorkQueue()
+                        .schedule(new GetThreadDumpWork(ccs, tdf.getNode(), new IPCResponder<String>(handle, mid)));
                 break;
             default:
                 try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4ba847d..c6d90a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,8 +46,8 @@ import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.work.NoOpCallback;
 import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.control.common.work.NoOpCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -92,7 +92,7 @@ public class JobManager implements IJobManager {
         runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
             private static final long serialVersionUID = 1L;
             /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
+            private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
@@ -277,7 +277,8 @@ public class JobManager implements IJobManager {
 
     @Override
     public List<Exception> getExceptionHistory(JobId jobId) {
-        return runMapHistory.get(jobId);
+        List<Exception> exceptions = runMapHistory.get(jobId);
+        return exceptions == null ? runMapHistory.containsKey(jobId) ? Collections.emptyList() : null : exceptions;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
index e5fd66a..5a57b1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -37,12 +38,12 @@ import org.apache.hyracks.control.common.work.SynchronizableWork;
 public class DistributeJobWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
-    private final JobId jobId;
+    private final JobIdFactory jobIdFactory;
     private final IResultCallback<JobId> callback;
 
-    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId,
+    public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory,
             IResultCallback<JobId> callback) {
-        this.jobId = jobId;
+        this.jobIdFactory = jobIdFactory;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.callback = callback;
@@ -51,6 +52,7 @@ public class DistributeJobWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         try {
+            JobId jobId = jobIdFactory.create();
             final CCServiceContext ccServiceCtx = ccs.getContext();
             ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId);
             IActivityClusterGraphGeneratorFactory acggf =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index e083d2a..ed82705 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGenerator;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.job.IJobManager;
@@ -38,19 +39,19 @@ public class JobStartWork extends SynchronizableWork {
     private final byte[] acggfBytes;
     private final EnumSet<JobFlag> jobFlags;
     private final DeploymentId deploymentId;
-    private final JobId jobId;
+    private final JobId preDistributedJobId;
     private final IResultCallback<JobId> callback;
-    private final boolean predestributed;
+    private final JobIdFactory jobIdFactory;
 
     public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
-            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) {
+            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) {
         this.deploymentId = deploymentId;
-        this.jobId = jobId;
+        this.preDistributedJobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
         this.jobFlags = jobFlags;
         this.callback = callback;
-        this.predestributed = predestributed;
+        this.jobIdFactory = jobIdFactory;
     }
 
     @Override
@@ -58,8 +59,10 @@ public class JobStartWork extends SynchronizableWork {
         IJobManager jobManager = ccs.getJobManager();
         try {
             final CCServiceContext ccServiceCtx = ccs.getContext();
+            JobId jobId;
             JobRun run;
-            if (!predestributed) {
+            if (preDistributedJobId == null) {
+                jobId = jobIdFactory.create();
                 //Need to create the ActivityClusterGraph
                 IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
                         .deserialize(acggfBytes, deploymentId, ccServiceCtx);
@@ -67,6 +70,7 @@ public class JobStartWork extends SynchronizableWork {
                         acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags);
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
+                jobId = preDistributedJobId;
                 //ActivityClusterGraph has already been distributed
                 run = new JobRun(ccs, deploymentId, jobId, jobFlags,
                         ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 713cf96..f1d9a4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -18,12 +18,15 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
-import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable;
+import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 
@@ -41,13 +44,13 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         IJobManager jobManager = ccs.getJobManager();
-        final IJobStatusConditionVariable cRunningVar = jobManager.get(jobId);
-        if (cRunningVar != null) {
+        final JobRun jobRun = jobManager.get(jobId);
+        if (jobRun != null) {
             ccs.getExecutor().execute(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        cRunningVar.waitForCompletion();
+                        jobRun.waitForCompletion();
                         callback.setValue(null);
                     } catch (Exception e) {
                         callback.setException(e);
@@ -55,18 +58,28 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
                 }
             });
         } else {
-            final List<Exception> exceptions = jobManager.getExceptionHistory(jobId);
-            ccs.getExecutor().execute(new Runnable() {
-                @Override
-                public void run() {
+            // Couldn't find jobRun
+            List<Exception> exceptionHistory = jobManager.getExceptionHistory(jobId);
+            List<Exception> exceptions;
+            if (exceptionHistory == null) {
+                // couldn't be found
+                long maxJobId = ccs.getJobIdFactory().maxJobId();
+                exceptions = Collections.singletonList(jobId.getId() <= maxJobId
+                        ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)
+                        : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId));
+
+            } else {
+                exceptions = exceptionHistory;
+            }
+            ccs.getExecutor().execute(() -> {
+                if (!exceptions.isEmpty()) {
+                    /**
+                     * only report the first exception because IResultCallback will only throw one exception
+                     * anyway
+                     */
+                    callback.setException(exceptions.get(0));
+                } else {
                     callback.setValue(null);
-                    if (exceptions != null && !exceptions.isEmpty()) {
-                        /**
-                         * only report the first exception because IResultCallback will only throw one exception
-                         * anyway
-                         */
-                        callback.setException(exceptions.get(0));
-                    }
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 18479e2..21c1e77 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -62,13 +62,13 @@ public abstract class AbstractMultiNCIntegrationTest {
     public static final String[] ASTERIX_IDS =
             { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" };
 
-    private static ClusterControllerService cc;
+    protected static ClusterControllerService cc;
 
-    private static NodeControllerService[] asterixNCs;
+    protected static NodeControllerService[] asterixNCs;
 
-    private static IHyracksClientConnection hcc;
+    protected static IHyracksClientConnection hcc;
 
-    private final List<File> outputFiles;
+    protected final List<File> outputFiles;
 
     public AbstractMultiNCIntegrationTest() {
         outputFiles = new ArrayList<>();
@@ -82,6 +82,7 @@ public abstract class AbstractMultiNCIntegrationTest {
         ccConfig.setClusterListenAddress("127.0.0.1");
         ccConfig.setClusterListenPort(39001);
         ccConfig.setProfileDumpPeriod(10000);
+        ccConfig.setJobHistorySize(2);
         File outDir = new File("target" + File.separator + "ClusterController");
         outDir.mkdirs();
         File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
@@ -186,24 +187,30 @@ public abstract class AbstractMultiNCIntegrationTest {
                 readSize = reader.read(resultFrame);
             }
         }
+        waitForCompletion(jobId, expectedErrorMessage);
+        // Waiting a second time should lead to the same behavior
+        waitForCompletion(jobId, expectedErrorMessage);
+        dumpOutputFiles();
+    }
+
+    protected void waitForCompletion(JobId jobId, String expectedErrorMessage) throws Exception {
         boolean expectedExceptionThrown = false;
         try {
             hcc.waitForCompletion(jobId);
-        } catch (HyracksDataException hde) {
+        } catch (Exception e) {
             if (expectedErrorMessage != null) {
-                if (hde.toString().contains(expectedErrorMessage)) {
+                if (e.toString().contains(expectedErrorMessage)) {
                     expectedExceptionThrown = true;
                 } else {
-                    throw hde;
+                    throw e;
                 }
             } else {
-                throw hde;
+                throw e;
             }
         }
         if (expectedErrorMessage != null && !expectedExceptionThrown) {
             throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown");
         }
-        dumpOutputFiles();
     }
 
     private void dumpOutputFiles() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/41b4519e/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 13a103e..34b1480 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.tests.integration;
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -32,9 +33,27 @@ public class JobFailureTest extends AbstractMultiNCIntegrationTest {
 
     @Test
     public void failureOnCreatePushRuntime() throws Exception {
-        for (int round = 0; round < 100; ++round) {
+        JobId jobId = new JobId(0); // First job
+        for (int i = 0; i < 20; i++) {
             execTest();
+            if (i == 0) {
+                // passes. read from job archive
+                waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+            }
         }
+        // passes. read from job history
+        waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
+        for (int i = 0; i < 300; i++) {
+            execTest();
+        }
+        // passes. history has been cleared
+        waitForCompletion(jobId, "has been cleared from job history");
+    }
+
+    @Test
+    public void waitForNonExistingJob() throws Exception {
+        JobId jobId = new JobId(Long.MAX_VALUE);
+        waitForCompletion(jobId, "has not been created yet");
     }
 
     private void execTest() throws Exception {