Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/01/27 16:09:51 UTC

[2/3] asterixdb git commit: [ASTERIXDB-2110] Introduce Cluster Controller Id

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 360975d..1ec7485 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,7 +25,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -37,14 +36,12 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplication;
-import org.apache.hyracks.api.application.IClusterLifecycleListener;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -147,6 +144,8 @@ public class ClusterControllerService implements IControllerService {
 
     private ShutdownRun shutdownCallback;
 
+    private final CcId ccId;
+
     static {
         ExitUtil.init();
     }
@@ -182,7 +181,8 @@ public class ClusterControllerService implements IControllerService {
         // Node manager is in charge of cluster membership management.
         nodeManager = new NodeManager(this, ccConfig, resourceManager);
 
-        jobIdFactory = new JobIdFactory();
+        ccId = ccConfig.getCcId();
+        jobIdFactory = new JobIdFactory(ccId);
 
         deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
     }
@@ -252,17 +252,18 @@ public class ClusterControllerService implements IControllerService {
         }
     }
 
-    private Pair<String, Integer> getNCService(String nodeId) {
+    private InetSocketAddress getNCService(String nodeId) {
         IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId);
-        return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
-                ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
+        final int port = ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT);
+        return port == NCConfig.NCSERVICE_PORT_DISABLED ? null
+                : InetSocketAddress.createUnresolved(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), port);
     }
 
-    private Map<String, Pair<String, Integer>> getNCServices() {
-        Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+    private Map<String, InetSocketAddress> getNCServices() {
+        Map<String, InetSocketAddress> ncMap = new TreeMap<>();
         for (String ncId : configManager.getNodeNames()) {
-            Pair<String, Integer> ncService = getNCService(ncId);
-            if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+            InetSocketAddress ncService = getNCService(ncId);
+            if (ncService != null) {
                 ncMap.put(ncId, ncService);
             }
         }
@@ -271,31 +272,19 @@ public class ClusterControllerService implements IControllerService {
 
     private void connectNCs() {
         getNCServices().forEach((key, value) -> {
-            final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getLeft(),
-                    value.getRight(), key);
+            final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getHostString(),
+                    value.getPort(), key);
             executor.submit(triggerWork);
         });
-        serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
-            @Override
-            public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
-                // no-op, we don't care
-                LOGGER.log(Level.WARN, "Getting notified that node: " + nodeId + " has joined. and we don't care");
-            }
-
-            @Override
-            public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
-                LOGGER.log(Level.WARN, "Getting notified that nodes: " + deadNodeIds + " has failed");
-            }
-        });
     }
 
     public boolean startNC(String nodeId) {
-        Pair<String, Integer> ncServiceAddress = getNCService(nodeId);
+        InetSocketAddress ncServiceAddress = getNCService(nodeId);
         if (ncServiceAddress == null) {
             return false;
         }
-        final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getLeft(),
-                ncServiceAddress.getRight(), nodeId);
+        final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getHostString(),
+                ncServiceAddress.getPort(), nodeId);
         executor.submit(startNc);
         return true;
 
@@ -304,11 +293,9 @@ public class ClusterControllerService implements IControllerService {
     private void terminateNCServices() throws Exception {
         List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
         getNCServices().forEach((key, value) -> {
-            if (value.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
-                ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getLeft(), value.getRight(), key);
-                workQueue.schedule(shutdownWork);
-                shutdownNCServiceWorks.add(shutdownWork);
-            }
+            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getHostString(), value.getPort(), key);
+            workQueue.schedule(shutdownWork);
+            shutdownNCServiceWorks.add(shutdownWork);
         });
         for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
             shutdownWork.sync();
@@ -428,6 +415,10 @@ public class ClusterControllerService implements IControllerService {
         return deployedJobSpecIdFactory;
     }
 
+    public CcId getCcId() {
+        return ccId;
+    }
+
     private final class ClusterControllerContext implements ICCContext {
         private final ClusterTopology topology;
 

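The getNCService() change above replaces the commons-lang Pair with a plain InetSocketAddress and deliberately builds it with createUnresolved, so reading the configuration never triggers a DNS lookup; a null return now stands in for the old NCSERVICE_PORT_DISABLED check. A minimal standalone sketch of the createUnresolved distinction (plain JDK API, not code from this commit):

    import java.net.InetSocketAddress;

    public class UnresolvedAddressDemo {
        public static void main(String[] args) {
            // createUnresolved keeps the host as a literal string; no name resolution happens here.
            InetSocketAddress lazy = InetSocketAddress.createUnresolved("nc-host.example.com", 9090);
            System.out.println(lazy.isUnresolved());   // true
            System.out.println(lazy.getHostString());  // "nc-host.example.com", usable without DNS

            // The ordinary constructor resolves the host name eagerly (or marks it unresolved on failure).
            InetSocketAddress eager = new InetSocketAddress("localhost", 9090);
            System.out.println(eager.isUnresolved());  // false, since localhost resolves
        }
    }
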
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 367a1d5..742e2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -105,7 +105,7 @@ public class NodeManager implements INodeManager {
             try {
                 // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
                 IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
-                ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+                ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
             } catch (IPCException e) {
                 throw HyracksDataException.create(e);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 3a38287..04a34af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -57,7 +57,7 @@ public class RegisterNodeWork extends SynchronizableWork {
         try {
             LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
             NodeControllerRemoteProxy nc =
-                    new NodeControllerRemoteProxy(
+                    new NodeControllerRemoteProxy(ccs.getCcId(),
                             ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
@@ -73,7 +73,6 @@ public class RegisterNodeWork extends SynchronizableWork {
             params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
-            ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Node registration failed", e);
             result = new CCNCFunctions.NodeRegistrationResult(null, e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index f1d9a4d..53998aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -63,17 +63,15 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
             List<Exception> exceptions;
             if (exceptionHistory == null) {
                 // couldn't be found
-                long maxJobId = ccs.getJobIdFactory().maxJobId();
-                exceptions = Collections.singletonList(jobId.getId() <= maxJobId
-                        ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)
-                        : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId));
+                exceptions = Collections
+                        .singletonList(HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId));
 
             } else {
                 exceptions = exceptionHistory;
             }
             ccs.getExecutor().execute(() -> {
                 if (!exceptions.isEmpty()) {
-                    /**
+                    /*
                      * only report the first exception because IResultCallback will only throw one exception
                      * anyway
                      */

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 6fd321e..2307185 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.base;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -35,42 +36,44 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 
 public interface IClusterController {
-    public void registerNode(NodeRegistration reg) throws Exception;
+    void registerNode(NodeRegistration reg) throws Exception;
 
-    public void unregisterNode(String nodeId) throws Exception;
+    void unregisterNode(String nodeId) throws Exception;
 
-    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+    void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception;
 
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+    void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
             throws Exception;
 
-    public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
+    void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
 
-    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+    void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 
-    public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
+    void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
 
-    public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+    void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
 
-    public void notifyShutdown(String nodeId) throws Exception;
+    void notifyShutdown(String nodeId) throws Exception;
 
-    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+    void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
-    public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
+    void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
-    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
+    void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
 
-    public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+    void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+    void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+    void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
 
-    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+    void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
 
-    public void getNodeControllerInfos() throws Exception;
+    void getNodeControllerInfos() throws Exception;
 
-    public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+    void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+
+    CcId getCcId();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 5d781cf..ef3b27c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -36,30 +36,30 @@ import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController {
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+    void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+                    List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+                    Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
             throws Exception;
 
-    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
+    void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
-    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
+    void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
 
-    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
+    void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
 
-    public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+    void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
 
-    public void undeployBinary(DeploymentId deploymentId) throws Exception;
+    void undeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
+    void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
 
-    public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+    void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
-    public void dumpState(String stateDumpId) throws Exception;
+    void dumpState(String stateDumpId) throws Exception;
 
-    public void shutdown(boolean terminateNCService) throws Exception;
+    void shutdown(boolean terminateNCService) throws Exception;
 
-    public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+    void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
-    public void takeThreadDump(String requestId) throws Exception;
+    void takeThreadDump(String requestId) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 62e6ee0..42ed1e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -81,6 +81,27 @@ public class OptionTypes {
         }
     };
 
+    public static final IOptionType<Short> SHORT = new IOptionType<Short>() {
+        @Override
+        public Short parse(String s) {
+            int value = Integer.decode(s);
+            if (Integer.highestOneBit(value) > 16) {
+                throw new IllegalArgumentException("The given value " + s + " is too big for a short");
+            }
+            return (short)value;
+        }
+
+        @Override
+        public Class<Short> targetType() {
+            return Short.class;
+        }
+
+        @Override
+        public void serializeJSONField(String fieldName, Object value, ObjectNode node) {
+            node.put(fieldName, (short)value);
+        }
+    };
+
     public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
         @Override
         public Integer parse(String s) {

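The new SHORT option type parses through Integer.decode, so decimal, hexadecimal ("0x...") and octal spellings are all accepted before the value is narrowed to 16 bits. A standalone sketch of an equivalent parse-and-range-check, shown only to illustrate the intended 0-65535 range (this is not the project's code):

    // Illustrative only: parse a 16-bit option value the way the SHORT type is documented to behave.
    static short parseShortOption(String s) {
        int value = Integer.decode(s);          // accepts "10", "0x0a", "012"
        if (value < 0 || value > 0xFFFF) {      // enforce the documented 0-65535 range
            throw new IllegalArgumentException("The given value " + s + " is too big for a short");
        }
        return (short) value;                   // values above 0x7fff wrap to a negative short
    }
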
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 470e87c..85731b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers;
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 
 import java.io.File;
@@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.util.file.FileUtil;
 import org.ini4j.Ini;
@@ -67,7 +69,8 @@ public class CCConfig extends ControllerConfig {
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
         JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
         ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
-        CORES_MULTIPLIER(INTEGER, 3);
+        CORES_MULTIPLIER(INTEGER, 3),
+        CONTROLLER_ID(SHORT, (short)0x0000);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -164,6 +167,8 @@ public class CCConfig extends ControllerConfig {
                             + "bad behaving operators";
                 case CORES_MULTIPLIER:
                     return "Specifies the multiplier to use on the cluster available cores";
+                case CONTROLLER_ID:
+                    return "The 16-bit (0-65535) id of this Cluster Controller";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -374,4 +379,8 @@ public class CCConfig extends ControllerConfig {
     public int getCoresMultiplier() {
         return getAppConfig().getInt(Option.CORES_MULTIPLIER);
     }
+
+    public CcId getCcId() {
+        return CcId.valueOf(getAppConfig().getShort(Option.CONTROLLER_ID));
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 0a5ba30..95c063f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.controllers;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
 
@@ -33,6 +34,7 @@ import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.util.file.FileUtil;
 
@@ -48,6 +50,7 @@ public class NCConfig extends ControllerConfig {
         NCSERVICE_PORT(INTEGER, 9090),
         CLUSTER_ADDRESS(STRING, (String) null),
         CLUSTER_PORT(INTEGER, 1099),
+        CLUSTER_CONTROLLER_ID(SHORT, (short)0x0000),
         CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
         NODE_ID(STRING, (String) null),
@@ -141,6 +144,8 @@ public class NCConfig extends ControllerConfig {
                     return "Cluster Controller port";
                 case CLUSTER_LISTEN_PORT:
                     return "IP port to bind cluster listener";
+                case CLUSTER_CONTROLLER_ID:
+                    return "16-bit (0-65535) id of the Cluster Controller";
                 case CLUSTER_PUBLIC_ADDRESS:
                     return "Public IP Address to announce cluster listener";
                 case CLUSTER_PUBLIC_PORT:
@@ -308,6 +313,10 @@ public class NCConfig extends ControllerConfig {
         configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
     }
 
+    public CcId getClusterControllerId() {
+        return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID));
+    }
+
     public String getClusterListenAddress() {
         return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
     }

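CCConfig.CONTROLLER_ID and NCConfig.CLUSTER_CONTROLLER_ID together let a node controller name the cluster controller it registers with. Assuming the usual Hyracks convention of deriving ini keys from option names (lower-cased, underscores become dots) and the [cc]/[nc/<node id>] section layout, a matching configuration might look like the sketch below; the key spellings are an assumption, not taken from this commit:

    [cc]
    ; hypothetical key for CCConfig.Option.CONTROLLER_ID (16-bit id of this CC)
    controller.id = 1

    [nc/red_node_1]
    ; hypothetical key for NCConfig.Option.CLUSTER_CONTROLLER_ID (id of the CC this NC registers with)
    cluster.controller.id = 1
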
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index dca8c07..5c6d078 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -38,6 +38,7 @@ import java.util.Set;
 
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -150,12 +151,25 @@ public class CCNCFunctions {
 
     }
 
-    public static abstract class Function implements Serializable {
+    public abstract static class Function implements Serializable {
         private static final long serialVersionUID = 1L;
 
         public abstract FunctionId getFunctionId();
     }
 
+    public abstract static class CCIdentifiedFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final CcId ccId;
+
+        protected CCIdentifiedFunction(CcId ccId) {
+            this.ccId = ccId;
+        }
+
+        public CcId getCcId() {
+            return ccId;
+        }
+    }
+
     public static class RegisterNodeFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -668,24 +682,33 @@ public class CCNCFunctions {
         }
     }
 
-    //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110
     public static class AbortCCJobsFunction extends Function {
         private static final long serialVersionUID = 1L;
+        private final CcId ccId;
+
+        public AbortCCJobsFunction(CcId ccId) {
+            this.ccId = ccId;
+        }
 
         @Override
         public FunctionId getFunctionId() {
             return FunctionId.ABORT_ALL_JOBS;
         }
+
+        public CcId getCcId() {
+            return ccId;
+        }
     }
 
-    public static class DeployJobSpecFunction extends Function {
+    public static class DeployJobSpecFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final DeployedJobSpecId deployedJobSpecId;
 
         private final byte[] acgBytes;
 
-        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
+        public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) {
+            super(ccId);
             this.deployedJobSpecId = deployedJobSpecId;
             this.acgBytes = acgBytes;
         }
@@ -704,12 +727,13 @@ public class CCNCFunctions {
         }
     }
 
-    public static class UndeployJobSpecFunction extends Function {
+    public static class UndeployJobSpecFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final DeployedJobSpecId deployedJobSpecId;
 
-        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+        public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, CcId ccId) {
+            super(ccId);
             this.deployedJobSpecId = deployedJobSpecId;
         }
 
@@ -724,7 +748,7 @@ public class CCNCFunctions {
     }
 
     public static class StartTasksFunction extends Function {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
         private final DeploymentId deploymentId;
         private final JobId jobId;
@@ -1008,11 +1032,12 @@ public class CCNCFunctions {
         }
     }
 
-    public static class ThreadDumpRequestFunction extends Function {
+    public static class ThreadDumpRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
         private final String requestId;
 
-        public ThreadDumpRequestFunction(String requestId) {
+        public ThreadDumpRequestFunction(String requestId, CcId ccId) {
+            super(ccId);
             this.requestId = requestId;
         }
 
@@ -1106,13 +1131,14 @@ public class CCNCFunctions {
         }
     }
 
-    public static class DeployBinaryFunction extends Function {
+    public static class DeployBinaryFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final List<URL> binaryURLs;
         private final DeploymentId deploymentId;
 
-        public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) {
+        public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
+            super(ccId);
             this.binaryURLs = binaryURLs;
             this.deploymentId = deploymentId;
         }
@@ -1131,12 +1157,13 @@ public class CCNCFunctions {
         }
     }
 
-    public static class UnDeployBinaryFunction extends Function {
+    public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final DeploymentId deploymentId;
 
-        public UnDeployBinaryFunction(DeploymentId deploymentId) {
+        public UnDeployBinaryFunction(DeploymentId deploymentId, CcId ccId) {
+            super(ccId);
             this.deploymentId = deploymentId;
         }
 
@@ -1211,12 +1238,13 @@ public class CCNCFunctions {
         }
     }
 
-    public static class StateDumpRequestFunction extends Function {
+    public static class StateDumpRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final String stateDumpId;
 
-        public StateDumpRequestFunction(String stateDumpId) {
+        public StateDumpRequestFunction(String stateDumpId, CcId ccId) {
+            super(ccId);
             this.stateDumpId = stateDumpId;
         }
 
@@ -1265,12 +1293,13 @@ public class CCNCFunctions {
         }
     }
 
-    public static class ShutdownRequestFunction extends Function {
+    public static class ShutdownRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
         private final boolean terminateNCService;
 
-        public ShutdownRequestFunction(boolean terminateNCService) {
+        public ShutdownRequestFunction(boolean terminateNCService, CcId ccId) {
+            super(ccId);
             this.terminateNCService = terminateNCService;
         }
 

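The CCIdentifiedFunction base class introduced above factors the originating CcId into one place, so each CC-to-NC message can report which cluster controller sent it. A hypothetical subclass (not part of this commit; the FunctionId constant is invented for illustration) only needs to hand the id to the base constructor:

    public static class PingFunction extends CCIdentifiedFunction {
        private static final long serialVersionUID = 1L;

        public PingFunction(CcId ccId) {
            super(ccId);                // base class stores the id and exposes getCcId()
        }

        @Override
        public FunctionId getFunctionId() {
            return FunctionId.PING;     // hypothetical constant, used only for this sketch
        }
    }
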
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index f2e7d87..e4e2dbe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.common.ipc;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -56,9 +57,11 @@ import org.apache.hyracks.ipc.api.IIPCHandle;
 
 public class ClusterControllerRemoteProxy implements IClusterController {
 
+    private final CcId ccId;
     private IIPCHandle ipcHandle;
 
-    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+    public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+        this.ccId = ccId;
         this.ipcHandle = ipcHandle;
     }
 
@@ -181,4 +184,14 @@ public class ClusterControllerRemoteProxy implements IClusterController {
                 threadDumpJSON);
         ipcHandle.send(-1, tdrf, null);
     }
+
+    @Override
+    public CcId getCcId() {
+        return ccId;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b6b9b4b..a09a8bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -51,9 +52,11 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 
 public class NodeControllerRemoteProxy implements INodeController {
+    private final CcId ccId;
     private final IIPCHandle ipcHandle;
 
-    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+    public NodeControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+        this.ccId = ccId;
         this.ipcHandle = ipcHandle;
     }
 
@@ -88,37 +91,37 @@ public class NodeControllerRemoteProxy implements INodeController {
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
-        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
+        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
         ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
-        UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+        UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId, ccId);
         ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
-        DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
+        DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
-        UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
+        UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId, ccId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
-        StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
+        StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId, ccId);
         ipcHandle.send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
-        ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
+        ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService, ccId);
         ipcHandle.send(-1, sdrf, null);
     }
 
@@ -131,7 +134,7 @@ public class NodeControllerRemoteProxy implements INodeController {
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
-        ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
+        ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
         ipcHandle.send(-1, fn, null);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 94e86dd..9670e42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -58,7 +59,7 @@ public class BaseNCApplication implements INCApplication {
     }
 
     @Override
-    public void onRegisterNode() throws Exception {
+    public void onRegisterNode(CcId ccId) throws Exception {
         // no-op
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8cb33ca..8790434 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -222,15 +222,10 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
     public void close() {
         long stillAllocated = memoryAllocation.get();
         if (stillAllocated > 0) {
-            LOGGER.warn("Freeing leaked " + stillAllocated + " bytes");
+            LOGGER.info(() -> "Freeing leaked " + stillAllocated + " bytes");
             serviceCtx.getMemoryManager().deallocate(stillAllocated);
         }
-        nodeController.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                deallocatableRegistry.close();
-            }
-        });
+        nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
     }
 
     ByteBuffer allocateFrame() throws HyracksDataException {
@@ -298,7 +293,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
         for (PartitionId pid : pids) {
             partitionRequestMap.put(pid, collector);
             PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
-            nodeController.getClusterController().registerPartitionRequest(req);
+            nodeController.getClusterController(jobId.getCcId()).registerPartitionRequest(req);
         }
     }
 
@@ -326,7 +321,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
         close();
         cleanupPending = false;
         try {
-            nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+            nodeController.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, nodeController.getId());
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -341,4 +336,5 @@ public class Joblet implements IHyracksJobletContext, ICounterContext {
     public ClassLoader getClassLoader() throws HyracksException {
         return DeploymentUtils.getClassLoader(deploymentId, serviceCtx);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index b220039..f55e250 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -72,7 +72,8 @@ final class NodeControllerIPCI implements IIPCI {
                 ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
                 return;
             case ABORT_ALL_JOBS:
-                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+                CCNCFunctions.AbortCCJobsFunction aajf = (CCNCFunctions.AbortCCJobsFunction) fn;
+                ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, aajf.getCcId()));
                 return;
             case CLEANUP_JOBLET:
                 CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
@@ -97,27 +98,29 @@ final class NodeControllerIPCI implements IIPCI {
 
             case DEPLOY_BINARY:
                 CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
+                ncs.getWorkQueue()
+                        .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
                 return;
 
             case UNDEPLOY_BINARY:
                 CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
-                ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
+                ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId()));
                 return;
 
             case DISTRIBUTE_JOB:
                 CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
-                ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes()));
+                ncs.getWorkQueue().schedule(
+                        new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId()));
                 return;
 
             case DESTROY_JOB:
                 CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
-                ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId()));
+                ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId()));
                 return;
 
             case STATE_DUMP_REQUEST:
                 final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
-                ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
+                ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId(), dsrf.getCcId()));
                 return;
 
             case SHUTDOWN_REQUEST:
@@ -127,7 +130,7 @@ final class NodeControllerIPCI implements IIPCI {
 
             case THREAD_DUMP_REQUEST:
                 final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
-                ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+                ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId()));
                 return;
 
             default:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 18a6b20..24d72f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -29,12 +29,14 @@ import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -46,6 +48,7 @@ import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -90,6 +93,7 @@ import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InvokeUtil;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.Tracer;
@@ -128,7 +132,9 @@ public class NodeControllerService implements IControllerService {
 
     private Exception registrationException;
 
-    private IClusterController ccs;
+    private IClusterController primaryCcs;
+
+    private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
 
     private final Map<JobId, Joblet> jobletMap;
 
@@ -140,7 +146,9 @@ public class NodeControllerService implements IControllerService {
 
     private NodeParameters nodeParameters;
 
-    private Thread heartbeatThread;
+    private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+
+    private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
 
     private final ServerContext serverCtx;
 
@@ -180,6 +188,8 @@ public class NodeControllerService implements IControllerService {
         ExitUtil.init();
     }
 
+    private NCShutdownHook ncShutdownHook;
+
     public NodeControllerService(NCConfig config) throws Exception {
         this(config, getApplication(config));
     }
@@ -201,13 +211,14 @@ public class NodeControllerService implements IControllerService {
             LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
         }
         // Set shutdown hook before so it doesn't have the same uncaught exception handler
-        Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
+        ncShutdownHook = new NCShutdownHook(this);
+        Runtime.getRuntime().addShutdownHook(ncShutdownHook);
         Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
         ioManager =
                 new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
-        jobletMap = new Hashtable<>();
+        jobletMap = new ConcurrentHashMap<>();
         deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -235,13 +246,6 @@ public class NodeControllerService implements IControllerService {
         return lccm;
     }
 
-    synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
-        this.nodeParameters = parameters;
-        this.registrationException = exception;
-        this.registrationPending = false;
-        notifyAll();
-    }
-
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
         FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
         synchronized (getNodeControllerInfosAcceptor) {
@@ -250,7 +254,7 @@ public class NodeControllerService implements IControllerService {
             }
             getNodeControllerInfosAcceptor.setValue(fv);
         }
-        ccs.getNodeControllerInfos();
+        primaryCcs.getNodeControllerInfos();
         return fv.get();
     }
 
@@ -297,79 +301,142 @@ public class NodeControllerService implements IControllerService {
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        this.ccs = new ClusterControllerRemoteProxy(
-                ipc.getHandle(
-                new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
-                ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() {
-                    @Override
-                    public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
-                        // we need to re-register in case of NC -> CC connection reset
-                        try {
-                            registerNode();
-                        } catch (Exception e) {
-                            LOGGER.log(Level.WARN, "Failed Registering with cc", e);
-                            throw new IPCException(e);
-                        }
-                    }
-                }));
-        registerNode();
+
+        final InetSocketAddress ccAddress = new InetSocketAddress(ncConfig.getClusterAddress(),
+                ncConfig.getClusterPort());
+        this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
 
         workQueue.start();
 
         // Schedule tracing a human-readable datetime
         timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
 
-        if (nodeParameters.getProfileDumpPeriod() > 0) {
-            // Schedule profile dump generator.
-            timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+        application.startupCompleted();
+    }
+
+    public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
+        ClusterControllerRemoteProxy ccProxy;
+        synchronized (ccsMap) {
+            if (ccsMap.containsKey(ccId)) {
+                throw new IllegalStateException("cc already registered: " + ccId);
+            }
+            final IIPCEventListener ipcEventListener = new IIPCEventListener() {
+                @Override
+                public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+                    // we need to re-register in case of NC -> CC connection reset
+                    try {
+                        registerNode(ccsMap.get(ccId));
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+                        throw new IPCException(e);
+                    }
+                }
+            };
+            ccProxy = new ClusterControllerRemoteProxy(ccId,
+                    ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
+            registerNode(ccProxy);
+            ccsMap.put(ccId, ccProxy);
         }
+        return ccProxy;
+    }
 
-        // Start heartbeat generator.
-        heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
-        heartbeatThread.setPriority(Thread.MAX_PRIORITY);
-        heartbeatThread.setDaemon(true);
-        heartbeatThread.start();
+    public void makePrimaryCc(CcId ccId) throws Exception {
+        synchronized (ccsMap) {
+            if (!ccsMap.containsKey(ccId)) {
+                throw new IllegalArgumentException("unknown cc: " + ccId);
+            }
+            primaryCcs = ccsMap.get(ccId);
+        }
+    }
 
-        LOGGER.log(Level.INFO, "Started NodeControllerService");
-        application.startupCompleted();
+    public void removeCc(CcId ccId) throws Exception {
+        synchronized (ccsMap) {
+            final IClusterController ccs = ccsMap.get(ccId);
+            if (ccs == null) {
+                throw new IllegalArgumentException("unknown cc: " + ccId);
+            }
+            if (primaryCcs.equals(ccs)) {
+                throw new IllegalStateException("cannot remove primary cc: " + ccId);
+            }
+            // TODO(mblow): consider how to handle running jobs
+            ccs.unregisterNode(id);
+            Thread hbThread = heartbeatThreads.remove(ccs);
+            hbThread.interrupt();
+            Timer ccTimer = ccTimers.remove(ccs);
+            if (ccTimer != null) {
+                ccTimer.cancel();
+            }
+        }
     }
 
-    public void registerNode() throws Exception {
-        LOGGER.info("Registering with Cluster Controller");
+    protected void registerNode(IClusterController ccs) throws Exception {
+        LOGGER.info("Registering with Cluster Controller {}", ccs);
         registrationPending = true;
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        // Use "public" versions of network addresses and ports
+        // Use "public" versions of network addresses and ports, if defined
+        InetSocketAddress ncAddress;
+        if (ncConfig.getClusterPublicPort() == 0) {
+            ncAddress = ipc.getSocketAddress();
+        } else {
+            ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+        }
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress meesagingPort =
+        NetworkAddress messagingAddress =
                 messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
         int allCores = osMXBean.getAvailableProcessors();
-        nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
                 runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
                 runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(),
+                runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
                 PidHelper.getPid(), maxJobId.get());
 
         ccs.registerNode(nodeRegistration);
 
-        synchronized (this) {
-            while (registrationPending) {
-                wait();
-            }
+        completeNodeRegistration(ccs);
+
+        // Start heartbeat generator.
+        if (!heartbeatThreads.containsKey(ccs)) {
+            Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()),
+                    id + "-Heartbeat");
+            heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+            heartbeatThread.setDaemon(true);
+            heartbeatThread.start();
+            heartbeatThreads.put(ccs, heartbeatThread);
+        }
+        if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
+            Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+            // Schedule profile dump generator.
+            ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+            ccTimers.put(ccs, ccTimer);
+        }
+
+        LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+    }
+
+    synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+        this.nodeParameters = parameters;
+        this.registrationException = exception;
+        this.registrationPending = false;
+        notifyAll();
+    }
+
+    private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
+        while (registrationPending) {
+            wait();
         }
         if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception",
-                    registrationException);
+            LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
             throw registrationException;
         }
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
-        application.onRegisterNode();
-        LOGGER.info("Registering with Cluster Controller complete");
+        application.onRegisterNode(ccs.getCcId());
     }
 
     private void startApplication() throws Exception {
@@ -404,17 +471,21 @@ public class NodeControllerService implements IControllerService {
             workQueue.stop();
             application.stop();
             /*
-             * Stop heartbeat after NC has stopped to avoid false node failure detection
+             * Stop heartbeats only after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
-            if (heartbeatThread != null) {
-                heartbeatThread.interrupt();
-                heartbeatThread.join(1000); // give it 1s to stop gracefully
-            }
-            try {
-                ccs.notifyShutdown(id);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+            heartbeatThreads.values().parallelStream().forEach(t -> {
+                t.interrupt();
+                InvokeUtil.doUninterruptibly(() -> t.join(1000));
+            });
+            synchronized (ccsMap) {
+                ccsMap.values().parallelStream().forEach(ccs -> {
+                    try {
+                        ccs.notifyShutdown(id);
+                    } catch (Exception e) {
+                        LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+                    }
+                });
             }
             ipc.stop();
 
@@ -423,6 +494,14 @@ public class NodeControllerService implements IControllerService {
             LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
                     new Exception("Duplicate shutdown call"));
         }
+        if (ncShutdownHook != null) {
+            try {
+                Runtime.getRuntime().removeShutdownHook(ncShutdownHook);
+                LOGGER.info("removed shutdown hook for {}", id);
+            } catch (IllegalStateException e) {
+                LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e);
+            }
+        }
     }
 
     public String getId() {
@@ -488,8 +567,12 @@ public class NodeControllerService implements IControllerService {
         return partitionManager;
     }
 
-    public IClusterController getClusterController() {
-        return ccs;
+    public IClusterController getPrimaryClusterController() {
+        return primaryCcs;
+    }
+
+    public IClusterController getClusterController(CcId ccId) {
+        return ccsMap.get(ccId);
     }
 
     public NodeParameters getNodeParameters() {
@@ -619,7 +702,7 @@ public class NodeControllerService implements IControllerService {
         public void run() {
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
-                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+                BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
                 workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
@@ -651,8 +734,8 @@ public class NodeControllerService implements IControllerService {
         }
     }
 
-    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
-        ccs.sendApplicationMessageToCC(data, deploymentId, id);
+    public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+        ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {

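The hunk above generalizes the NC from a single cluster controller to a CcId-keyed map, with addCc/makePrimaryCc/removeCc managing per-CC registration, heartbeat threads, and profile-dump timers. Below is a minimal sketch of how a caller might drive that API; it assumes an already-started NodeControllerService "ncs" plus a CcId/address pair taken from configuration (both names are illustrative), and it elides error handling.

    import java.net.InetSocketAddress;
    import org.apache.hyracks.api.control.CcId;
    import org.apache.hyracks.control.nc.NodeControllerService;

    final class SecondaryCcAttacher {
        // Attach an additional CC to a running NC and promote it to primary.
        static void attach(NodeControllerService ncs, CcId secondaryId,
                InetSocketAddress secondaryAddress) throws Exception {
            ncs.addCc(secondaryId, secondaryAddress); // registers the NC and starts a heartbeat thread for this CC
            ncs.makePrimaryCc(secondaryId);           // the previous primary CC stays registered
        }
    }
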
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 34ddd6a..07bb504 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -437,12 +437,13 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     @Override
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
-        this.ncs.sendApplicationMessageToCC(message, deploymentId);
+        this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(), message, deploymentId);
     }
 
     @Override
     public void sendApplicationMessageToCC(Serializable message, DeploymentId deploymentId) throws Exception {
-        this.ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), deploymentId);
+        this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(),
+                JavaSerializationUtils.serialize(message), deploymentId);
     }
 
     @Override

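The Task change above establishes the routing pattern repeated in the hunks that follow: a JobId now records the id of the CC that issued the job, so any NC-side component holding a JobId can select the matching controller proxy. A hedged sketch of that pattern, with illustrative names not taken from the patch (imports elided):

    // Reply to the controller that issued the job, not necessarily the primary CC.
    static void replyToOriginatingCc(NodeControllerService ncs, JobId jobId,
            byte[] payload, DeploymentId deploymentId) throws Exception {
        CcId ccId = jobId.getCcId();                      // the JobId carries its originating CC id
        IClusterController cc = ncs.getClusterController(ccId);
        cc.sendApplicationMessageToCC(payload, deploymentId, ncs.getId());
    }
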
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 476aeae..fb7308e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -93,10 +93,10 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
             boolean orderedResult, boolean emptyResult) throws HyracksException {
         try {
             // Be sure to send the *public* network address to the CC
-            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
-                    partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
+            ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+                    emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
         } catch (Exception e) {
-            throw new HyracksException(e);
+            throw HyracksException.create(e);
         }
     }
 
@@ -105,9 +105,9 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
         try {
             LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
                     + ":partition: " + partition);
-            ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+            ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
         } catch (Exception e) {
-            throw new HyracksException(e);
+            throw HyracksException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index b9d2f4d..4787a50 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -96,7 +96,7 @@ public class MaterializedPartitionWriter implements IFrameWriter {
             ctx.getIoManager().close(handle);
         }
         if (!failed) {
-            manager.registerPartition(pid, taId,
+            manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId,
                     new MaterializedPartition(ctx, fRef, executor, ctx.getIoManager()),
                     PartitionState.COMMITTED, taId.getAttempt() == 0 ? false : true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 57eba53..147606d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -188,7 +188,8 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
         eos = false;
         failed = false;
         deallocated = false;
-        manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+        manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+                false);
     }
 
     private void checkOrCreateFile() throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 667cfa3..bb69eec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -58,10 +59,10 @@ public class PartitionManager {
         this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
     }
 
-    public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition,
+    public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
             PartitionState state, boolean updateToCC) throws HyracksDataException {
         try {
-            /**
+            /*
              * process pending requests
              */
             NetworkOutputChannel writer = partitionRequests.remove(pid);
@@ -73,24 +74,20 @@ public class PartitionManager {
                 }
             }
 
-            /**
+            /*
              * put a coming available partition into the available partition map
              */
-            List<IPartition> pList = availablePartitionMap.get(pid);
-            if (pList == null) {
-                pList = new ArrayList<>();
-                availablePartitionMap.put(pid, pList);
-            }
+            List<IPartition> pList = availablePartitionMap.computeIfAbsent(pid, k -> new ArrayList<>());
             pList.add(partition);
 
-            /**
+            /*
              * update to CC only when necessary
              */
             if (updateToCC) {
-                updatePartitionState(pid, taId, partition, state);
+                updatePartitionState(ccId, pid, taId, partition, state);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -128,7 +125,7 @@ public class PartitionManager {
                 partitionRequests.put(partitionId, writer);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -140,14 +137,15 @@ public class PartitionManager {
         deallocatableRegistry.close();
     }
 
-    public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+    public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+            PartitionState state)
             throws HyracksDataException {
         PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
         desc.setState(state);
         try {
-            ncs.getClusterController().registerPartitionProvider(desc);
+            ncs.getClusterController(ccId).registerPartitionProvider(desc);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 16e5027..fc2f8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -71,7 +71,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
 
     @Override
     public void open() throws HyracksDataException {
-        manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+        manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+                false);
         pendingConnection = true;
         ensureConnected();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index f43dcbc..4969a85 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -18,8 +18,9 @@
  */
 package org.apache.hyracks.control.nc.task;
 
-import org.apache.hyracks.util.ThreadDumpUtil;
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -28,10 +29,12 @@ public class ThreadDumpTask implements Runnable {
     private static final Logger LOGGER = LogManager.getLogger();
     private final NodeControllerService ncs;
     private final String requestId;
+    private final CcId ccId;
 
-    public ThreadDumpTask(NodeControllerService ncs, String requestId) {
+    public ThreadDumpTask(NodeControllerService ncs, String requestId, CcId ccId) {
         this.ncs = ncs;
         this.requestId = requestId;
+        this.ccId = ccId;
     }
 
     @Override
@@ -44,8 +47,7 @@ public class ThreadDumpTask implements Runnable {
             result = null;
         }
         try {
-            ncs.getClusterController().notifyThreadDump(
-                    ncs.getContext().getNodeId(), requestId, result);
+            ncs.getClusterController(ccId).notifyThreadDump(ncs.getContext().getNodeId(), requestId, result);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Exception sending thread dump to CC", e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 6132639..68d677f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.nc.work;
 
 import java.util.Collection;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -34,24 +35,29 @@ public class AbortAllJobsWork extends SynchronizableWork {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final NodeControllerService ncs;
+    private final CcId ccId;
 
-    public AbortAllJobsWork(NodeControllerService ncs) {
+    public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) {
         this.ncs = ncs;
+        this.ccId = ccId;
     }
 
     @Override
     protected void doRun() throws Exception {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Aborting all tasks");
-        }
+        LOGGER.info("Aborting all tasks for controller {}", ccId);
         IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
-        if (dpm != null) {
-            ncs.getDatasetPartitionManager().abortAllReaders();
-        } else {
+        if (dpm == null) {
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
         }
         Collection<Joblet> joblets = ncs.getJobletMap().values();
         for (Joblet ji : joblets) {
+            // TODO(mblow): should we have one jobletmap per cc?
+            if (!ji.getJobId().getCcId().equals(ccId)) {
+                continue;
+            }
+            if (dpm != null) {
+                dpm.abortReader(ji.getJobId());
+            }
             Collection<Task> tasks = ji.getTaskMap().values();
             for (Task task : tasks) {
                 task.abort();

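AbortAllJobsWork is now scoped to a single controller. The caller that schedules it lies outside this hunk; a hypothetical reaction to a lost CC connection might look like the snippet below (the work-queue accessor and "failedCcId" are assumptions, not shown in this patch):

    // Hypothetical: abort only the jobs issued by the failed CC, leaving jobs from
    // other controllers untouched.
    ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, failedCcId));
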
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1a3a8212/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
index 582f058..0dd5d4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -33,20 +34,21 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 public class BuildJobProfilesWork extends SynchronizableWork {
     private final NodeControllerService ncs;
 
+    private final CcId ccId;
     private final FutureValue<List<JobProfile>> fv;
 
-    public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+    public BuildJobProfilesWork(NodeControllerService ncs, CcId ccId, FutureValue<List<JobProfile>> fv) {
         this.ncs = ncs;
+        this.ccId = ccId;
         this.fv = fv;
     }
 
     @Override
     protected void doRun() throws Exception {
-        List<JobProfile> profiles = new ArrayList<JobProfile>();
+        List<JobProfile> profiles = new ArrayList<>();
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
-        for (Joblet ji : jobletMap.values()) {
-            profiles.add(new JobProfile(ji.getJobId()));
-        }
+        jobletMap.values().stream().filter(ji -> ji.getJobId().getCcId().equals(ccId))
+                .forEach(ji -> profiles.add(new JobProfile(ji.getJobId())));
         for (JobProfile jProfile : profiles) {
             Joblet ji;
             JobletProfile jobletProfile = new JobletProfile(ncs.getId());