You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2018/05/31 21:43:37 UTC

[12/43] asterixdb git commit: [NO ISSUE][NET] Networking Fixes

[NO ISSUE][NET] Networking Fixes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure partition requests received after job failure
  are aborted to prevent leaked network channels.
- Do not send channel close after channel write error
  since the contract is to close the channel when remote
  errors are received.
- Only remove closed outgoing connections to establish
  new connections since incoming connections need to be
  reestablished by the remote destination.
- Do not perform further operations on failed multiplexed
  connections to avoid CancelledKeyException.
- Add test case for received partition requests after
  job failure.

Change-Id: Idc45f47fdf0419bf75d461e16f028237a5143de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2613
Reviewed-by: Michael Blow <mb...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mb...@apache.org>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ba532209
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ba532209
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ba532209

Branch: refs/heads/release-0.9.4-pre-rc
Commit: ba5322099c4559d41ec5530099d0d994aaadb339
Parents: 62b1791
Author: Murtadha Hubail <mh...@apache.org>
Authored: Mon Apr 23 22:28:18 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Mon Apr 23 12:56:06 2018 -0700

----------------------------------------------------------------------
 .../asterix/runtime/PartitionManagerTest.java   |  87 +++++++++++++++
 .../comm/channels/NetworkOutputChannel.java     |  10 +-
 .../hyracks-control/hyracks-control-nc/pom.xml  |   4 +
 .../control/nc/NodeControllerService.java       |   1 +
 .../hyracks/control/nc/net/NetworkManager.java  |   8 +-
 .../control/nc/partitions/PartitionManager.java | 107 +++++++++++++------
 .../control/nc/work/CleanupJobletWork.java      |  22 +---
 .../muxdemux/AbstractChannelWriteInterface.java |   5 +-
 .../muxdemux/MultiplexedConnection.java         |  52 +++++----
 .../net/protocols/muxdemux/MuxDemux.java        |   4 +-
 .../net/protocols/tcp/TCPConnection.java        |  15 ++-
 .../hyracks/net/protocols/tcp/TCPEndpoint.java  |   8 +-
 12 files changed, 231 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
