You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/10/01 20:49:02 UTC

[2/2] asterixdb git commit: [NO ISSUE][RT] Abort CC jobs on first time registration

[NO ISSUE][RT] Abort CC jobs on first time registration

- user model changes: no
- storage format changes: no
- interface changes: yes
  - the application context is in charge of providing and renewing the cc
    client connection.

details:
- This change allows the cc to be revived if it gets killed.
- Jobs that were started by this cc are aborted and cleaned up on all ncs
  upon first-time registration.
- Client connections are repaired on ncs when a dead connection
  is detected.

Change-Id: If755b7131bdc91790ed28be66f0c61b51f28c2fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2026
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/756daf5c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/756daf5c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/756daf5c

Branch: refs/heads/master
Commit: 756daf5cac4a228bf5534628c96ad95f5d4edf79
Parents: 29a34df
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sun Oct 1 09:26:59 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Sun Oct 1 13:48:46 2017 -0700

----------------------------------------------------------------------
 .../http/server/AbstractQueryApiServlet.java    | 32 +++++++---
 .../api/http/server/NCQueryServiceServlet.java  | 18 +++---
 .../api/http/server/QueryResultApiServlet.java  |  3 +-
 .../asterix/app/nc/NCAppRuntimeContext.java     | 26 +++++++-
 .../hyracks/bootstrap/NCApplication.java        | 21 ++-----
 .../asterix/common/api/IApplicationContext.java | 14 ++++-
 .../common/dataflow/ICcApplicationContext.java  |  6 --
 .../runtime/utils/CcApplicationContext.java     | 15 ++++-
 .../HyracksClientInterfaceRemoteProxy.java      | 19 +++---
 .../hyracks/api/client/HyracksConnection.java   | 24 +++++++-
 .../api/client/IHyracksClientConnection.java    | 64 ++++++++++++--------
 .../api/client/IHyracksClientInterface.java     |  2 +
 .../hyracks/api/exceptions/ErrorCode.java       |  2 +-
 .../client/dataset/HyracksDatasetReader.java    |  2 +-
 .../hyracks/control/cc/NodeControllerState.java | 12 ++--
 .../hyracks/control/cc/cluster/NodeManager.java | 11 ++++
 .../cc/dataset/DatasetDirectoryService.java     |  2 +-
 .../control/cc/work/RegisterNodeWork.java       |  4 +-
 .../RegisterResultPartitionLocationWork.java    | 32 +++++++---
 .../control/cc/cluster/NodeManagerTest.java     | 26 ++++++--
 .../hyracks/control/cc/job/JobManagerTest.java  |  4 +-
 .../control/common/ipc/CCNCFunctions.java       | 23 +++++--
 .../common/ipc/ControllerRemoteProxy.java       | 44 ++++++++------
 .../hyracks/control/nc/NodeControllerIPCI.java  | 18 +++---
 .../control/nc/NodeControllerService.java       | 16 ++---
 .../control/nc/work/AbortAllJobsWork.java       | 61 +++++++++++++++++++
 .../control/nc/work/AbortAllTasksWork.java      | 59 ------------------
 .../integration/PredistributedJobsTest.java     | 15 ++++-
 .../apache/hyracks/ipc/api/RPCInterface.java    |  2 +-
 29 files changed, 371 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index c38b3fc..3912bd5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -24,6 +24,8 @@ import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASE
 import java.io.PrintWriter;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
