You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/12/19 17:01:18 UTC

asterixdb git commit: [ASTERIXDB-2490][NET] Support Encrypted Replication Connections

Repository: asterixdb
Updated Branches:
  refs/heads/master aa0d1df18 -> 11c83fae1


[ASTERIXDB-2490][NET] Support Encrypted Replication Connections

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use SocketChannelFactory in replication connections
  to support both unencrypted and encrypted sockets.
- Add SSL replication test cases.
- Make SslSocketChannel close idempotent.
- Ensure FlushDatasetOperatorDescriptor waits for all
  on-going dataset IO.

Change-Id: I9657624a5d54d4966357651efb671f3d8f0cb304
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3092
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/11c83fae
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/11c83fae
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/11c83fae

Branch: refs/heads/master
Commit: 11c83fae1bd3514576b5c1a3e2d265d486cfa4ea
Parents: aa0d1df
Author: Murtadha Hubail <mh...@apache.org>
Authored: Mon Dec 17 17:23:51 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Wed Dec 19 09:00:06 2018 -0800

----------------------------------------------------------------------
 .../runtime/SslReplicationExecutionTest.java    | 96 ++++++++++++++++++++
 .../src/test/resources/cc-rep-ssl.conf          | 61 +++++++++++++
 .../replication/api/IReplicationWorker.java     |  5 +-
 .../replication/api/PartitionReplica.java       | 13 +--
 .../replication/api/ReplicationDestination.java | 18 ++--
 .../replication/logging/RemoteLogsNotifier.java |  2 +-
 .../management/LogReplicationManager.java       | 34 +++----
 .../replication/management/NetworkingUtil.java  | 12 +--
 .../management/ReplicationChannel.java          | 41 ++++++---
 .../messaging/ReplicateLogsTask.java            |  3 +-
 .../messaging/ReplicationProtocol.java          | 43 +++++++--
 .../replication/sync/FileSynchronizer.java      |  3 +-
 .../sync/ReplicaFilesSynchronizer.java          |  3 +-
 .../std/FlushDatasetOperatorDescriptor.java     |  1 +
 .../hyracks/api/network/ISocketChannel.java     |  9 +-
 .../hyracks/ipc/sockets/SslSocketChannel.java   |  8 +-
 16 files changed, 278 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java