new file mode 100644
index 0000000..25ab530
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/PartitionManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkInputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.dataflow.std.collectors.InputChannelFrameReader;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class PartitionManagerTest {
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void failedJobPartitionRequestTest() throws Exception {
+        final NodeControllerService nc1 = integrationUtil.ncs[0];
+        final NodeControllerService nc2 = integrationUtil.ncs[1];
+        final JobId failedJob = new JobId(-1);
+        nc2.getPartitionManager().jobCompleted(failedJob, JobStatus.FAILURE);
+        final NetworkAddress localNetworkAddress = nc2.getNetworkManager().getPublicNetworkAddress();
+        final InetSocketAddress nc2Address =
+                new InetSocketAddress(localNetworkAddress.getAddress(), localNetworkAddress.getPort());
+        PartitionId id = new PartitionId(failedJob, new ConnectorDescriptorId(1), 0, 1);
+        NetworkInputChannel inputChannel = new NetworkInputChannel(nc1.getNetworkManager(), nc2Address, id, 1);
+        InputChannelFrameReader frameReader = new InputChannelFrameReader(inputChannel);
+        inputChannel.registerMonitor(frameReader);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        Thread reader = new Thread(() -> {
+            try {
+                failed.set(!frameReader.nextFrame(new FixedSizeFrame()));
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+        });
+        reader.start();
+        final IHyracksCommonContext context = Mockito.mock(IHyracksCommonContext.class);
+        Mockito.when(context.getInitialFrameSize()).thenReturn(2000);
+        inputChannel.open(context);
+        reader.join(5000);
+        Assert.assertTrue(failed.get());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 126d9a4..3016a7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -25,6 +25,7 @@ import java.util.Deque;
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {
@@ -43,7 +44,7 @@ public class NetworkOutputChannel implements IFrameWriter {
     public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
         this.ccb = ccb;
         this.nBuffers = nBuffers;
-        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        emptyStack = new ArrayDeque<>(nBuffers);
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
 
@@ -58,7 +59,7 @@ public class NetworkOutputChannel implements IFrameWriter {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        ByteBuffer destBuffer = null;
+        ByteBuffer destBuffer;
         while (buffer.hasRemaining()) {
             synchronized (this) {
                 while (true) {
@@ -76,6 +77,7 @@ public class NetworkOutputChannel implements IFrameWriter {
                     try {
                         wait();
                     } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw HyracksDataException.create(e);
                     }
                 }
@@ -94,7 +96,7 @@ public class NetworkOutputChannel implements IFrameWriter {
 
     @Override
     public void fail() throws HyracksDataException {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
     }
 
     @Override
@@ -103,7 +105,7 @@ public class NetworkOutputChannel implements IFrameWriter {
     }
 
     public void abort() {
-        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE);
         synchronized (NetworkOutputChannel.this) {
             aborted = true;
             NetworkOutputChannel.this.notifyAll();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index d7ed47d..c962029 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -87,5 +87,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 76b5c8c..6a7d645 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -665,6 +665,7 @@ public class NodeControllerService implements IControllerService {
     }
 
     public void notifyTasksCompleted(CcId ccId) throws Exception {
+        partitionManager.jobsCompleted(ccId);
         application.onRegisterNode(ccId);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index f3276a4..cfe0991 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -129,12 +128,7 @@ public class NetworkManager implements IChannelConnectionFactory {
                 LOGGER.debug("Received initial partition request: " + pid + " on channel: " + ccb);
             }
             noc = new NetworkOutputChannel(ccb, nBuffers);
-            try {
-                partitionManager.registerPartitionRequest(pid, noc);
-            } catch (HyracksException e) {
-                e.printStackTrace();
-                noc.abort();
-            }
+            partitionManager.registerPartitionRequest(pid, noc);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ee4a9e..d023ce9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -19,20 +19,22 @@
 package org.apache.hyracks.control.nc.partitions;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionState;
@@ -40,6 +42,9 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
 import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 public class PartitionManager {
 
     private final NodeControllerService ncs;
@@ -52,11 +57,14 @@ public class PartitionManager {
 
     private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<>();
 
+    private final Cache<JobId, JobId> failedJobsCache;
+
     public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
         this.availablePartitionMap = new HashMap<>();
         this.deallocatableRegistry = new DefaultDeallocatableRegistry();
         this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
+        failedJobsCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
     }
 
     public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
@@ -95,37 +103,20 @@ public class PartitionManager {
         return availablePartitionMap.get(pid).get(0);
     }
 
-    public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
-        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
-                .hasNext();) {
-            Map.Entry<PartitionId, List<IPartition>> e = i.next();
-            PartitionId pid = e.getKey();
-            if (jobId.equals(pid.getJobId())) {
-                for (IPartition p : e.getValue()) {
-                    unregisteredPartitions.add(p);
-                }
-                i.remove();
-            }
+    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
+        if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
+            writer.abort();
         }
-    }
-
-    public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
-            throws HyracksException {
-        try {
-            List<IPartition> pList = availablePartitionMap.get(partitionId);
-            if (pList != null && !pList.isEmpty()) {
-                IPartition partition = pList.get(0);
-                writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
-                partition.writeTo(writer);
-                if (!partition.isReusable()) {
-                    availablePartitionMap.remove(partitionId);
-                }
-            } else {
-                //throw new HyracksException("Request for unknown partition " + partitionId);
-                partitionRequests.put(partitionId, writer);
+        List<IPartition> pList = availablePartitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
+            writer.setFrameSize(partition.getTaskContext().getInitialFrameSize());
+            partition.writeTo(writer);
+            if (!partition.isReusable()) {
+                availablePartitionMap.remove(partitionId);
             }
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        } else {
+            partitionRequests.put(partitionId, writer);
         }
     }
 
@@ -137,7 +128,25 @@ public class PartitionManager {
         deallocatableRegistry.close();
     }
 
-    public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+    public synchronized void jobCompleted(JobId jobId, JobStatus status) {
+        if (status == JobStatus.FAILURE) {
+            failedJobsCache.put(jobId, jobId);
+        }
+        final List<IPartition> jobPartitions = unregisterPartitions(jobId);
+        final List<NetworkOutputChannel> pendingRequests = removePendingRequests(jobId, status);
+        if (!jobPartitions.isEmpty() || !pendingRequests.isEmpty()) {
+            ncs.getExecutor().execute(() -> {
+                jobPartitions.forEach(IDeallocatable::deallocate);
+                pendingRequests.forEach(NetworkOutputChannel::abort);
+            });
+        }
+    }
+
+    public synchronized void jobsCompleted(CcId ccId) {
+        failedJobsCache.asMap().keySet().removeIf(jobId -> jobId.getCcId().equals(ccId));
+    }
+
+    private void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
             PartitionState state) throws HyracksDataException {
         PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
         desc.setState(state);
@@ -147,4 +156,36 @@ public class PartitionManager {
             throw HyracksDataException.create(e);
         }
     }
-}
+
+    private List<IPartition> unregisterPartitions(JobId jobId) {
+        final List<IPartition> unregisteredPartitions = new ArrayList<>();
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+                .hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> entry = i.next();
+            PartitionId pid = entry.getKey();
+            if (jobId.equals(pid.getJobId())) {
+                unregisteredPartitions.addAll(entry.getValue());
+                i.remove();
+            }
+        }
+        return unregisteredPartitions;
+    }
+
+    private List<NetworkOutputChannel> removePendingRequests(JobId jobId, JobStatus status) {
+        if (status != JobStatus.FAILURE) {
+            return Collections.emptyList();
+        }
+        final List<NetworkOutputChannel> pendingRequests = new ArrayList<>();
+        final Iterator<Map.Entry<PartitionId, NetworkOutputChannel>> requestsIterator =
+                partitionRequests.entrySet().iterator();
+        while (requestsIterator.hasNext()) {
+            final Map.Entry<PartitionId, NetworkOutputChannel> entry = requestsIterator.next();
+            final PartitionId partitionId = entry.getKey();
+            if (partitionId.getJobId().equals(jobId)) {
+                pendingRequests.add(entry.getValue());
+                requestsIterator.remove();
+            }
+        }
+        return pendingRequests;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index d38cd5e..c5a9d73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -18,17 +18,13 @@
  */
 package org.apache.hyracks.control.nc.work;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
-import org.apache.hyracks.api.partitions.IPartition;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -53,23 +49,7 @@ public class CleanupJobletWork extends AbstractWork {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
         ncs.removeJobParameterByteStore(jobId);
-        final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
-        ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                for (IPartition p : unregisteredPartitions) {
-                    try {
-                        // Put deallocate in a try block to make sure that every IPartition is de-allocated.
-                        p.deallocate();
-                    } catch (Exception e) {
-                        if (LOGGER.isWarnEnabled()) {
-                            LOGGER.log(Level.WARN, e.getMessage(), e);
-                        }
-                    }
-                }
-            }
-        });
+        ncs.getPartitionManager().jobCompleted(jobId, status);;
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0b548f6..28c1a71 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -31,6 +31,7 @@ import org.apache.logging.log4j.Logger;
 
 public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
 
+    public static final int REMOTE_WRITE_ERROR_CODE = 1;
     private static final Logger LOGGER = LogManager.getLogger();
     protected final IChannelControlBlock ccb;
     protected final Queue<ByteBuffer> wiFullQueue;
@@ -135,7 +136,9 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte
                     return;
                 }
                 eos = true;
-                adjustChannelWritability();
+                if (ecode != REMOTE_WRITE_ERROR_CODE) {
+                    adjustChannelWritability();
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 286320b..a7fa49e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.api.comm.MuxDemuxCommand;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -67,28 +68,7 @@ public class MultiplexedConnection implements ITCPConnectionEventListener {
 
     MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
-        pendingWriteEventsCounter = new IEventCounter() {
-            private int counter;
-
-            @Override
-            public synchronized void increment() {
-                ++counter;
-                if (counter == 1) {
-                    tcpConnection.enable(SelectionKey.OP_WRITE);
-                }
-            }
-
-            @Override
-            public synchronized void decrement() {
-                --counter;
-                if (counter == 0) {
-                    tcpConnection.disable(SelectionKey.OP_WRITE);
-                }
-                if (counter < 0) {
-                    throw new IllegalStateException();
-                }
-            }
-        };
+        pendingWriteEventsCounter = new EventCounter();
         cSet = new ChannelSet(this, pendingWriteEventsCounter);
         readerState = new ReaderState();
         writerState = new WriterState();
@@ -429,4 +409,32 @@ public class MultiplexedConnection implements ITCPConnectionEventListener {
     public IChannelInterfaceFactory getChannelInterfaceFactory() {
         return muxDemux.getChannelInterfaceFactory();
     }
+
+    @ThreadSafetyGuaranteedBy("MultiplexedConnection.this")
+    private class EventCounter implements IEventCounter {
+        private int counter;
+
+        @Override
+        public synchronized void increment() {
+            if (!connectionFailure) {
+                ++counter;
+                if (counter == 1) {
+                    tcpConnection.enable(SelectionKey.OP_WRITE);
+                }
+            }
+        }
+
+        @Override
+        public synchronized void decrement() {
+            if (!connectionFailure) {
+                --counter;
+                if (counter == 0) {
+                    tcpConnection.disable(SelectionKey.OP_WRITE);
+                }
+                if (counter < 0) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c12909c..c58cb86 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -111,7 +111,9 @@ public class MuxDemux {
             @Override
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
-                    connectionMap.remove(connection.getRemoteAddress());
+                    if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
+                        connectionMap.remove(connection.getRemoteAddress());
+                    }
                 }
             }
         }, nThreads);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index b0e2eed..ff4627a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -29,6 +29,11 @@ import org.apache.logging.log4j.Logger;
 
 public class TCPConnection {
 
+    public enum ConnectionType {
+        INCOMING,
+        OUTGOING
+    }
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final TCPEndpoint endpoint;
@@ -43,11 +48,15 @@ public class TCPConnection {
 
     private Object attachment;
 
-    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector) {
+    private ConnectionType type;
+
+    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+            ConnectionType type) {
         this.endpoint = endpoint;
         this.channel = channel;
         this.key = key;
         this.selector = selector;
+        this.type = type;
         remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
     }
 
@@ -102,6 +111,10 @@ public class TCPConnection {
         }
     }
 
+    public ConnectionType getType() {
+        return type;
+    }
+
     @Override
     public String toString() {
         return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ba532209/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index affa59e..05e2175 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.net.protocols.tcp;
 
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -160,7 +162,8 @@ public class TCPEndpoint {
                         for (SocketChannel channel : workingIncomingConnections) {
                             register(channel);
                             SelectionKey sKey = channel.register(selector, 0);
-                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
+                                    ConnectionType.INCOMING);
                             sKey.attach(connection);
                             synchronized (connectionListener) {
                                 connectionListener.acceptedConnection(connection);
@@ -220,7 +223,8 @@ public class TCPEndpoint {
         }
 
         private void createConnection(SelectionKey key, SocketChannel channel) {
-            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, key, selector);
+            TCPConnection connection =
+                    new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
             key.attach(connection);
             key.interestOps(0);
             synchronized (connectionListener) {