@@ -33,8 +35,10 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
 import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public class AbstractQueryApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(AbstractQueryApiServlet.class.getName());
     protected final IApplicationContext appCtx;
 
     public enum ResultFields {
@@ -99,9 +103,19 @@ public class AbstractQueryApiServlet extends AbstractServlet {
     }
 
     protected IHyracksDataset getHyracksDataset() throws Exception { // NOSONAR
-        synchronized (ctx) {
-            IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
-            if (hds == null) {
+        try {
+            return doGetHyracksDataset();
+        } catch (IPCException e) {
+            LOGGER.log(Level.WARNING, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e);
+            ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());
+            return doGetHyracksDataset();
+        }
+    }
+
+    protected IHyracksDataset doGetHyracksDataset() throws Exception {
+        IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+        if (hds == null) {
+            synchronized (ctx) {
                 hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
                 if (hds == null) {
                     hds = new HyracksDataset(getHyracksClientConnection(),
@@ -109,18 +123,16 @@ public class AbstractQueryApiServlet extends AbstractServlet {
                     ctx.put(HYRACKS_DATASET_ATTR, hds);
                 }
             }
-            return hds;
         }
+        return hds;
     }
 
     protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
-        synchronized (ctx) {
-            final IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
-            if (hcc == null) {
-                throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
-            }
-            return hcc;
+        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+        if (hcc == null) {
+            throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
         }
+        return hcc;
     }
 
     protected static UUID printRequestId(PrintWriter pw) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index da06dd1..616c22e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -36,6 +36,7 @@ import org.apache.asterix.common.api.Duration;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -45,6 +46,7 @@ import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -64,9 +66,9 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
     }
 
     @Override
-    protected void executeStatement(String statementsText,
-            SessionOutput sessionOutput, IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats,
-            RequestParameters param, long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
+    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+            IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+            long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -83,10 +85,9 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
             if (param.timeout != null) {
                 timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
             }
-            ExecuteStatementRequestMessage requestMsg =
-                    new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
-                            statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl,
-                            optionalParameters);
+            ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
+                    responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
+                    param.clientContextID, handleUrl, optionalParameters);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
             try {
@@ -148,7 +149,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
 
     @Override
     protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
-        if (t instanceof IPCException || t instanceof TimeoutException) {
+        if (t instanceof TimeoutException
+                || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
             return HttpResponseStatus.SERVICE_UNAVAILABLE;
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 901aff8..cce5099 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -61,7 +61,6 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet {
         IHyracksDataset hds = getHyracksDataset();
         ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId());
 
-
         try {
             DatasetJobRecord.Status status = resultReader.getStatus();
 
@@ -98,7 +97,7 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet {
             ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
-            if (ErrorCode.NO_RESULTSET == errorCode) {
+            if (ErrorCode.NO_RESULT_SET == errorCode) {
                 LOGGER.log(Level.INFO, "No results for: \"" + strHandle + "\"");
                 response.setStatus(HttpResponseStatus.NOT_FOUND);
                 return;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index eedc8ec..3e0f2c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -76,10 +76,14 @@ import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -119,14 +123,11 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
     private IMetadataNode metadataNodeStub;
-
     private ILSMIOOperationScheduler lsmIOScheduler;
     private PersistentLocalResourceRepository localResourceRepository;
     private IIOManager ioManager;
     private boolean isShuttingdown;
-
     private ActiveManager activeManager;
-
     private IReplicationChannel replicationChannel;
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
@@ -134,6 +135,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     private final ILibraryManager libraryManager;
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
+    private IHyracksClientConnection hcc;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -485,4 +487,22 @@ public class NCAppRuntimeContext implements INcApplicationContext {
     public INCServiceContext getServiceContext() {
         return ncServiceContext;
     }
+
+    @Override
+    public IHyracksClientConnection getHcc() throws HyracksDataException {
+        if (hcc == null || !hcc.isConnected()) {
+            synchronized (this) {
+                if (hcc == null || !hcc.isConnected()) {
+                    try {
+                        NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
+                        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+                        hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+                    } catch (Exception e) {
+                        throw HyracksDataException.create(e);
+                    }
+                }
+            }
+        }
+        return hcc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e8f63b4..3d7f870 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -48,9 +48,6 @@ import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -81,7 +78,7 @@ public class NCApplication extends BaseNCApplication {
 
     @Override
     public void init(IServiceContext serviceCtx) throws Exception {
-        this.ncServiceCtx = (INCServiceContext) serviceCtx;
+        ncServiceCtx = (INCServiceContext) serviceCtx;
         ncServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
     }
@@ -103,7 +100,7 @@ public class NCApplication extends BaseNCApplication {
             System.setProperty("java.rmi.server.hostname",
                     (controllerService).getConfiguration().getClusterPublicAddress());
         }
-        runtimeContext = new NCAppRuntimeContext(this.ncServiceCtx, getExtensions());
+        runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions());
         MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -115,8 +112,8 @@ public class NCApplication extends BaseNCApplication {
         MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
         IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
-        MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
-                (NCMessageBroker) messageBroker, messagingProperties);
+        MessagingChannelInterfaceFactory interfaceFactory =
+                new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -228,8 +225,8 @@ public class NCApplication extends BaseNCApplication {
         String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
         for (String ioDevice : ioDevices) {
-            String tempDatasetsDir = ioDevice + storageDirName + File.separator
-                    + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            String tempDatasetsDir =
+                    ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
             File tmpDsDir = new File(tempDatasetsDir);
             if (tmpDsDir.exists()) {
                 IoUtil.delete(tmpDsDir);
@@ -307,10 +304,4 @@ public class NCApplication extends BaseNCApplication {
             return devices.get(ioDeviceIndex);
         };
     }
-
-    protected IHyracksClientConnection getHcc() throws Exception {
-        NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService();
-        ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
-        return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index e1840d3..4e30d54 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -30,6 +30,8 @@ import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IApplicationContext {
 
@@ -56,7 +58,17 @@ public interface IApplicationContext {
     /**
      * @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager}
      */
-    public ILibraryManager getLibraryManager();
+    ILibraryManager getLibraryManager();
 
+    /**
+     * @return the service context
+     */
     IServiceContext getServiceContext();
+
+    /**
+     * @return a connected instance of {@link IHyracksClientConnection}
+     * @throws HyracksDataException
+     *             if connection couldn't be established to cluster controller
+     */
+    IHyracksClientConnection getHcc() throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 20f685a..690d1fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,7 +27,6 @@ import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.storage.common.IStorageManager;
 
@@ -71,11 +70,6 @@ public interface ICcApplicationContext extends IApplicationContext {
     IJobLifecycleListener getActiveNotificationHandler();
 
     /**
-     * @return a new instance of {@link IHyracksClientConnection}
-     */
-    IHyracksClientConnection getHcc();
-
-    /**
      * @return the cluster wide resource id manager
      */
     IResourceIdManager getResourceIdManager();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index e4cc7f4..855031e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -45,7 +45,9 @@ import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.storage.common.IStorageManager;
 
@@ -155,7 +157,18 @@ public class CcApplicationContext implements ICcApplicationContext {
     }
 
     @Override
-    public IHyracksClientConnection getHcc() {
+    public IHyracksClientConnection getHcc() throws HyracksDataException {
+        if (!hcc.isConnected()) {
+            synchronized (this) {
+                if (!hcc.isConnected()) {
+                    try {
+                        hcc = new HyracksConnection(hcc.getHost(), hcc.getPort());
+                    } catch (Exception e) {
+                        throw HyracksDataException.create(e);
+                    }
+                }
+            }
+        }
         return hcc;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..0ded84f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -70,8 +70,8 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
 
     @Override
     public void cancelJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+                new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
         rpci.call(ipcHandle, cjf);
     }
 
@@ -84,8 +84,8 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
 
     @Override
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
-                deploymentId, acggfBytes, jobFlags);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -165,8 +165,8 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS
-                    + " seconds");
+            throw new IPCException(
+                    "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
         }
     }
 
@@ -181,6 +181,11 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterfac
     public String getThreadDump(String node) throws Exception {
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
-        return (String)rpci.call(ipcHandle, tdf);
+        return (String) rpci.call(ipcHandle, tdf);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return ipcHandle.isConnected();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..e979da6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -44,7 +44,6 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
@@ -58,6 +57,8 @@ import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeseri
 public final class HyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
 
+    private final int ccPort;
+
     private final IPCSystem ipc;
 
     private final IHyracksClientInterface hci;
@@ -77,11 +78,11 @@ public final class HyracksConnection implements IHyracksClientConnection {
      */
     public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
+        this.ccPort = ccPort;
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
-        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
+        hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci);
         ccInfo = hci.getClusterControllerInfo();
     }
 
@@ -124,6 +125,7 @@ public final class HyracksConnection implements IHyracksClientConnection {
         return hci.startJob(jobId);
     }
 
+    @Override
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
@@ -132,6 +134,7 @@ public final class HyracksConnection implements IHyracksClientConnection {
         return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         return hci.getDatasetDirectoryServiceInfo();
     }
@@ -242,4 +245,19 @@ public final class HyracksConnection implements IHyracksClientConnection {
     public String getThreadDump(String node) throws Exception {
         return hci.getThreadDump(node);
     }
+
+    @Override
+    public String getHost() {
+        return ccHost;
+    }
+
+    @Override
+    public int getPort() {
+        return ccPort;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return hci.isConnected();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..0189135 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -44,7 +44,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      * @return {@link JobStatus}
      * @throws Exception
      */
-    public JobStatus getJobStatus(JobId jobId) throws Exception;
+    JobStatus getJobStatus(JobId jobId) throws Exception;
 
     /**
      * Gets detailed information about the specified Job.
@@ -54,7 +54,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      * @return {@link JobStatus}
      * @throws Exception
      */
-    public JobInfo getJobInfo(JobId jobId) throws Exception;
+    JobInfo getJobInfo(JobId jobId) throws Exception;
 
     /**
      * Cancel the job that has the given job id.
@@ -63,7 +63,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            the JobId of the Job
      * @throws Exception
      */
-    public void cancelJob(JobId jobId) throws Exception;
+    void cancelJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -72,7 +72,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Job Specification
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec) throws Exception;
+    JobId startJob(JobSpecification jobSpec) throws Exception;
 
     /**
      * Start the specified Job.
@@ -83,7 +83,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Distribute the specified Job.
@@ -94,7 +94,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+    JobId distributeJob(JobSpecification jobSpec) throws Exception;
 
     /**
      * Destroy the distributed graph for a pre-distributed job
@@ -103,7 +103,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            The id of the predistributed job
      * @throws Exception
      */
-    public JobId destroyJob(JobId jobId) throws Exception;
+    JobId destroyJob(JobId jobId) throws Exception;
 
     /**
      * Used to run a pre-distributed job by id (the same JobId will be returned)
@@ -112,7 +112,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            The id of the predistributed job
      * @throws Exception
      */
-    public JobId startJob(JobId jobId) throws Exception;
+    JobId startJob(JobId jobId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -123,7 +123,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
@@ -131,7 +131,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      * @return {@link NetworkAddress}
      * @throws Exception
      */
-    public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+    NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
     /**
      * Waits until the specified job has completed, either successfully or has
@@ -141,8 +141,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            JobId of the Job
      * @throws Exception
      */
-    public void waitForCompletion(JobId jobId) throws Exception;
-
+    void waitForCompletion(JobId jobId) throws Exception;
 
     /**
      * Deploy the user-defined jars to the cluster
@@ -150,7 +149,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      * @param jars
      *            a list of user-defined jars
      */
-    public DeploymentId deployBinary(List<String> jars) throws Exception;
+    DeploymentId deployBinary(List<String> jars) throws Exception;
 
     /**
      * undeploy a certain deployment
@@ -158,7 +157,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      * @param deploymentId
      *            the id for the deployment to be undeployed
      */
-    public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+    void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
     /**
      * Start the specified Job.
@@ -169,7 +168,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Job Specification
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
 
     /**
      * Start the specified Job.
@@ -182,8 +181,7 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
-            throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
      * Start the specified Job.
@@ -196,27 +194,45 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      *            Flags
      * @throws Exception
      */
-    public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags) throws Exception;
+    JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
 
     /**
      * Shuts down all NCs and then the CC.
+     *
      * @param terminateNCService
      */
-    public void stopCluster(boolean terminateNCService) throws Exception;
+    void stopCluster(boolean terminateNCService) throws Exception;
 
     /**
      * Get details of specified node as JSON object
+     *
      * @param nodeId
-     *              id the subject node
+     *            id the subject node
      * @param includeStats
-     * @param includeConfig @return serialized JSON containing the node details
+     * @param includeConfig
+     * @return serialized JSON containing the node details
      * @throws Exception
      */
-    public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
+    String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
 
     /**
      * Gets thread dump from the specified node as a serialized JSON string
      */
-    public String getThreadDump(String node) throws Exception;
+    String getThreadDump(String node) throws Exception;
+
+    /**
+     * @return true if the connection is alive, false otherwise
+     */
+    boolean isConnected();
+
+    /**
+     * @return the hostname of the cluster controller
+     */
+    String getHost();
+
+    /**
+     * @return the port of the cluster controller
+     */
+    int getPort();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..9cebd3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -67,4 +67,6 @@ public interface IHyracksClientInterface {
     public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
 
     public String getThreadDump(String node) throws Exception;
+
+    public boolean isConnected();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 68e7cd1..4bb2869 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -57,7 +57,7 @@ public class ErrorCode {
     public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
     public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
     public static final int DISTRIBUTED_JOB_FAILURE = 23;
-    public static final int NO_RESULTSET = 24;
+    public static final int NO_RESULT_SET = 24;
     public static final int JOB_CANCELED = 25;
     public static final int NODE_FAILED = 26;
     public static final int FILE_IS_NOT_DIRECTORY = 27;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index fdac7f1..31fd379 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -93,7 +93,7 @@ public class HyracksDatasetReader implements IHyracksDatasetReader {
         try {
             return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
         } catch (HyracksDataException e) {
-            if (e.getErrorCode() != ErrorCode.NO_RESULTSET) {
+            if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
                 LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + jobId, e);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 8400a59..7be6524 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -28,12 +28,12 @@ import java.util.Set;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -41,7 +41,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
 
-    private final INodeController nodeController;
+    private final NodeControllerRemoteProxy nodeController;
 
     private final NCConfig ncConfig;
 
@@ -145,7 +145,7 @@ public class NodeControllerState {
 
     private NodeCapacity capacity;
 
-    public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
+    public NodeControllerState(NodeControllerRemoteProxy nodeController, NodeRegistration reg) {
         this.nodeController = nodeController;
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
@@ -251,7 +251,7 @@ public class NodeControllerState {
         return lastHeartbeatDuration++;
     }
 
-    public INodeController getNodeController() {
+    public NodeControllerRemoteProxy getNodeController() {
         return nodeController;
     }
 
@@ -279,7 +279,7 @@ public class NodeControllerState {
         return capacity;
     }
 
-    public synchronized ObjectNode toSummaryJSON()  {
+    public synchronized ObjectNode toSummaryJSON() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
         o.put("node-id", ncConfig.getNodeId());
@@ -289,7 +289,7 @@ public class NodeControllerState {
         return o;
     }
 
-    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig)  {
+    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8cca1e0..2d43d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -44,6 +45,9 @@ import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName());
@@ -93,6 +97,13 @@ public class NodeManager implements INodeManager {
             LOGGER.warning(
                     "Node with name " + nodeId + " has already registered; failing the node then re-registering.");
             removeDeadNode(nodeId);
+        } else {
+            try {
+                IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
+                ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+            } catch (IPCException e) {
+                throw HyracksDataException.create(e);
+            }
         }
         LOGGER.warning("adding node to registry");
         nodeRegistry.put(nodeId, ncState);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7a9306c..ca1c91b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -107,7 +107,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
     private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) throws HyracksDataException {
         final DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr == null) {
-            throw HyracksDataException.create(ErrorCode.NO_RESULTSET, jobId);
+            throw HyracksDataException.create(ErrorCode.NO_RESULT_SET, jobId);
         }
         return djr;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index bf0846f..5866ba5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -28,7 +28,6 @@ import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
@@ -55,7 +54,8 @@ public class RegisterNodeWork extends SynchronizableWork {
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id);
-            INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+            NodeControllerRemoteProxy nc =
+                    new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index f51dd06..7117b6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -18,14 +18,24 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.NoOpCallback;
 
 public class RegisterResultPartitionLocationWork extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(RegisterResultPartitionLocationWork.class.getName());
+
     private final ClusterControllerService ccs;
 
     private final JobId jobId;
@@ -43,8 +53,7 @@ public class RegisterResultPartitionLocationWork extends AbstractWork {
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
-            networkAddress) {
+            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
@@ -58,17 +67,24 @@ public class RegisterResultPartitionLocationWork extends AbstractWork {
     @Override
     public void run() {
         try {
-            ccs.getDatasetDirectoryService()
-                    .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
-                            networkAddress);
+            ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+                    partition, nPartitions, networkAddress);
         } catch (HyracksDataException e) {
-            throw new RuntimeException(e);
+            LOGGER.log(Level.WARNING, "Failed to register partition location", e);
+            // Should fail the job if exists on cc, otherwise, do nothing
+            JobRun jobRun = ccs.getJobManager().get(jobId);
+            if (jobRun != null) {
+                List<Exception> exceptions = new ArrayList<>();
+                exceptions.add(e);
+                jobRun.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
+            }
         }
     }
 
     @Override
     public String toString() {
-        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@"
+                + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult
+                + " EmptyResult@" + emptyResult;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index fa6580e..9b9a3b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -27,13 +27,19 @@ import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.cc.scheduler.ResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class NodeManagerTest {
 
@@ -43,9 +49,9 @@ public class NodeManagerTest {
     private static final String NODE2 = "node2";
 
     @Test
-    public void testNormal() throws HyracksException {
+    public void testNormal() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
         NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
@@ -68,9 +74,9 @@ public class NodeManagerTest {
     }
 
     @Test
-    public void testException() throws HyracksException {
+    public void testException() throws HyracksException, IPCException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
@@ -86,6 +92,16 @@ public class NodeManagerTest {
         verifyEmptyCluster(resourceManager, nodeManager);
     }
 
+    private ClusterControllerService mockCcs() throws IPCException {
+        ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
+        IPCSystem ipcSystem = Mockito.mock(IPCSystem.class);
+        IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class);
+        Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
+        Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
+        Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle);
+        return ccs;
+    }
+
     @Test
     public void testNullNode() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
@@ -112,6 +128,7 @@ public class NodeManagerTest {
 
     private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
         NodeControllerState ncState = mock(NodeControllerState.class);
+        NodeControllerRemoteProxy ncProxy = Mockito.mock(NodeControllerRemoteProxy.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
         NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
@@ -123,6 +140,7 @@ public class NodeManagerTest {
         NCConfig ncConfig = new NCConfig(nodeId);
         ncConfig.setDataPublicAddress(ipAddr);
         when(ncState.getNCConfig()).thenReturn(ncConfig);
+        Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
         return ncState;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 0d46d64..251aed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -45,8 +45,8 @@ import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.application.CCServiceContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.apache.hyracks.control.common.work.NoOpCallback;
 import org.junit.Assert;
@@ -293,7 +293,7 @@ public class JobManagerTest {
     private INodeManager mockNodeManager() {
         INodeManager nodeManager = mock(NodeManager.class);
         NodeControllerState ncState = mock(NodeControllerState.class);
-        INodeController nodeController = mock(INodeController.class);
+        NodeControllerRemoteProxy nodeController = mock(NodeControllerRemoteProxy.class);
         when(nodeManager.getNodeControllerState(any())).thenReturn(ncState);
         when(ncState.getNodeController()).thenReturn(nodeController);
         return nodeManager;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index d42c4a8..4797ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -87,6 +87,7 @@ public class CCNCFunctions {
         NODE_REGISTRATION_RESULT,
         START_TASKS,
         ABORT_TASKS,
+        ABORT_ALL_JOBS,
         CLEANUP_JOBLET,
         REPORT_PARTITION_AVAILABILITY,
         SEND_APPLICATION_MESSAGE,
@@ -665,6 +666,16 @@ public class CCNCFunctions {
         }
     }
 
+    //TODO: Add a CC id here to abort only that CC's jobs: https://issues.apache.org/jira/browse/ASTERIXDB-2110
+    public static class AbortCCJobsFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_ALL_JOBS;
+        }
+    }
+
     public static class DistributeJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -782,7 +793,7 @@ public class CCNCFunctions {
 
             // read task attempt descriptors
             int tadSize = dis.readInt();
-            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<>();
             for (int i = 0; i < tadSize; i++) {
                 TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
                 taskDescriptors.add(tad);
@@ -790,7 +801,7 @@ public class CCNCFunctions {
 
             //read connector policies
             int cpSize = dis.readInt();
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>();
             for (int i = 0; i < cpSize; i++) {
                 ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
                 IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
@@ -1362,8 +1373,8 @@ public class CCNCFunctions {
         int cdid = dis.readInt();
         int senderIndex = dis.readInt();
         int receiverIndex = dis.readInt();
-        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
-                receiverIndex);
+        PartitionId pid =
+                new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
         return pid;
     }
 
@@ -1379,8 +1390,8 @@ public class CCNCFunctions {
         int aid = dis.readInt();
         int partition = dis.readInt();
         int attempt = dis.readInt();
-        TaskAttemptId taId = new TaskAttemptId(
-                new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
+        TaskAttemptId taId =
+                new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
         return taId;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 2815ae1..d4ccbd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -21,13 +21,14 @@ package org.apache.hyracks.control.common.ipc;
 import java.net.InetSocketAddress;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public abstract class ControllerRemoteProxy {
     protected final IPCSystem ipc;
-    protected final InetSocketAddress inetSocketAddress;
+    private final InetSocketAddress inetSocketAddress;
     private final IControllerRemoteProxyIPCEventListener eventListener;
     private IIPCHandle ipcHandle;
 
@@ -36,28 +37,33 @@ public abstract class ControllerRemoteProxy {
     }
 
     protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
-                                    IControllerRemoteProxyIPCEventListener eventListener) {
+            IControllerRemoteProxyIPCEventListener eventListener) {
         this.ipc = ipc;
         this.inetSocketAddress = inetSocketAddress;
-        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener;
+        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {
+        } : eventListener;
     }
 
-    protected IIPCHandle ensureIpcHandle() throws IPCException {
-        final boolean first = ipcHandle == null;
-        if (first || !ipcHandle.isConnected()) {
-            if (!first) {
-                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
-                eventListener.ipcHandleDisconnected(ipcHandle);
-            }
-            ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-            if (ipcHandle.isConnected()) {
-                if (first) {
-                    eventListener.ipcHandleConnected(ipcHandle);
-                } else {
-                    getLogger().warning("ipcHandle " + ipcHandle + " restored");
-                    eventListener.ipcHandleRestored(ipcHandle);
+    protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+        try {
+            final boolean first = ipcHandle == null;
+            if (first || !ipcHandle.isConnected()) {
+                if (!first) {
+                    getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                    eventListener.ipcHandleDisconnected(ipcHandle);
+                }
+                ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+                if (ipcHandle.isConnected()) {
+                    if (first) {
+                        eventListener.ipcHandleConnected(ipcHandle);
+                    } else {
+                        getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                        eventListener.ipcHandleRestored(ipcHandle);
+                    }
                 }
             }
+        } catch (IPCException e) {
+            throw HyracksDataException.create(e);
         }
         return ipcHandle;
     }
@@ -65,4 +71,8 @@ public abstract class ControllerRemoteProxy {
     protected abstract int getRetries(boolean first);
 
     protected abstract Logger getLogger();
+
+    public InetSocketAddress getAddress() {
+        return inetSocketAddress;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..1eb1393 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -22,6 +22,7 @@ import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
 import org.apache.hyracks.control.nc.task.ShutdownTask;
 import org.apache.hyracks.control.nc.task.ThreadDumpTask;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
 import org.apache.hyracks.control.nc.work.AbortTasksWork;
 import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
@@ -55,10 +56,9 @@ final class NodeControllerIPCI implements IIPCI {
         CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
-                CCNCFunctions.SendApplicationMessageFunction amf =
-                        (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(),
-                        amf.getDeploymentId(), amf.getNodeId()));
+                CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ncs.getWorkQueue().schedule(
+                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
@@ -69,6 +69,9 @@ final class NodeControllerIPCI implements IIPCI {
                 CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
                 ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
                 return;
+            case ABORT_ALL_JOBS:
+                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+                return;
             case CLEANUP_JOBLET:
                 CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
                 ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus()));
@@ -76,8 +79,8 @@ final class NodeControllerIPCI implements IIPCI {
             case REPORT_PARTITION_AVAILABILITY:
                 CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
                         (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs,
-                        rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+                ncs.getWorkQueue().schedule(
+                        new ReportPartitionAvailabilityWork(ncs, rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                 return;
             case NODE_REGISTRATION_RESULT:
                 CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
@@ -92,8 +95,7 @@ final class NodeControllerIPCI implements IIPCI {
 
             case DEPLOY_BINARY:
                 CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(),
-                        dbf.getBinaryURLs()));
+                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
                 return;
 
             case UNDEPLOY_BINARY:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b0a702d..f4ec6e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -125,7 +125,7 @@ public class NodeControllerService implements IControllerService {
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
+    private final Map<JobId, ActivityClusterGraph> preDistributedJobs;
 
     private ExecutorService executor;
 
@@ -199,7 +199,7 @@ public class NodeControllerService implements IControllerService {
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
-        preDistributedJobActivityClusterGraphMap = new Hashtable<>();
+        preDistributedJobs = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -418,27 +418,27 @@ public class NodeControllerService implements IControllerService {
     }
 
     public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobs.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
-        preDistributedJobActivityClusterGraphMap.put(jobId, acg);
+        preDistributedJobs.put(jobId, acg);
     }
 
     public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
+        if (preDistributedJobs.get(jobId) == null) {
             throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
         }
-        preDistributedJobActivityClusterGraphMap.remove(jobId);
+        preDistributedJobs.remove(jobId);
     }
 
     public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
-        if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+        if (preDistributedJobs.get(jobId) != null) {
             throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
         }
     }
 
     public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
-        return preDistributedJobActivityClusterGraphMap.get(jobId);
+        return preDistributedJobs.get(jobId);
     }
 
     public NetworkManager getNetworkManager() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
new file mode 100644
index 0000000..56100da
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+
+/** Aborts all dataset readers and every joblet's tasks on this node; queues a FAILURE cleanup per joblet. */
+public class AbortAllJobsWork extends SynchronizableWork {
+    private static final Logger LOGGER = Logger.getLogger(AbortAllJobsWork.class.getName());
+    private final NodeControllerService ncs;
+
+    public AbortAllJobsWork(NodeControllerService ncs) {
+        this.ncs = ncs;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Aborting all tasks");
+        }
+        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+        if (dpm != null) {
+            // reuse the null-checked reference instead of fetching the manager a second time
+            dpm.abortAllReaders();
+        } else {
+            LOGGER.log(Level.WARNING, "DatasetPartitionManager is null on " + ncs.getId());
+        }
+        Collection<Joblet> joblets = ncs.getJobletMap().values();
+        for (Joblet ji : joblets) {
+            for (Task task : ji.getTaskMap().values()) {
+                task.abort();
+            }
+            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/756daf5c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
deleted file mode 100644
index 4fb4bf6..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.nc.work;
-
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.apache.hyracks.control.nc.Joblet;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.Task;
-
-public class AbortAllTasksWork extends SynchronizableWork {
-
-    private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName());
-    private final NodeControllerService ncs;
-
-    public AbortAllTasksWork(NodeControllerService ncs) {
-        this.ncs = ncs;
-    }
-
-    @Override
-    protected void doRun() throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Aborting all tasks");
-        }
-        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
-        if (dpm != null) {
-            ncs.getDatasetPartitionManager().abortAllReaders();
-        }
-        for (Joblet ji : ncs.getJobletMap().values()) {
-            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
-            for (Task task : taskMap.values()) {
-                if (task != null) {
-                    task.abort();
-                }
-            }
-        }
-    }
-}