new file mode 100644
index 0000000..14aac1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the test cases of {@code replication.xml} using the {@code cc-rep-ssl.conf} cluster
+ * configuration, which enables both replication and SSL.
+ */
+@RunWith(Parameterized.class)
+public class SslReplicationExecutionTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-rep-ssl.conf";
+    private static final TestExecutor testExecutor = new TestExecutor();
+    // set once the NC endpoints have been registered with the shared test executor
+    private static boolean configured = false;
+
+    @BeforeClass
+    public static void setUp() {
+        LangExecutionUtil.setCheckStorageDistribution(false);
+    }
+
+    /**
+     * Calls {@code LangExecutionUtil.setUp} with the SSL replication configuration before every test
+     * case and, on the first call only, registers each NC's API and replication address (on the
+     * loopback interface) with the test executor.
+     */
+    @Before
+    public void before() throws Exception {
+        TestUtils.redirectLoggingToConsole();
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        if (!configured) {
+            final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+            Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+            Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+            final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+            for (NodeControllerService nc : ncs) {
+                final String nodeId = nc.getId();
+                final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+                int apiPort = appCtx.getExternalProperties().getNcApiPort();
+                int replicationPort =
+                        (int) appCtx.getServiceContext().getAppConfig().get(NCConfig.Option.REPLICATION_LISTEN_PORT);
+                ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+                replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+            }
+            testExecutor.setNcEndPoints(ncEndPoints);
+            testExecutor.setNcReplicationAddress(replicationAddress);
+            configured = true;
+        }
+    }
+
+    /** Calls {@code LangExecutionUtil.tearDown} after every test case. */
+    @After
+    public void after() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    /**
+     * Builds the parameter sets from {@code replication.xml}. The display name carries this class's
+     * own name so that test reports attribute these runs to this class.
+     */
+    @Parameters(name = "SslReplicationExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("replication.xml", "replication.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public SslReplicationExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf
new file mode 100644
index 0000000..db4ca20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf
@@ -0,0 +1,61 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
+replication.listen.port=2001
+nc.api.port=19004
+key.store.path=security/nc1/asterix_nc1.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
+replication.listen.port=2002
+nc.api.port=19005
+key.store.path=security/nc2/asterix_nc2.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.memorycomponent.globalbudget = 1073741824
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+key.store.path=security/cc/cc.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[common]
+log.level = INFO
+replication.enabled=true
+replication.strategy=all
+ssl.enabled=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
index c8abe8f..c7b2561 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -19,14 +19,15 @@
 package org.apache.asterix.replication.api;
 
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
 
 public interface IReplicationWorker extends Runnable {
 
     /**
      * @return The replication socket channel.
      */
-    SocketChannel getChannel();
+    ISocketChannel getChannel();
 
     /**
      * Gets a reusable buffer that can be used to send data

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 8847e7e..0e85665 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,7 +24,6 @@ import static org.apache.asterix.common.replication.IPartitionReplica.PartitionR
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +31,7 @@ import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.replication.sync.ReplicaSynchronizer;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.util.NetworkUtil;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
@@ -53,7 +53,7 @@ public class PartitionReplica implements IPartitionReplica {
     private final ReplicaIdentifier id;
     private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
-    private SocketChannel sc;
+    private ISocketChannel sc;
 
     public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
         this.id = id;
@@ -93,13 +93,10 @@ public class PartitionReplica implements IPartitionReplica {
         });
     }
 
-    public synchronized SocketChannel getChannel() {
+    public synchronized ISocketChannel getChannel() {
         try {
-            if (sc == null || !sc.isOpen() || !sc.isConnected()) {
-                sc = SocketChannel.open();
-                NetworkUtil.configure(sc);
-                sc.configureBlocking(true);
-                sc.connect(id.getLocation());
+            if (sc == null || !sc.getSocketChannel().isOpen() || !sc.getSocketChannel().isConnected()) {
+                sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
             }
             return sc;
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index 8ccfced..eda37b5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,17 +20,17 @@ package org.apache.asterix.replication.api;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
-import org.apache.hyracks.util.NetworkUtil;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -39,7 +39,7 @@ public class ReplicationDestination implements IReplicationDestination {
     private static final Logger LOGGER = LogManager.getLogger();
     private final Set<IPartitionReplica> replicas = new HashSet<>();
     private final InetSocketAddress location;
-    private SocketChannel logRepChannel;
+    private ISocketChannel logRepChannel;
 
     private ReplicationDestination(InetSocketAddress location) {
         this.location = location;
@@ -75,13 +75,11 @@ public class ReplicationDestination implements IReplicationDestination {
                 && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
     }
 
-    public synchronized SocketChannel getLogReplicationChannel() {
+    public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) {
         try {
-            if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) {
-                logRepChannel = SocketChannel.open();
-                NetworkUtil.configure(logRepChannel);
-                logRepChannel.configureBlocking(true);
-                logRepChannel.connect(location);
+            if (logRepChannel == null || !logRepChannel.getSocketChannel().isOpen()
+                    || !logRepChannel.getSocketChannel().isConnected()) {
+                logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location);
             }
             return logRepChannel;
         } catch (IOException e) {
@@ -91,7 +89,7 @@ public class ReplicationDestination implements IReplicationDestination {
 
     private synchronized void closeLogReplicationChannel() {
         try {
-            if (logRepChannel != null && logRepChannel.isOpen()) {
+            if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) {
                 ReplicationProtocol.sendGoodbye(logRepChannel);
                 logRepChannel.close();
                 logRepChannel = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 366abce..440f8ef 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -65,7 +65,7 @@ class RemoteLogsNotifier implements Runnable {
                     case LogType.JOB_COMMIT:
                     case LogType.ABORT:
                         // send ACK to requester
-                        logRecord.getReplicationWorker().getChannel().socket().getOutputStream()
+                        logRecord.getReplicationWorker().getChannel().getSocketChannel().socket().getOutputStream()
                                 .write((nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId()
                                         + System.lineSeparator()).getBytes());
                         break;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6c8e372..0bcffc6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -46,6 +45,7 @@ import org.apache.asterix.replication.logging.TxnAckTracker;
 import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.messaging.ReplicateLogsTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -55,17 +55,17 @@ public class LogReplicationManager {
     private final LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
     private final LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
     private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
-    private final Map<ReplicationDestination, SocketChannel> destinations = new HashMap<>();
+    private final Map<ReplicationDestination, ISocketChannel> destinations = new HashMap<>();
     private final IReplicationManager replicationManager;
     private final Executor executor;
     private final TxnAckTracker ackTracker = new TxnAckTracker();
-    private final Set<SocketChannel> failedSockets = new HashSet<>();
+    private final Set<ISocketChannel> failedSockets = new HashSet<>();
     private final Object transferLock = new Object();
     private final INcApplicationContext appCtx;
     private final int logPageSize;
     private final int logBatchSize;
     private ReplicationLogBuffer currentTxnLogBuffer;
-    private SocketChannel[] destSockets;
+    private ISocketChannel[] destSockets;
 
     public LogReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
         this.appCtx = appCtx;
@@ -100,11 +100,11 @@ public class LogReplicationManager {
                     return;
                 }
                 LOGGER.info(() -> "register " + dest);
-                SocketChannel socketChannel = dest.getLogReplicationChannel();
+                ISocketChannel socketChannel = dest.getLogReplicationChannel(appCtx);
                 handshake(dest, socketChannel);
                 destinations.put(dest, socketChannel);
                 failedSockets.remove(socketChannel);
-                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
+                destSockets = destinations.values().toArray(new ISocketChannel[0]);
             }
         }
     }
@@ -117,9 +117,9 @@ public class LogReplicationManager {
                 }
                 LOGGER.info(() -> "unregister " + dest);
                 ackTracker.unregister(dest);
-                SocketChannel destSocket = destinations.remove(dest);
+                ISocketChannel destSocket = destinations.remove(dest);
                 failedSockets.remove(destSocket);
-                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
+                destSockets = destinations.values().toArray(new ISocketChannel[0]);
                 endReplication(destSocket);
             }
         }
@@ -143,7 +143,7 @@ public class LogReplicationManager {
         buffer.mark();
         synchronized (transferLock) {
             if (destSockets != null) {
-                for (SocketChannel replicaSocket : destSockets) {
+                for (ISocketChannel replicaSocket : destSockets) {
                     try {
                         // send batch size then the batch itself
                         NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
@@ -192,15 +192,15 @@ public class LogReplicationManager {
         pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
     }
 
-    private void handshake(ReplicationDestination dest, SocketChannel socketChannel) {
+    private void handshake(ReplicationDestination dest, ISocketChannel socketChannel) {
         final String nodeId = appCtx.getServiceContext().getNodeId();
         final ReplicateLogsTask task = new ReplicateLogsTask(nodeId);
         ReplicationProtocol.sendTo(socketChannel, task, null);
         executor.execute(new TxnAckListener(dest, socketChannel));
     }
 
-    private void endReplication(SocketChannel socketChannel) {
-        if (socketChannel.isConnected()) {
+    private void endReplication(ISocketChannel socketChannel) {
+        if (socketChannel.getSocketChannel().isConnected()) {
             // end log replication (by sending a dummy log with a single byte)
             final ByteBuffer endLogRepBuffer = ReplicationProtocol.getEndLogReplicationBuffer();
             try {
@@ -211,7 +211,7 @@ public class LogReplicationManager {
         }
     }
 
-    private synchronized void handleFailure(SocketChannel replicaSocket, IOException e) {
+    private synchronized void handleFailure(ISocketChannel replicaSocket, IOException e) {
         if (failedSockets.contains(replicaSocket)) {
             return;
         }
@@ -224,9 +224,9 @@ public class LogReplicationManager {
 
     private class TxnAckListener implements Runnable {
         private final ReplicationDestination dest;
-        private final SocketChannel replicaSocket;
+        private final ISocketChannel replicaSocket;
 
-        TxnAckListener(ReplicationDestination dest, SocketChannel replicaSocket) {
+        TxnAckListener(ReplicationDestination dest, ISocketChannel replicaSocket) {
             this.dest = dest;
             this.replicaSocket = replicaSocket;
         }
@@ -235,8 +235,8 @@ public class LogReplicationManager {
         public void run() {
             Thread.currentThread().setName("TxnAckListener (" + dest + ")");
             LOGGER.info("Started listening on socket: {}", dest);
-            try (BufferedReader incomingResponse =
-                    new BufferedReader(new InputStreamReader(replicaSocket.socket().getInputStream()))) {
+            try (BufferedReader incomingResponse = new BufferedReader(
+                    new InputStreamReader(replicaSocket.getSocketChannel().socket().getInputStream()))) {
                 while (true) {
                     final String response = incomingResponse.readLine();
                     if (response == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index c93920f..30ad72c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -33,6 +33,7 @@ import java.nio.channels.SocketChannel;
 import java.util.Enumeration;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 public class NetworkingUtil {
 
@@ -40,7 +41,7 @@ public class NetworkingUtil {
         throw new AssertionError("This util class should not be initialized.");
     }
 
-    public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
+    public static void readBytes(ISocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
         byteBuffer.clear();
         byteBuffer.limit(length);
 
@@ -53,7 +54,7 @@ public class NetworkingUtil {
         byteBuffer.flip();
     }
 
-    public static void sendFile(FileChannel fileChannel, SocketChannel socketChannel) throws IOException {
+    public static void sendFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException {
         long pos = 0;
         long fileSize = fileChannel.size();
         long remainingBytes = fileSize;
@@ -63,11 +64,10 @@ public class NetworkingUtil {
             pos += transferredBytes;
             remainingBytes -= transferredBytes;
         }
-
-        socketChannel.socket().getOutputStream().flush();
+        socketChannel.getSocketChannel().socket().getOutputStream().flush();
     }
 
-    public static void downloadFile(FileChannel fileChannel, SocketChannel socketChannel) throws IOException {
+    public static void downloadFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException {
         long pos = 0;
         long fileSize = fileChannel.size();
         long count = fileSize;
@@ -97,7 +97,7 @@ public class NetworkingUtil {
         return hostName;
     }
 
-    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+    public static void transferBufferToChannel(ISocketChannel socketChannel, ByteBuffer requestBuffer)
             throws IOException {
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 1f6efa8..3dc094e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -35,6 +35,10 @@ import org.apache.asterix.replication.logging.RemoteLogsProcessor;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.replication.messaging.ReplicationProtocol.ReplicationRequestType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -70,9 +74,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
             while (serverSocketChannel.isOpen()) {
                 SocketChannel socketChannel = serverSocketChannel.accept();
-                socketChannel.configureBlocking(true);
-                //start a new thread to handle the request
-                appCtx.getThreadExecutor().execute(new ReplicationWorker(socketChannel));
+                connectionAccepted(socketChannel);
             }
         } catch (AsynchronousCloseException e) {
             LOGGER.debug("Replication channel closed", e);
@@ -93,12 +95,41 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
     }
 
+    /**
+     * Configures a newly accepted connection, wraps it using the configured socket channel
+     * factory and submits a {@link ReplicationWorker} for it to the application's thread executor.
+     * A failure is logged rather than propagated, and the accepted socket is closed so that it
+     * is not leaked.
+     *
+     * @param socketChannel the connection returned by the server socket channel's accept
+     */
+    private void connectionAccepted(SocketChannel socketChannel) {
+        try {
+            NetworkUtil.configure(socketChannel);
+            socketChannel.configureBlocking(false);
+            final INetworkSecurityManager networkSecurityManager =
+                    appCtx.getServiceContext().getControllerService().getNetworkSecurityManager();
+            final ISocketChannelFactory socketChannelFactory = networkSecurityManager.getSocketChannelFactory();
+            final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(socketChannel);
+            //start a new thread to handle the request
+            appCtx.getThreadExecutor().execute(new ReplicationWorker(serverChannel));
+        } catch (Exception e) {
+            LOGGER.error("failed to process accepted connection", e);
+            // no worker owns the connection at this point, so release it here
+            try {
+                socketChannel.close();
+            } catch (IOException closeFailure) {
+                LOGGER.warn("Failed to close replication socket.", closeFailure);
+            }
+        }
+    }
+
     private class ReplicationWorker implements IReplicationWorker {
-        private final SocketChannel socketChannel;
+        private final ISocketChannel socketChannel;
         private final ByteBuffer inBuffer;
         private final ByteBuffer outBuffer;
 
-        public ReplicationWorker(SocketChannel socketChannel) {
+        public ReplicationWorker(ISocketChannel socketChannel) {
             this.socketChannel = socketChannel;
             inBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
             outBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
@@ -108,6 +125,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         public void run() {
             Thread.currentThread().setName("Replication Worker");
             try {
+                if (socketChannel.requiresHandshake() && !socketChannel.handshake()) {
+                    return;
+                }
+                socketChannel.getSocketChannel().configureBlocking(true);
                 ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 while (requestType != ReplicationRequestType.GOODBYE) {
                     handle(requestType);
@@ -116,18 +137,12 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             } catch (Exception e) {
                 LOGGER.warn("Unexpected error during replication.", e);
             } finally {
-                if (socketChannel.isOpen()) {
-                    try {
-                        socketChannel.close();
-                    } catch (IOException e) {
-                        LOGGER.warn("Failed to close replication socket.", e);
-                    }
-                }
+                NetworkUtil.closeQuietly(socketChannel);
             }
         }
 
         @Override
-        public SocketChannel getChannel() {
+        public ISocketChannel getChannel() {
             return socketChannel;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
index b71f4b8..e38a33d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -34,6 +34,7 @@ import org.apache.asterix.replication.logging.RemoteLogRecord;
 import org.apache.asterix.replication.logging.RemoteLogsProcessor;
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 /**
  * A task to replicate transaction logs from master replica
@@ -53,7 +54,7 @@ public class ReplicateLogsTask implements IReplicaTask {
         final RemoteLogsProcessor logsProcessor = replicationChannel.getRemoteLogsProcessor();
         final ILogManager logManager = appCtx.getTransactionSubsystem().getLogManager();
         final RemoteLogRecord reusableLog = new RemoteLogRecord();
-        final SocketChannel channel = worker.getChannel();
+        final ISocketChannel channel = worker.getChannel();
         ByteBuffer logsBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
         try {
             while (true) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 41e7d9e..c702f1b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -22,17 +22,22 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.replication.api.IReplicationMessage;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.hyracks.util.StorageUtil;
 
 public class ReplicationProtocol {
@@ -65,7 +70,7 @@ public class ReplicationProtocol {
         Stream.of(ReplicationRequestType.values()).forEach(type -> TYPES.put(type.ordinal(), type));
     }
 
-    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+    public static ByteBuffer readRequest(ISocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
         // read request size
         NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
         final int requestSize = dataBuffer.getInt();
@@ -75,7 +80,7 @@ public class ReplicationProtocol {
         return buf;
     }
 
-    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+    public static ReplicationRequestType getRequestType(ISocketChannel socketChannel, ByteBuffer byteBuffer)
             throws IOException {
         // read replication request type
         NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
@@ -93,12 +98,12 @@ public class ReplicationProtocol {
         return Integer.parseInt(msg.substring(msg.indexOf(LOG_REPLICATION_ACK) + 1));
     }
 
-    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+    public static void sendGoodbye(ISocketChannel socketChannel) throws IOException {
         ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
         NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
     }
 
-    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+    public static void sendAck(ISocketChannel socketChannel, ByteBuffer buf) {
         try {
             buf.clear();
             buf.putInt(ReplicationRequestType.ACK.ordinal());
@@ -110,7 +115,7 @@ public class ReplicationProtocol {
     }
 
     public static void waitForAck(PartitionReplica replica) throws IOException {
-        final SocketChannel channel = replica.getChannel();
+        final ISocketChannel channel = replica.getChannel();
         final ByteBuffer buf = replica.getReusableBuffer();
         ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf);
         if (responseFunction != ReplicationRequestType.ACK) {
@@ -119,12 +124,12 @@ public class ReplicationProtocol {
     }
 
     public static void sendTo(PartitionReplica replica, IReplicationMessage task) {
-        final SocketChannel channel = replica.getChannel();
+        final ISocketChannel channel = replica.getChannel();
         final ByteBuffer buf = replica.getReusableBuffer();
         sendTo(channel, task, buf);
     }
 
-    public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
+    public static void sendTo(ISocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
         ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             task.serialize(oos);
@@ -135,18 +140,18 @@ public class ReplicationProtocol {
             requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             requestBuffer.flip();
             NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
-            channel.socket().getOutputStream().flush();
+            channel.getSocketChannel().socket().getOutputStream().flush();
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
     }
 
-    public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
+    public static IReplicationMessage read(ISocketChannel socketChannel, ByteBuffer buffer) throws IOException {
         final ReplicationRequestType type = getRequestType(socketChannel, buffer);
         return readMessage(type, socketChannel, buffer);
     }
 
-    public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+    public static IReplicationMessage readMessage(ReplicationRequestType type, ISocketChannel socketChannel,
             ByteBuffer buffer) {
         try {
             final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer);
@@ -191,6 +196,46 @@ public class ReplicationProtocol {
         return endLogRepBuffer;
     }
 
+    /**
+     * Opens a connection to the replica at {@code location} and completes the channel handshake
+     * (if the configured socket channel factory requires one) before handing the channel out.
+     * The raw socket is put in non-blocking mode for the handshake and switched to blocking mode
+     * once the handshake has succeeded. On any failure the socket is closed before the error is rethrown.
+     *
+     * @param appCtx   the application context used to look up the socket channel factory
+     * @param location the address of the replica to connect to
+     * @return a connected channel whose underlying socket is in blocking mode
+     * @throws IOException           if connecting, configuring or the handshake fails with an I/O error
+     * @throws IllegalStateException if the handshake reports failure
+     */
+    public static ISocketChannel establishReplicaConnection(INcApplicationContext appCtx, InetSocketAddress location)
+            throws IOException {
+        final SocketChannel socketChannel = SocketChannel.open();
+        try {
+            NetworkUtil.configure(socketChannel);
+            socketChannel.connect(location);
+            // perform handshake in a non-blocking mode
+            socketChannel.configureBlocking(false);
+            final ISocketChannelFactory socketChannelFactory = appCtx.getServiceContext().getControllerService()
+                    .getNetworkSecurityManager().getSocketChannelFactory();
+            final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(socketChannel);
+            if (clientChannel.requiresHandshake() && !clientChannel.handshake()) {
+                throw new IllegalStateException("handshake failure");
+            }
+            // switch to blocking mode after handshake success
+            socketChannel.configureBlocking(true);
+            return clientChannel;
+        } catch (IOException | RuntimeException e) {
+            // the caller never receives the channel on failure, so it must be released here
+            try {
+                socketChannel.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
     private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
         if (buffer == null || buffer.capacity() < size) {
             return ByteBuffer.allocate(size);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index cc0f7b4..d795d4e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -32,6 +32,7 @@ import org.apache.asterix.replication.messaging.ReplicateFileTask;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 public class FileSynchronizer {
 
@@ -50,7 +51,7 @@ public class FileSynchronizer {
     public void replicate(String file, boolean metadata) {
         try {
             final IIOManager ioManager = appCtx.getIoManager();
-            final SocketChannel channel = replica.getChannel();
+            final ISocketChannel channel = replica.getChannel();
             final FileReference filePath = ioManager.resolve(file);
             ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata);
             ReplicationProtocol.sendTo(replica, task);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 583f33d..3cd1a07 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -34,6 +34,7 @@ import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
 import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 /**
  * Ensures that the files between master and a replica are synchronized
@@ -69,7 +70,7 @@ public class ReplicaFilesSynchronizer {
 
     private Set<String> getReplicaFiles(int partition) throws IOException {
         final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition);
-        final SocketChannel channel = replica.getChannel();
+        final ISocketChannel channel = replica.getChannel();
         final ByteBuffer reusableBuffer = replica.getReusableBuffer();
         ReplicationProtocol.sendTo(replica, replicaFilesRequest);
         final PartitionResourcesListResponse response =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 7e42d14..a404ba8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -90,6 +90,7 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat
                             datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
                         }
                     }
+                    datasetInfo.waitForIO();
                 } catch (ACIDException e) {
                     throw HyracksDataException.create(e);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
index 70ef1d2..5cfa442 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
@@ -21,9 +21,11 @@ package org.apache.hyracks.api.network;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 
-public interface ISocketChannel extends Closeable {
+public interface ISocketChannel extends WritableByteChannel, ReadableByteChannel, Closeable {
 
     /**
      * Indicates whether this {@link ISocketChannel} requires a client/server handshake before
@@ -91,4 +93,9 @@ public interface ISocketChannel extends Closeable {
      * @return the socket channel
      */
     SocketChannel getSocketChannel();
+
+    @Override
+    default boolean isOpen() {
+        return getSocketChannel().isOpen();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/11c83fae/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index 73475b0..abb8b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -184,9 +184,22 @@ public class SslSocketChannel implements ISocketChannel {
 
+    /**
+     * Closes this channel. Closing is idempotent: once the underlying socket channel is closed,
+     * further calls do nothing. On the first call the outbound side of the SSL engine is closed and
+     * a handshake pass is run before the socket itself is closed.
+     *
+     * @throws IOException if closing fails
+     */
     @Override
     public synchronized void close() throws IOException {
-        engine.closeOutbound();
-        new SslHandshake(this).handshake();
-        socketChannel.close();
+        if (socketChannel.isOpen()) {
+            try {
+                engine.closeOutbound();
+                new SslHandshake(this).handshake();
+            } finally {
+                // release the socket even if the closing handshake fails
+                socketChannel.close();
+            }
+        }
     }
 
     @Override