You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/01/16 19:06:08 UTC

asterixdb git commit: [NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections

Repository: asterixdb
Updated Branches:
  refs/heads/master 76c11424f -> d94dc6dc9


[NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections

Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2284
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d94dc6dc
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d94dc6dc
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d94dc6dc

Branch: refs/heads/master
Commit: d94dc6dc93091ab8879d74b951996df475093f0f
Parents: 76c1142
Author: Michael Blow <mi...@couchbase.com>
Authored: Tue Jan 16 11:23:59 2018 -0500
Committer: Michael Blow <mb...@apache.org>
Committed: Tue Jan 16 11:05:45 2018 -0800

----------------------------------------------------------------------
 .../api/http/server/NCQueryServiceServlet.java  |  3 +-
 .../common/exceptions/ExceptionUtils.java       | 19 +++++
 .../hyracks/api/client/HyracksConnection.java   |  3 +-
 ...yracksDatasetDirectoryServiceConnection.java |  2 +-
 .../hyracks/control/cc/cluster/NodeManager.java |  1 +
 .../control/cc/work/RegisterNodeWork.java       |  4 +-
 .../ipc/ClusterControllerRemoteProxy.java       | 63 ++++++--------
 .../common/ipc/ControllerRemoteProxy.java       | 86 --------------------
 .../IControllerRemoteProxyIPCEventListener.java | 37 ---------
 .../common/ipc/NodeControllerRemoteProxy.java   | 51 +++++-------
 .../control/nc/NodeControllerService.java       |  9 +-
 .../hyracks/ipc/api/IIPCEventListener.java      | 36 ++++++++
 .../org/apache/hyracks/ipc/impl/IPCSystem.java  | 23 ++++++
 .../hyracks/ipc/impl/NoOpIPCEventListener.java  | 28 +++++++
 .../hyracks/ipc/impl/ReconnectingIPCHandle.java | 83 +++++++++++++++++++
 15 files changed, 246 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 8a8342e..83a40f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -45,7 +45,6 @@ import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -154,7 +153,7 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
     @Override
     protected void handleExecuteStatementException(Throwable t, RequestExecutionState execution) {
         if (t instanceof TimeoutException
-                || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
+                || ExceptionUtils.matchingCause(t, candidate -> candidate instanceof IPCException)) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.WARN, t.toString(), t);
             execution.setStatus(ResultStatus.FAILED, HttpResponseStatus.SERVICE_UNAVAILABLE);
         } else {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 3105b3f..256ce08 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.exceptions;
 
+import java.util.function.Predicate;
+
 public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
     public static final String PARAMETER_NAME = "Parameter name: ";
@@ -49,4 +51,21 @@ public class ExceptionUtils {
         }
         return current;
     }
+
+    /**
+     * Determines whether supplied exception contains a matching cause in its hierarchy, or is itself a match
+     */
+    public static boolean matchingCause(Throwable e, Predicate<Throwable> test) {
+        Throwable current = e;
+        Throwable cause = e.getCause();
+        while (cause != null && cause != current) {
+            if (test.test(cause)) {
+                return true;
+            }
+            Throwable nextCause = current.getCause();
+            current = cause;
+            cause = nextCause;
+        }
+        return test.test(e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 85ef927..80b61f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -83,7 +83,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci);
+        hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
+                rpci);
         ccInfo = hci.getClusterControllerInfo();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 075747d..63139d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -40,7 +40,7 @@ public class HyracksDatasetDirectoryServiceConnection implements IHyracksDataset
         RPCInterface rpci = new RPCInterface();
         ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
-        IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, ddsPort));
+        IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(ddsHost, ddsPort));
         this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 590a0f3..712d2ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -101,6 +101,7 @@ public class NodeManager implements INodeManager {
             removeDeadNode(nodeId);
         } else {
             try {
+                // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
                 IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
                 ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
             } catch (IPCException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 07b0f04..3a38287 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -50,13 +50,15 @@ public class RegisterNodeWork extends SynchronizableWork {
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
+        // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
             NodeControllerRemoteProxy nc =
-                    new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+                    new NodeControllerRemoteProxy(
+                            ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
             NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 447d678..f2e7d87 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
-import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -53,42 +52,26 @@ import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 
-public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController {
-    private static final Logger LOGGER = LogManager.getLogger();
+public class ClusterControllerRemoteProxy implements IClusterController {
 
-    private final int clusterConnectRetries;
+    private IIPCHandle ipcHandle;
 
-    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries,
-                                        IControllerRemoteProxyIPCEventListener eventListener) {
-        super(ipc, inetSocketAddress, eventListener);
-        this.clusterConnectRetries = clusterConnectRetries;
-    }
-
-    @Override
-    protected int getMaxRetries(boolean first) {
-        // -1 == retry forever
-        return first ? clusterConnectRetries : 0;
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOGGER;
+    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
     }
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
         RegisterNodeFunction fn = new RegisterNodeFunction(reg);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
         UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -96,7 +79,7 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
             throws Exception {
         NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId,
                 nodeId, statistics);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -104,53 +87,53 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
             throws Exception {
         NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId,
                 exceptions);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
         NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
         NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId,
                 status);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
         NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
-        ensureIpcHandle(0).send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
         ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
         RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
         RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction(
                 partitionRequest);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
         SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
@@ -158,44 +141,44 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
             boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
         RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(
                 jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
         ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction(
                 jobId, rsId, partition);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception {
         ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void getNodeControllerInfos() throws Exception {
-        ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
+        ipcHandle.send(-1, new GetNodeControllersInfoFunction(), null);
     }
 
     @Override
     public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
         StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId,
                 state);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyShutdown(String nodeId) throws Exception {
         ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
-        ensureIpcHandle().send(-1, sdrf, null);
+        ipcHandle.send(-1, sdrf, null);
     }
 
     @Override
     public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception {
         ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId,
                 threadDumpJSON);
-        ensureIpcHandle().send(-1, tdrf, null);
+        ipcHandle.send(-1, tdrf, null);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
deleted file mode 100644
index fe9e85a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.ipc;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.Logger;
-
-public abstract class ControllerRemoteProxy {
-    protected final IPCSystem ipc;
-    private final InetSocketAddress inetSocketAddress;
-    private final IControllerRemoteProxyIPCEventListener eventListener;
-    private IIPCHandle ipcHandle;
-
-    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
-        this(ipc, inetSocketAddress, null);
-    }
-
-    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
-            IControllerRemoteProxyIPCEventListener eventListener) {
-        this.ipc = ipc;
-        this.inetSocketAddress = inetSocketAddress;
-        this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {
-        } : eventListener;
-    }
-
-    protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
-        return ensureIpcHandle(getMaxRetries(ipcHandle == null));
-    }
-
-    protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
-        if (ipcHandle != null && ipcHandle.isConnected()) {
-            return ipcHandle;
-        }
-        try {
-            final boolean first = ipcHandle == null;
-            if (!first) {
-                getLogger().warn("ipcHandle " + ipcHandle + " disconnected; retrying connection");
-                eventListener.ipcHandleDisconnected(ipcHandle);
-            }
-            ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
-            if (first) {
-                eventListener.ipcHandleConnected(ipcHandle);
-            } else {
-                getLogger().warn("ipcHandle " + ipcHandle + " restored");
-                eventListener.ipcHandleRestored(ipcHandle);
-            }
-        } catch (IPCException e) {
-            throw HyracksDataException.create(e);
-        }
-        return ipcHandle;
-    }
-
-    /**
-     * Maximum number of times to retry a failed connection attempt
-     * @param first true if the initial connection attempt (i.e. server start)
-     * @return the maximum number of retries, if any.  <0 means retry forever
-     */
-    protected abstract int getMaxRetries(boolean first);
-
-    protected abstract Logger getLogger();
-
-    public InetSocketAddress getAddress() {
-        return inetSocketAddress;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
deleted file mode 100644
index ec4f9e4..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.ipc;
-
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IControllerRemoteProxyIPCEventListener {
-
-    default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
-
-    default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
-
-    default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
-        // no-op
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b4aaf45..b6b9b4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,26 +48,13 @@ import org.apache.hyracks.control.common.ipc.CCNCFunctions.ThreadDumpRequestFunc
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
 
-public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController {
-    private static final Logger LOGGER = LogManager.getLogger();
+public class NodeControllerRemoteProxy implements INodeController {
+    private final IIPCHandle ipcHandle;
 
-    public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
-        super(ipc, inetSocketAddress);
-    }
-
-    @Override
-    protected int getMaxRetries(boolean first) {
-        // -1 == retry forever
-        return 0;
-    }
-
-    @Override
-    protected Logger getLogger() {
-        return LOGGER;
+    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+        this.ipcHandle = ipcHandle;
     }
 
     @Override
@@ -77,74 +64,78 @@ public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements
             throws Exception {
         StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes,
                 taskDescriptors, connectorPolicies, flags, jobParameters, deployedJobSpecId);
-        ensureIpcHandle().send(-1, stf, null);
+        ipcHandle.send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
         AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
-        ensureIpcHandle().send(-1, atf, null);
+        ipcHandle.send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
         CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
-        ensureIpcHandle().send(-1, cjf, null);
+        ipcHandle.send(-1, cjf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
         ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
         DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
         UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
-        ensureIpcHandle().send(-1, rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 
     @Override
     public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
         DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
         UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
         StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
-        ensureIpcHandle().send(-1, dsf, null);
+        ipcHandle.send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
         ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
-        ensureIpcHandle().send(-1, sdrf, null);
+        ipcHandle.send(-1, sdrf, null);
     }
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
         SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
         ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
-        ensureIpcHandle().send(-1, fn, null);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    public InetSocketAddress getAddress() {
+        return ipcHandle.getRemoteAddress();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 01e34c1..18a6b20 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -68,7 +68,6 @@ import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.work.FutureValue;
 import org.apache.hyracks.control.common.work.WorkQueue;
@@ -83,6 +82,7 @@ import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -297,9 +297,10 @@ public class NodeControllerService implements IControllerService {
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        this.ccs = new ClusterControllerRemoteProxy(ipc,
+        this.ccs = new ClusterControllerRemoteProxy(
+                ipc.getHandle(
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
-                ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() {
+                ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() {
                     @Override
                     public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
                         // we need to re-register in case of NC -> CC connection reset
@@ -310,7 +311,7 @@ public class NodeControllerService implements IControllerService {
                             throw new IPCException(e);
                         }
                     }
-                });
+                }));
         registerNode();
 
         workQueue.start();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
new file mode 100644
index 0000000..a6ba545
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.api;
+
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+public interface IIPCEventListener {
+
+    default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+
+    default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 8e38651..b36e645 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
@@ -70,6 +71,28 @@ public class IPCSystem {
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
+        return getHandle(remoteAddress, maxRetries, 0);
+    }
+
+    public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress) throws IPCException {
+        return getReconnectingHandle(remoteAddress, 1);
+    }
+
+    public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, 0, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts)
+            throws IPCException {
+        return getHandle(remoteAddress, maxRetries, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts,
+            IIPCEventListener eventListener) throws IPCException {
+        if (reconnectAttempts > 0) {
+            return new ReconnectingIPCHandle(this, eventListener, remoteAddress, maxRetries, reconnectAttempts);
+        }
         try {
             return cMgr.getIPCHandle(remoteAddress, maxRetries);
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
new file mode 100644
index 0000000..156641f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import org.apache.hyracks.ipc.api.IIPCEventListener;
+
+public class NoOpIPCEventListener implements IIPCEventListener {
+    public static final IIPCEventListener INSTANCE = new NoOpIPCEventListener();
+
+    private NoOpIPCEventListener() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d94dc6dc/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
new file mode 100644
index 0000000..f0da860
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.ipc.api.IIPCEventListener;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+class ReconnectingIPCHandle implements IIPCHandle {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IPCSystem ipc;
+    private final int reconnectAttempts;
+    private final IIPCEventListener listener;
+    private IIPCHandle delegate;
+
+    ReconnectingIPCHandle(IPCSystem ipc, IIPCEventListener listener, InetSocketAddress remoteAddress, int maxRetries,
+            int reconnectAttempts) throws IPCException {
+        this.ipc = ipc;
+        this.listener = listener;
+        this.reconnectAttempts = reconnectAttempts;
+        this.delegate = ipc.getHandle(remoteAddress, maxRetries);
+        listener.ipcHandleConnected(delegate);
+    }
+
+    @Override
+    public InetSocketAddress getRemoteAddress() {
+        return delegate.getRemoteAddress();
+    }
+
+    @Override
+    public long send(long requestId, Object payload, Exception exception) throws IPCException {
+        return ensureConnected().send(requestId, payload, exception);
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        delegate.setAttachment(attachment);
+    }
+
+    @Override
+    public Object getAttachment() {
+        return delegate.getAttachment();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return delegate.isConnected();
+    }
+
+    private IIPCHandle ensureConnected() throws IPCException {
+        if (delegate.isConnected()) {
+            return delegate;
+        }
+        listener.ipcHandleDisconnected(delegate);
+        delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+        LOGGER.warn("ipcHandle " + delegate + " restored");
+        listener.ipcHandleRestored(delegate);
+
+        return delegate;
+    }
+
+}