You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/12/01 00:00:53 UTC

[geode] 01/01: GEODE-6121 Port SnappyData DirectChannel comms changes to Geode

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6121
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3aa2a894802c50db276d9c87f00432cd1908291d
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Nov 30 15:30:00 2018 -0800

    GEODE-6121 Port SnappyData DirectChannel comms changes to Geode
    
    Not ready for review!
    
    Initial port of SnappyData peer-to-peer communication changes to Geode.
    See https://github.com/SnappyDataInc/snappy-store/commit/49222766a35c99f4853b7b6661457f37b3c39349
    
    The current changes have connection-streaming and conserve-sockets=false
    as the default for testing but I will be changing this.  The connection-streaming
    feature will be an opt-in thing at first and probably be marked experimental.
---
 geode-core/build.gradle                            |   1 +
 .../internal/ClusterDistributionManager.java       |   5 +-
 .../distributed/internal/DistributionConfig.java   |   2 +-
 .../distributed/internal/direct/DirectChannel.java |  52 +-
 .../geode/internal/shared/BufferAllocator.java     | 198 ++++++
 .../internal/shared/ChannelBufferOutputStream.java | 161 +++++
 .../geode/internal/shared/InputStreamChannel.java  | 308 ++++++++
 .../apache/geode/internal/shared/NativeCalls.java  |   0
 .../geode/internal/shared/OutputStreamChannel.java | 239 +++++++
 .../geode/internal/shared/StreamChannel.java       |  42 ++
 .../shared/unsafe/ChannelBufferInputStream.java    | 167 +++++
 .../unsafe/ChannelBufferUnsafeDataInputStream.java | 221 ++++++
 .../ChannelBufferUnsafeDataOutputStream.java       | 298 ++++++++
 .../unsafe/ChannelBufferUnsafeInputStream.java     | 270 +++++++
 .../unsafe/ChannelBufferUnsafeOutputStream.java    | 305 ++++++++
 .../shared/unsafe/DirectBufferAllocator.java       | 165 +++++
 .../geode/internal/shared/unsafe/FreeMemory.java   |  49 ++
 .../geode/internal/shared/unsafe/UnsafeHolder.java | 397 +++++++++++
 .../apache/geode/internal/tcp/BaseMsgStreamer.java |   6 +
 .../org/apache/geode/internal/tcp/Buffers.java     |  19 +-
 .../geode/internal/tcp/ConnectExceptions.java      |   0
 .../org/apache/geode/internal/tcp/Connection.java  | 781 ++++++++++++++++++---
 .../apache/geode/internal/tcp/ConnectionTable.java | 225 +++++-
 .../geode/internal/tcp/DirectReplySender.java      |  14 +-
 .../geode/internal/tcp/MsgChannelDestreamer.java   | 112 +++
 .../geode/internal/tcp/MsgChannelStreamer.java     | 360 ++++++++++
 .../apache/geode/internal/tcp/MsgOutputStream.java |   2 +-
 .../org/apache/geode/internal/tcp/MsgStreamer.java |   9 +-
 .../apache/geode/internal/tcp/MsgStreamerList.java |  23 +-
 .../geode/internal/tcp/QueryKeyedObjectPool.java   | 200 ++++++
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  67 +-
 .../geode/pdx/internal/unsafe/UnsafeWrapper.java   |   5 +
 .../geode/internal/tcp/ConnectionJUnitTest.java    |   1 +
 geode-core/src/test/resources/expected-pom.xml     |   6 +
 gradle/dependency-versions.properties              |   1 +
 35 files changed, 4523 insertions(+), 188 deletions(-)

diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index 605fbb4..4c35fa2 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -159,6 +159,7 @@ dependencies {
   compile('antlr:antlr:' + project.'antlr.version')
   compile('com.fasterxml.jackson.core:jackson-annotations:' + project.'jackson.version')
   compile('com.fasterxml.jackson.core:jackson-databind:' + project.'jackson.version')
+  compile('org.apache.commons:commons-pool2:' + project.'commons-pool2.version')
   compile('commons-io:commons-io:' + project.'commons-io.version')
   compile('commons-validator:commons-validator:' + project.'commons-validator.version')
   compile('commons-digester:commons-digester:' + project.'commons-digester.version')
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index c523164..3c5c2b5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -151,7 +151,10 @@ public class ClusterDistributionManager implements DistributionManager {
   public static final int MAX_THREADS =
       Integer.getInteger("DistributionManager.MAX_THREADS", 100).intValue();
 
-  private static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
+  public static final int MAX_PR_THREADS_SET =
+      Integer.getInteger("DistributionManager.MAX_PR_THREADS", -1);
+
+  public static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
       Math.max(Runtime.getRuntime().availableProcessors() * 4, 16)).intValue();
 
   public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS",
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
index 08e5fa0..c9f2ff8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionConfig.java
@@ -1831,7 +1831,7 @@ public interface DistributionConfig extends Config, LogConfig, StatisticsConfig
   /**
    * The default value of the {@link ConfigurationProperties#CONSERVE_SOCKETS} property
    */
-  boolean DEFAULT_CONSERVE_SOCKETS = true;
+  boolean DEFAULT_CONSERVE_SOCKETS = false;
 
   /**
    * Returns the value of the {@link ConfigurationProperties#ROLES} property
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index dbb4068..66c8c54 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -54,6 +54,7 @@ import org.apache.geode.internal.tcp.ConnectExceptions;
 import org.apache.geode.internal.tcp.Connection;
 import org.apache.geode.internal.tcp.ConnectionException;
 import org.apache.geode.internal.tcp.MemberShunnedException;
+import org.apache.geode.internal.tcp.MsgChannelStreamer;
 import org.apache.geode.internal.tcp.MsgStreamer;
 import org.apache.geode.internal.tcp.TCPConduit;
 import org.apache.geode.internal.util.Breadcrumbs;
@@ -315,8 +316,10 @@ public class DirectChannel {
           msg, p_destinations.length, Arrays.toString(p_destinations));
     }
 
-    try {
-      do {
+    final boolean useNIOStream = getConduit().useNIOStream();
+    final List<Connection> allCons = new ArrayList<>(destinations.length);
+    do {
+      try {
         interrupted = Thread.interrupted() || interrupted;
         /**
          * Exceptions that happened during one attempt to send
@@ -331,9 +334,14 @@ public class DirectChannel {
           retryInfo = null;
           retry = true;
         }
-        final List cons = new ArrayList(destinations.length);
-        ConnectExceptions ce = getConnections(mgr, msg, destinations, orderedMsg, retry, ackTimeout,
-            ackSDTimeout, cons);
+        final List<Connection> cons = new ArrayList<>(destinations.length);
+        ConnectExceptions ce;
+        try {
+          ce = getConnections(mgr, msg, destinations, orderedMsg,
+              retry, ackTimeout, ackSDTimeout, cons);
+        } finally {
+          allCons.addAll(cons);
+        }
         if (directReply && msg.getProcessorId() > 0) { // no longer a direct-reply message?
           directReply = false;
         }
@@ -382,8 +390,13 @@ public class DirectChannel {
           DMStats stats = getDMStats();
           List<?> sentCons; // used for cons we sent to this time
 
-          final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats);
+          BaseMsgStreamer ms = null;
           try {
+            if (useNIOStream) {
+              ms = MsgChannelStreamer.create(cons, msg, directReply, stats);
+            } else {
+              ms = MsgStreamer.create(cons, msg, directReply, stats);
+            }
             startTime = 0;
             if (ackTimeout > 0) {
               startTime = System.currentTimeMillis();
@@ -409,7 +422,9 @@ public class DirectChannel {
                 ex);
           } finally {
             try {
-              ms.close();
+              if (ms != null) {
+                ms.close();
+              }
             } catch (IOException e) {
               throw new InternalGemFireException("Unknown error serializing message", e);
             }
@@ -453,16 +468,21 @@ public class DirectChannel {
         if (retryInfo != null) {
           this.conduit.getCancelCriterion().checkCancelInProgress(null);
         }
-      } while (retryInfo != null);
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
-      for (Iterator it = totalSentCons.iterator(); it.hasNext();) {
-        Connection con = (Connection) it.next();
-        con.setInUse(false, 0, 0, 0, null);
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+        for (Iterator it = totalSentCons.iterator(); it.hasNext();) {
+          Connection con = (Connection) it.next();
+          con.setInUse(false, 0, 0, 0, null);
+          this.conduit.releasePooledConnection(con);
+        }
+        allCons.clear();
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
       }
-    }
+    } while (retryInfo != null);
     if (failedCe != null) {
       throw failedCe;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java b/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java
new file mode 100644
index 0000000..663bac0
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/BufferAllocator.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.shared;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+import org.apache.geode.internal.shared.unsafe.FreeMemory;
+import org.apache.geode.internal.shared.unsafe.UnsafeHolder;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * Allocate, release and expand ByteBuffers (in-place if possible).
+ */
+public abstract class BufferAllocator implements Closeable {
+  private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+  public static final String STORE_DATA_FRAME_OUTPUT =
+      "STORE_DATA_FRAME_OUTPUT";
+
+  /**
+   * Special owner indicating execution pool memory.
+   */
+  public static final String EXECUTION = "EXECUTION";
+
+  /**
+   * Allocate a new ByteBuffer of given size.
+   */
+  public abstract ByteBuffer allocate(int size, String owner);
+
+  /**
+   * Allocate using the default allocator and fallback to base JDK one in case
+   * allocation fails due to some reason (e.g. system stop).
+   */
+  public ByteBuffer allocateWithFallback(int size, String owner) {
+    return allocate(size, owner);
+  }
+
+  /**
+   * Allocate a new ByteBuffer of given size for storage in a Region.
+   */
+  public abstract ByteBuffer allocateForStorage(int size);
+
+  /**
+   * Clears the memory to be zeros immediately after allocation.
+   */
+  public abstract void clearPostAllocate(ByteBuffer buffer);
+
+  /**
+   * Fill the given portion of the buffer setting it with given byte.
+   */
+  public final void fill(ByteBuffer buffer, byte b, int position, int numBytes) {
+    unsafe.setMemory(baseObject(buffer), baseOffset(buffer) + position,
+        numBytes, b);
+  }
+
+  /**
+   * Fill the buffer from its current position to full capacity with given byte.
+   */
+  public final void fill(ByteBuffer buffer, byte b) {
+    final int position = buffer.position();
+    fill(buffer, b, position, buffer.capacity() - position);
+  }
+
+  /**
+   * Get the base object of the ByteBuffer for raw reads/writes by Unsafe API.
+   */
+  public abstract Object baseObject(ByteBuffer buffer);
+
+  /**
+   * Get the base offset of the ByteBuffer for raw reads/writes by Unsafe API.
+   */
+  public abstract long baseOffset(ByteBuffer buffer);
+
+  /**
+   * Expand given ByteBuffer to new capacity. The new buffer is positioned
+   * at the start and caller has to reposition if required.
+   *
+   * @return the new expanded ByteBuffer
+   */
+  public abstract ByteBuffer expand(ByteBuffer buffer, int required,
+      String owner);
+
+  /**
+   * Return the data as a heap byte array. Use of this should be minimal
+   * when no other option exists.
+   */
+  public byte[] toBytes(ByteBuffer buffer) {
+    final int bufferSize = buffer.remaining();
+    int numBytes = Math.min(bufferSize, bufferSize);
+    byte[] bytes = new byte[numBytes];
+    int initPosition = buffer.position();
+    buffer.get(bytes, 0, numBytes);
+    buffer.position(initPosition);
+    return bytes;
+  }
+
+  /**
+   * Return a ByteBuffer either copying from, or sharing the given heap bytes.
+   */
+  public abstract ByteBuffer fromBytesToStorage(byte[] bytes, int offset,
+      int length);
+
+  /**
+   * Return a ByteBuffer either sharing data of given ByteBuffer
+   * if its type matches, or else copying from the given ByteBuffer.
+   */
+  public ByteBuffer transfer(ByteBuffer buffer, String owner) {
+    final int position = buffer.position();
+    final ByteBuffer newBuffer = allocate(buffer.limit(), owner);
+    buffer.rewind();
+    newBuffer.order(buffer.order());
+    newBuffer.put(buffer);
+    buffer.position(position);
+    newBuffer.position(position);
+    return newBuffer;
+  }
+
+  /**
+   * For direct ByteBuffers the release method is preferred to eagerly release
+   * the memory instead of depending on heap GC which can be delayed.
+   */
+  public final void release(ByteBuffer buffer) {
+    releaseBuffer(buffer);
+  }
+
+  /**
+   * For direct ByteBuffers the release method is preferred to eagerly release
+   * the memory instead of depending on heap GC which can be delayed.
+   */
+  public static boolean releaseBuffer(ByteBuffer buffer) {
+    final boolean hasArray = buffer.hasArray();
+    // Actual release should depend on buffer type and not allocator type.
+    // Reserved off-heap space will be decremented by FreeMemory implementation.
+    if (hasArray) {
+      buffer.rewind().limit(0);
+      return false;
+    } else {
+      UnsafeHolder.releaseDirectBuffer(buffer);
+      return true;
+    }
+  }
+
+  /**
+   * Indicates if this allocator will produce direct ByteBuffers.
+   */
+  public abstract boolean isDirect();
+
+  /**
+   * Return true if this is a managed direct buffer allocator.
+   */
+  public boolean isManagedDirect() {
+    return false;
+  }
+
+  /**
+   * Allocate a buffer passing a custom FreeMemoryFactory. Requires that
+   * appropriate calls against Spark memory manager have already been done.
+   * Only for managed buffer allocator.
+   */
+  public ByteBuffer allocateCustom(int size,
+      FreeMemory.Factory factory) {
+    throw new UnsupportedOperationException("Not supported for " + toString());
+  }
+
+  /**
+   * Any cleanup required at system close.
+   */
+  @Override
+  public abstract void close();
+
+  protected static int expandedSize(int currentUsed, int required) {
+    final long minRequired = (long) currentUsed + required;
+    // increase the size by 50%
+    final int newLength = (int) Math.min(Math.max((currentUsed * 3) >>> 1L,
+        minRequired), Integer.MAX_VALUE - 1);
+    if (newLength >= minRequired) {
+      return newLength;
+    } else {
+      throw new IndexOutOfBoundsException("Cannot allocate more than " +
+          newLength + " bytes but required " + minRequired);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java
new file mode 100644
index 0000000..eb99838
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/ChannelBufferOutputStream.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * OutputStream for use by buffered write abstractions over channel using direct
+ * byte buffers. The implementation is not thread-safe by design. The class
+ * itself can be used as a non-synchronized efficient replacement of
+ * BufferedOutputStream.
+ * <p>
+ * Note that the close() method of this class does not closing the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferOutputStream extends OutputStreamChannel {
+
+  protected final ByteBuffer buffer;
+
+  public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+  public ChannelBufferOutputStream(WritableByteChannel channel)
+      throws IOException {
+    this(channel, DEFAULT_BUFFER_SIZE);
+  }
+
+  public ChannelBufferOutputStream(WritableByteChannel channel, int bufferSize)
+      throws IOException {
+    super(channel);
+    if (bufferSize < 32) {
+      throw new IllegalArgumentException("buffer size " + bufferSize +
+          " should be at least 32");
+    }
+    this.buffer = allocateBuffer(bufferSize).order(ByteOrder.BIG_ENDIAN);
+  }
+
+  protected ByteBuffer allocateBuffer(int bufferSize) {
+    return ByteBuffer.allocate(bufferSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void write(int b) throws IOException {
+    if (!this.buffer.hasRemaining()) {
+      flushBufferBlocking(this.buffer);
+    }
+    this.buffer.put((byte) (b & 0xff));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void write(byte[] b,
+      int off, int len) throws IOException {
+    if (len == 1) {
+      write(b[off]);
+      return;
+    }
+
+    while (len > 0) {
+      int remaining = this.buffer.remaining();
+      if (len <= remaining) {
+        this.buffer.put(b, off, len);
+        return;
+      } else {
+        // copy b to buffer and flush
+        if (remaining > 0) {
+          this.buffer.put(b, off, remaining);
+          len -= remaining;
+          off += remaining;
+        }
+        flushBufferBlocking(this.buffer);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int write(ByteBuffer src) throws IOException {
+    return super.writeBuffered(src, this.buffer);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeInt(int v) throws IOException {
+    if (this.buffer.remaining() < 4) {
+      flushBufferBlocking(this.buffer);
+    }
+    // ByteBuffer will always be big-endian
+    this.buffer.putInt(v);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() throws IOException {
+    if (this.buffer.position() > 0) {
+      flushBufferBlocking(this.buffer);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public final boolean isOpen() {
+    return this.channel.isOpen();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    flush();
+  }
+
+  protected void flushBufferBlocking(final ByteBuffer buffer)
+      throws IOException {
+    buffer.flip();
+    try {
+      do {
+        writeBuffer(buffer, this.channel);
+      } while (buffer.hasRemaining());
+    } finally {
+      if (buffer.hasRemaining()) {
+        buffer.compact();
+      } else {
+        buffer.clear();
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java
new file mode 100644
index 0000000..936aadd
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/InputStreamChannel.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * Intermediate class that extends both an InputStream and ReadableByteChannel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class InputStreamChannel extends InputStream implements
+    ReadableByteChannel, StreamChannel {
+
+  protected final ReadableByteChannel channel;
+  private volatile Thread parkedThread;
+  protected volatile long bytesRead;
+
+  /**
+   * The default wait to use when waiting to read/write a channel
+   * (when there is no selector to signal)
+   */
+  public static final long PARK_NANOS_FOR_READ_WRITE = 100L;
+
+  /**
+   * Retries before waiting for {@link #PARK_NANOS_FOR_READ_WRITE}
+   * (when there is no selector to signal)
+   */
+  public static final int RETRIES_BEFORE_PARK = 20;
+
+  /**
+   * Maximum nanos to park thread to wait for reading/writing data in
+   * non-blocking mode (if selector is present then it will explicitly signal)
+   */
+  public static final long PARK_NANOS_MAX = 30000000000L;
+
+  static long parkThreadForAsyncOperationIfRequired(
+      final StreamChannel channel, long parkedNanos, int numTries)
+      throws SocketTimeoutException {
+    // at this point we are out of the selector thread and don't want to
+    // create unlimited size buffers upfront in selector, so will use
+    // simple signalling between selector and this thread to proceed
+    if ((numTries % RETRIES_BEFORE_PARK) == 0) {
+      if (channel != null) {
+        channel.setParkedThread(Thread.currentThread());
+      }
+      LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+      if (channel != null) {
+        channel.setParkedThread(null);
+        if ((parkedNanos += PARK_NANOS_FOR_READ_WRITE) > channel.getParkNanosMax()) {
+          throw new SocketTimeoutException("Connection operation timed out.");
+        }
+      }
+    }
+    return parkedNanos;
+  }
+
+  protected InputStreamChannel(ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  /**
+   * Get the underlying {@link ReadableByteChannel}.
+   */
+  public final ReadableByteChannel getUnderlyingChannel() {
+    return this.channel;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract int read(ByteBuffer dst) throws IOException;
+
+  /**
+   * Reads four input bytes and returns an <code>int</code> value. Let
+   * <code>a-d</code> be the first through fourth bytes read. The value returned
+   * is:
+   * <p>
+   *
+   * <pre>
+   * <code>
+   * (((a &amp; 0xff) &lt;&lt; 24) | ((b &amp; 0xff) &lt;&lt; 16) |
+   * &#32;((c &amp; 0xff) &lt;&lt; 8) | (d &amp; 0xff))
+   * </code>
+   * </pre>
+   *
+   * This method is suitable for reading bytes written by the
+   * <code>writeInt</code> method of interface <code>DataOutput</code>.
+   *
+   * @return the <code>int</code> value read.
+   * @exception EOFException
+   *            if this stream reaches the end before reading all the bytes.
+   * @exception IOException
+   *            if an I/O error occurs.
+   */
+  public abstract int readInt() throws IOException;
+
+  public int readFrame() throws IOException {
+    throw new UnsupportedOperationException(getClass().getSimpleName()
+        + ": readFrame not supported");
+  }
+
+  public int readFrameFragment(int fragmentSize) throws IOException {
+    throw new UnsupportedOperationException(getClass().getSimpleName()
+        + ": readFrameFragment not supported");
+  }
+
+  /**
+   * Common base method to read into a given ByteBuffer destination via an
+   * intermediate direct byte buffer owned by the implementation of this class.
+   */
+  protected final int readBuffered(final ByteBuffer dst,
+      final ByteBuffer channelBuffer) throws IOException {
+    int dstLen = dst.remaining();
+    // first copy anything remaining from buffer
+    final int remaining = channelBuffer.remaining();
+    if (dstLen <= remaining) {
+      if (dstLen > 0) {
+        final int pos = channelBuffer.position();
+        // reduce this buffer's limit temporarily to the required len
+        channelBuffer.limit(pos + dstLen);
+        try {
+          dst.put(channelBuffer);
+        } finally {
+          // restore the limit
+          channelBuffer.limit(pos + remaining);
+        }
+        return dstLen;
+      } else {
+        return 0;
+      }
+    }
+
+    // refill buffer once and read whatever available into buf;
+    // caller should invoke in a loop if buffer is still not full
+    int readBytes = 0;
+    if (remaining > 0) {
+      dst.put(channelBuffer);
+      dstLen -= remaining;
+      readBytes += remaining;
+    }
+    // if dst is a reasonably large direct byte buffer, then refill from
+    // channel directly else use channel direct buffer for best performance
+    if (dstLen >= (channelBuffer.limit() >>> 1) && dst.isDirect()) {
+      final int bufBytes = readIntoBufferNoWait(dst);
+      if (bufBytes > 0) {
+        return (readBytes + bufBytes);
+      } else {
+        return readBytes > 0 ? readBytes : bufBytes;
+      }
+    } else {
+      final int bufBytes = refillBufferBase(channelBuffer, -1, null);
+      if (bufBytes > 0) {
+        if (dstLen >= bufBytes) {
+          dst.put(channelBuffer);
+          return (readBytes + bufBytes);
+        } else {
+          final int pos = channelBuffer.position();
+          // reduce this buffer's limit temporarily to the required length
+          channelBuffer.limit(pos + dstLen);
+          try {
+            dst.put(channelBuffer);
+          } finally {
+            // restore the limit
+            channelBuffer.limit(pos + bufBytes);
+          }
+          return (readBytes + dstLen);
+        }
+      } else {
+        return readBytes > 0 ? readBytes : bufBytes;
+      }
+    }
+  }
+
+  protected int refillBuffer(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    return refillBufferBase(channelBuffer, tryReadBytes, eofMessage);
+  }
+
+  protected final int refillBufferBase(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    resetAndCopyLeftOverBytes(channelBuffer);
+    int initPosition = channelBuffer.position();
+    int totalReadBytes = initPosition;
+    final int channelBytes = readIntoBuffer(channelBuffer);
+    if (channelBytes > 0) {
+      totalReadBytes += channelBytes;
+    }
+    // eof on stream but we may still have remaining bytes in channelBuffer
+    else if (totalReadBytes == 0) {
+      totalReadBytes = channelBytes;
+    }
+    while (tryReadBytes > totalReadBytes && channelBuffer.hasRemaining()) {
+      int readBytes = readIntoBuffer(channelBuffer);
+      if (readBytes < 0) {
+        if (eofMessage != null) {
+          throw new EOFException(eofMessage);
+        } else {
+          if (totalReadBytes == 0) {
+            totalReadBytes = readBytes;
+          }
+          break;
+        }
+      }
+      if (totalReadBytes < 0) {
+        totalReadBytes = 0;
+      }
+      totalReadBytes += readBytes;
+    }
+    channelBuffer.flip();
+    assert (totalReadBytes == channelBuffer.limit()) || (totalReadBytes == -1
+        && channelBuffer.limit() == 0) : "readBytes=" + totalReadBytes
+            + " != limit=" + channelBuffer.limit();
+    if (totalReadBytes > initPosition) {
+      this.bytesRead += (totalReadBytes - initPosition);
+    }
+    return totalReadBytes;
+  }
+
+  protected void resetAndCopyLeftOverBytes(final ByteBuffer channelBuffer) {
+    if (channelBuffer.hasRemaining()) {
+      channelBuffer.compact();
+    } else {
+      channelBuffer.clear();
+    }
+  }
+
+  /**
+   * Fill the given buffer reading data from channel at least one byte unless
+   * end-of-stream has been reached.
+   *
+   * @return number of bytes read or -1 on end-of-stream
+   */
+  protected int readIntoBuffer(ByteBuffer buffer) throws IOException {
+    long parkedNanos = 0L;
+    int numTries = 0;
+    int numBytes;
+    while ((numBytes = this.channel.read(buffer)) == 0) {
+      if (!buffer.hasRemaining()) {
+        break;
+      }
+      // wait for a bit after some retries
+      parkedNanos = parkThreadForAsyncOperationIfRequired(
+          this, parkedNanos, ++numTries);
+    }
+    if (numBytes > 0) {
+      this.bytesRead += numBytes;
+    }
+    return numBytes;
+  }
+
+  @Override
+  public final Thread getParkedThread() {
+    return this.parkedThread;
+  }
+
+  @Override
+  public final void setParkedThread(Thread thread) {
+    this.parkedThread = thread;
+  }
+
+  @Override
+  public long getParkNanosMax() {
+    return PARK_NANOS_MAX;
+  }
+
+  /**
+   * Fill the given buffer reading data from channel whatever is available in
+   * one call (non-blocking if channel is so configured).
+   *
+   * @return number of bytes read (can be zero) or -1 on end-of-stream
+   */
+  protected int readIntoBufferNoWait(ByteBuffer buffer)
+      throws IOException {
+    final int numBytes = this.channel.read(buffer);
+    if (numBytes > 0) {
+      this.bytesRead += numBytes;
+    }
+    return numBytes;
+  }
+
+  public final long getBytesRead() {
+    return this.bytesRead;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/NativeCalls.java b/geode-core/src/main/java/org/apache/geode/internal/shared/NativeCalls.java
old mode 100755
new mode 100644
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java
new file mode 100644
index 0000000..2c58792
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/OutputStreamChannel.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Intermediate class that extends both an OutputStream and WritableByteChannel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class OutputStreamChannel extends OutputStream implements
+    WritableByteChannel, StreamChannel {
+
+  protected final WritableByteChannel channel;
+  private final boolean socketToSameHost;
+  private volatile Thread parkedThread;
+  protected volatile long bytesWritten;
+
+  protected OutputStreamChannel(WritableByteChannel channel) {
+    this.channel = channel;
+    this.socketToSameHost = isSocketToSameHost(channel);
+  }
+
+
+  private boolean isSocketToSameHost(Channel channel) {
+    try {
+      if (channel instanceof SocketChannel) {
+        SocketChannel socketChannel = (SocketChannel) channel;
+        return isSocketToSameHost(socketChannel.getLocalAddress(),
+            socketChannel.getRemoteAddress());
+      }
+    } catch (IOException ignored) {
+    }
+    return false;
+  }
+
+  private boolean isSocketToSameHost(SocketAddress localSockAddress,
+      SocketAddress remoteSockAddress) {
+    if ((localSockAddress instanceof InetSocketAddress) &&
+        (remoteSockAddress instanceof InetSocketAddress)) {
+      InetAddress localAddress = ((InetSocketAddress) localSockAddress)
+          .getAddress();
+      return localAddress != null && localAddress.equals(
+          ((InetSocketAddress) remoteSockAddress).getAddress());
+    } else {
+      return false;
+    }
+  }
+
+  public final boolean isSocketToSameHost() {
+    return this.socketToSameHost;
+  }
+
+  /**
+   * Get the underlying {@link WritableByteChannel}.
+   */
+  public final WritableByteChannel getUnderlyingChannel() {
+    return this.channel;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public abstract int write(ByteBuffer src) throws IOException;
+
+  /**
+   * Writes an <code>int</code> value, which is comprised of four bytes,
+   * to the output stream in big-endian format
+   * compatible with {@link java.io.DataOutput#writeInt(int)}.
+   *
+   * @param v the <code>int</code> value to be written.
+   * @throws IOException if an I/O error occurs.
+   * @see java.io.DataOutput#writeInt(int)
+   */
+  public abstract void writeInt(int v) throws IOException;
+
+  /**
+   * Common base method to write a given ByteBuffer source via an intermediate
+   * direct byte buffer owned by the implementation of this class (if required).
+   */
+  protected final int writeBuffered(final ByteBuffer src,
+      final ByteBuffer channelBuffer) throws IOException {
+    int srcLen = src.remaining();
+    // flush large direct buffers directly into channel
+    final boolean flushBuffer = srcLen > (channelBuffer.limit() >>> 1) &&
+        src.isDirect();
+    int numWritten = 0;
+    while (srcLen > 0) {
+      final int remaining = channelBuffer.remaining();
+      if (srcLen <= remaining) {
+        channelBuffer.put(src);
+        return (numWritten + srcLen);
+      } else {
+        // flush directly if there is nothing in the channel buffer
+        if (flushBuffer && channelBuffer.position() == 0) {
+          return numWritten + writeBufferNoWait(src, this.channel);
+        }
+        // copy src to buffer and flush
+        if (remaining > 0) {
+          // lower limit of src temporarily to remaining
+          final int srcPos = src.position();
+          src.limit(srcPos + remaining);
+          try {
+            channelBuffer.put(src);
+          } finally {
+            // restore the limit
+            src.limit(srcPos + srcLen);
+          }
+          srcLen -= remaining;
+          numWritten += remaining;
+          assert srcLen == src.remaining() : "srcLen=" + srcLen
+              + " srcRemaining=" + src.remaining();
+        }
+        // if we were able to write the full buffer then try writing the
+        // remaining from source else return with whatever was written
+        if (!flushBufferNonBlockingBase(channelBuffer)) {
+          return numWritten;
+        } else if (flushBuffer) {
+          return numWritten + writeBufferNoWait(src, this.channel);
+        }
+        // for non-direct buffers use channel buffer for best performance
+        // so loop back and try again
+      }
+    }
+    return numWritten;
+  }
+
+  protected final boolean flushBufferNonBlockingBase(final ByteBuffer buffer)
+      throws IOException {
+    buffer.flip();
+
+    final boolean flushed;
+    try {
+      writeBufferNoWait(buffer, this.channel);
+    } finally {
+      // if we failed to write the full buffer then compact the remaining bytes
+      // to the start so we can start filling it again
+      if (buffer.hasRemaining()) {
+        buffer.compact();
+        flushed = false;
+      } else {
+        buffer.clear();
+        flushed = true;
+      }
+    }
+    return flushed;
+  }
+
+  protected int writeBuffer(final ByteBuffer buffer,
+      final WritableByteChannel channel) throws IOException {
+    long parkedNanos = 0;
+    int numTries = 0;
+    int numWritten;
+    while ((numWritten = channel.write(buffer)) == 0) {
+      if (!buffer.hasRemaining()) {
+        break;
+      }
+      // wait for a bit after some retries
+      parkedNanos = InputStreamChannel.parkThreadForAsyncOperationIfRequired(
+          this, parkedNanos, ++numTries);
+    }
+    if (numWritten > 0) {
+      this.bytesWritten += numWritten;
+    }
+    return numWritten;
+  }
+
+  @Override
+  public final Thread getParkedThread() {
+    return this.parkedThread;
+  }
+
+  @Override
+  public final void setParkedThread(Thread thread) {
+    this.parkedThread = thread;
+  }
+
+  @Override
+  public long getParkNanosMax() {
+    return InputStreamChannel.PARK_NANOS_MAX;
+  }
+
+  protected int writeBufferNoWait(final ByteBuffer buffer,
+      final WritableByteChannel channel) throws IOException {
+    int numWritten = channel.write(buffer);
+    if (numWritten > 0) {
+      this.bytesWritten += numWritten;
+    }
+    return numWritten;
+  }
+
+  public final long getBytesWritten() {
+    return this.bytesWritten;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java b/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java
new file mode 100644
index 0000000..fd3fc02
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/StreamChannel.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.Closeable;
+import java.nio.channels.Channel;
+
+/**
+ * Common base methods for {@link Channel} read/write operations (usually buffered).
+ */
+public interface StreamChannel extends Channel, Closeable {
+
+  /**
+   * Return the thread if waiting for read/write on this channel.
+   */
+  Thread getParkedThread();
+
+  /**
+   * Set the thread waiting for read/write on this channel (null to clear).
+   */
+  void setParkedThread(Thread thread);
+
+  /**
+   * Maximum time to wait before giving up read/write on channel
+   */
+  long getParkNanosMax();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java
new file mode 100644
index 0000000..d5727e6
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferInputStream.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * Base class for use by buffered abstractions over channel using direct byte
+ * buffers. This particular class can be used as a much more efficient
+ * replacement for BufferedInputStream.
+ * <p>
+ * Note that the close() method of this class does not closing the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferInputStream extends InputStreamChannel {
+
+  protected final ByteBuffer buffer;
+
+  public static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+  public ChannelBufferInputStream(ReadableByteChannel channel) throws IOException {
+    this(channel, DEFAULT_BUFFER_SIZE);
+  }
+
+  public ChannelBufferInputStream(ReadableByteChannel channel, int bufferSize)
+      throws IOException {
+    super(channel);
+    if (bufferSize <= 0) {
+      throw new IllegalArgumentException("invalid bufferSize=" + bufferSize);
+    }
+    this.buffer = allocateBuffer(bufferSize);
+    // flip to force refill on first use
+    this.buffer.flip();
+  }
+
+  protected ByteBuffer allocateBuffer(int bufferSize) {
+    return ByteBuffer.allocate(bufferSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read() throws IOException {
+    if (this.buffer.hasRemaining()) {
+      return (this.buffer.get() & 0xff);
+    } else if (refillBuffer(this.buffer, 1, null) > 0) {
+      return (this.buffer.get() & 0xff);
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read(byte[] buf, int off, int len) throws IOException {
+    if (len == 1) {
+      if (this.buffer.hasRemaining()) {
+        buf[off] = this.buffer.get();
+        return 1;
+      } else if (refillBuffer(this.buffer, 1, null) > 0) {
+        buf[off] = this.buffer.get();
+        return 1;
+      } else {
+        return -1;
+      }
+    }
+
+    // first copy anything remaining from buffer
+    final int remaining = this.buffer.remaining();
+    if (len <= remaining) {
+      if (len > 0) {
+        this.buffer.get(buf, off, len);
+        return len;
+      } else {
+        return 0;
+      }
+    }
+
+    // refill buffer once and read whatever available into buf;
+    // caller should invoke in a loop if buffer is still not full
+    int readBytes = 0;
+    if (remaining > 0) {
+      this.buffer.get(buf, off, remaining);
+      off += remaining;
+      len -= remaining;
+      readBytes += remaining;
+    }
+    final int bufBytes = refillBuffer(this.buffer, 1, null);
+    if (bufBytes > 0) {
+      if (len > bufBytes) {
+        len = bufBytes;
+      }
+      this.buffer.get(buf, off, len);
+      return (readBytes + len);
+    } else {
+      return readBytes > 0 ? readBytes : bufBytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read(ByteBuffer dst) throws IOException {
+    return super.readBuffered(dst, this.buffer);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int readInt() throws IOException {
+    final ByteBuffer buffer = this.buffer;
+    if (buffer.remaining() >= 4) {
+      return buffer.getInt();
+    } else {
+      refillBuffer(buffer, 4, "readInt: premature end of stream");
+      return buffer.getInt();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int available() throws IOException {
+    return this.buffer.remaining();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public final boolean isOpen() {
+    return this.channel.isOpen();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    this.buffer.clear();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java
new file mode 100644
index 0000000..d5f187b
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataInputStream.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A buffered DataInput abstraction over channel using direct byte buffers, and
+ * using internal Unsafe class for best performance.
+ * <p>
+ * The implementation is not thread-safe by design. This particular class can be
+ * used as an efficient, buffered DataInput implementation for file channels,
+ * socket channels and other similar.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferUnsafeDataInputStream extends
+    ChannelBufferUnsafeInputStream implements DataInput {
+  private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+  public ChannelBufferUnsafeDataInputStream(ReadableByteChannel channel,
+      int bufferSize) {
+    super(channel, bufferSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void readFully(byte[] b) throws IOException {
+    readFully(b, 0, b.length);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void readFully(byte[] b,
+      int off, int len) throws IOException {
+    while (true) {
+      final int readBytes = super.read(b, off, len);
+      if (readBytes >= len) {
+        return;
+      } else if (readBytes >= 0) {
+        len -= readBytes;
+        off += readBytes;
+      } else {
+        throw new EOFException();
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int skipBytes(int n) {
+    return (int) skip(n);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long skip(long n) {
+    n = Math.max(0, Math.min(n, this.addrLimit - this.addrPosition));
+    this.addrPosition += n;
+    return n;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final boolean readBoolean() throws IOException {
+    return readByte() != 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final byte readByte() throws IOException {
+    if (this.addrPosition >= this.addrLimit) {
+      refillBuffer(this.buffer, 1, "readByte: premature end of stream");
+    }
+    return unsafe.getByte(null, this.addrPosition++);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int readUnsignedByte() throws IOException {
+    return (readByte() & 0xff);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final short readShort() throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 2) {
+      refillBuffer(this.buffer, 2, "readShort: premature end of stream");
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition += 2;
+    if (UnsafeHolder.littleEndian) {
+      return Short.reverseBytes(unsafe.getShort(null, addrPos));
+    } else {
+      return unsafe.getShort(null, addrPos);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int readUnsignedShort() throws IOException {
+    return (readShort() & 0xFFFF);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final char readChar() throws IOException {
+    return (char) readShort();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final long readLong() throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 8) {
+      refillBuffer(this.buffer, 8, "readLong: premature end of stream");
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition += 8;
+    if (UnsafeHolder.littleEndian) {
+      return Long.reverseBytes(unsafe.getLong(null, addrPos));
+    } else {
+      return unsafe.getLong(null, addrPos);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String readLine() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String readUTF() throws IOException {
+    return DataInputStream.readUTF(this);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java
new file mode 100644
index 0000000..01d89b8
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeDataOutputStream.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+
+/**
+ * A buffered DataOutput abstraction over channel using direct byte buffers, and
+ * using internal Unsafe class for best performance. Users must check for
+ * {@link UnsafeHolder#hasUnsafe()} before trying to use this class.
+ * <p>
+ * The implementation is not thread-safe by design. This particular class can be
+ * used as an efficient, buffered DataOutput implementation for file channels,
+ * socket channels and other similar.
+ *
+ * @author swale
+ * @since gfxd 1.0
+ */
+public class ChannelBufferUnsafeDataOutputStream extends
+    ChannelBufferUnsafeOutputStream implements DataOutput {
+  private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+  public ChannelBufferUnsafeDataOutputStream(WritableByteChannel channel) {
+    super(channel);
+  }
+
+  public ChannelBufferUnsafeDataOutputStream(WritableByteChannel channel,
+      int bufferSize) {
+    super(channel, bufferSize);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeBoolean(boolean v) throws IOException {
+    putByte(v ? (byte) 1 : 0);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeByte(int v) throws IOException {
+    putByte((byte) (v & 0xff));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeShort(int v) throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 2) {
+      flushBufferBlocking(this.buffer);
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition = putShort(addrPos, v);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeChar(int v) throws IOException {
+    writeShort(v);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeLong(long v) throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 8) {
+      flushBufferBlocking(this.buffer);
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition = putLong(addrPos, v);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeFloat(float v) throws IOException {
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeDouble(double v) throws IOException {
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeBytes(String s) throws IOException {
+    if (s.length() > 0) {
+      write(s.getBytes(StandardCharsets.US_ASCII));
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeChars(String s) throws IOException {
+    int off = 0;
+    int len = s.length();
+    while (len > 0) {
+      long addrPos = this.addrPosition;
+      final int remaining = (int) (this.addrLimit - addrPos);
+      if ((len << 1) <= remaining) {
+        final int end = (off + len);
+        while (off < end) {
+          addrPos = putShort(addrPos, s.charAt(off++));
+        }
+        this.addrPosition = addrPos;
+        return;
+      } else {
+        final int remchars = (remaining >>> 1);
+        final int end = (off + remchars);
+        while (off < end) {
+          addrPos = putShort(addrPos, s.charAt(off++));
+        }
+        this.addrPosition = addrPos;
+        flushBufferBlocking(this.buffer);
+        len -= remchars;
+      }
+    }
+  }
+
+  private int getUTFLength(final String str, final int strLen) {
+    int utfLen = strLen;
+    for (int i = 0; i < strLen; i++) {
+      final char c = str.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        // 1 byte for character
+        continue;
+      } else if (c > 0x07FF) {
+        utfLen += 2; // 3 bytes for character
+      } else {
+        utfLen++; // 2 bytes for character
+      }
+    }
+    return utfLen;
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeUTF(String str) throws IOException {
+    int strLen = str.length();
+    if (strLen > 65535) {
+      throw new UTFDataFormatException("encoded string too long: " + strLen);
+    }
+
+    // first check the optimistic case where worst case of 2 for length + 3 for
+    // each char fits into remaining space in buffer
+    long addrPos = this.addrPosition;
+    long remaining = this.addrLimit - addrPos;
+    if (remaining >= ((strLen * 3) + 2)) {
+      // write the UTF string skipping the length, then write length at the last
+      addrPos += 2;
+      final long finalAddrPos = writeUTFSegmentNoOverflow(str, 0, strLen,
+          -1, null, addrPos);
+      long utflen = (finalAddrPos - addrPos);
+      if (utflen <= 65535) {
+        putShort(addrPos - 2, (int) utflen);
+        this.addrPosition = finalAddrPos;
+      } else {
+        // act as if we wrote nothing to this buffer (no change to addrPosition)
+        throw new UTFDataFormatException("encoded string too long: " + utflen
+            + " bytes");
+      }
+      return;
+    }
+
+
+    // otherwise first calculate the UTF encoded length, write it in buffer
+    // (which may need to be flushed at any point), then break string into worst
+    // case segments for writing to buffer and flushing if end of buffer reached
+    int utfLen = getUTFLength(str, strLen);
+    if (utfLen > 65535) {
+      throw new UTFDataFormatException("encoded string too long: " + utfLen
+          + " bytes");
+    }
+    // write the length first
+    if (remaining > 2) {
+      addrPos = putShort(addrPos, utfLen);
+      remaining -= 2;
+    } else {
+      flushBufferBlocking(this.buffer);
+      addrPos = putShort(this.addrPosition, utfLen);
+      remaining = this.addrLimit - addrPos;
+    }
+
+    // next break string into segments assuming worst case of 3 bytes per char,
+    // flushing buffer as required after each segment write
+    int offset = 0;
+    while (strLen > 0) {
+      int writeLen = Math.min(strLen, (int) (remaining / 3));
+      if (writeLen >= 3) {
+        // write the UTF segment and update the number of remaining characters,
+        // offset, remaining buffer size etc
+        long newAddrPos = writeUTFSegmentNoOverflow(str, offset, writeLen,
+            -1, null, addrPos);
+        strLen -= writeLen;
+        offset += writeLen;
+        remaining -= (newAddrPos - addrPos);
+        addrPos = newAddrPos;
+      } else {
+        // if we have too few to write then better to flush the buffer and then
+        // try (bufferSize is at least 10 as ensured in constructors)
+        this.addrPosition = addrPos;
+        flushBufferBlocking(this.buffer);
+        remaining = this.addrLimit - (addrPos = this.addrPosition);
+      }
+    }
+    this.addrPosition = addrPos;
+  }
+
+  public static long writeUTFSegmentNoOverflow(String str, int offset,
+      int length, final int utfLen, final Object target, long addrPos) {
+    final int end = (offset + length);
+    // fast path for ASCII strings
+    if (length == utfLen) {
+      while (offset < end) {
+        final char c = str.charAt(offset++);
+        unsafe.putByte(target, addrPos++, (byte) c);
+      }
+      return addrPos;
+    }
+    while (offset < end) {
+      final char c = str.charAt(offset++);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        unsafe.putByte(target, addrPos++, (byte) c);
+      } else if (c > 0x07FF) {
+        unsafe.putByte(target, addrPos++, (byte) (0xE0 | ((c >> 12) & 0x0F)));
+        unsafe.putByte(target, addrPos++, (byte) (0x80 | ((c >> 6) & 0x3F)));
+        unsafe.putByte(target, addrPos++, (byte) (0x80 | (c & 0x3F)));
+      } else {
+        unsafe.putByte(target, addrPos++, (byte) (0xC0 | ((c >> 6) & 0x1F)));
+        unsafe.putByte(target, addrPos++, (byte) (0x80 | (c & 0x3F)));
+      }
+    }
+    return addrPos;
+  }
+
+  /** Write a short in big-endian format on given off-heap address. */
+  protected static long putShort(long addrPos, final int v) {
+    if (UnsafeHolder.littleEndian) {
+      unsafe.putShort(null, addrPos, Short.reverseBytes((short) v));
+    } else {
+      unsafe.putShort(null, addrPos, (short) v);
+    }
+    return addrPos + 2;
+  }
+
+  /** Write a long in big-endian format on given off-heap address. */
+  protected static long putLong(long addrPos, final long v) {
+    if (UnsafeHolder.littleEndian) {
+      unsafe.putLong(null, addrPos, Long.reverseBytes(v));
+    } else {
+      unsafe.putLong(null, addrPos, v);
+    }
+    return addrPos + 8;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java
new file mode 100644
index 0000000..9be7956
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeInputStream.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data unsafe.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.geode.internal.shared.ChannelBufferInputStream;
+import org.apache.geode.internal.shared.InputStreamChannel;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A more efficient implementation of {@link ChannelBufferInputStream}
+ * using internal unsafe class (~30% in raw read calls).
+ * Use {@link UnsafeHolder#newChannelBufferInputStream} method to
+ * create either this or {@link ChannelBufferInputStream} depending on
+ * availability.
+ * <p>
+ * Note that the close() method of this class does not closing the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferUnsafeInputStream extends InputStreamChannel {
+  private static UnsafeWrapper unsafe = new UnsafeWrapper();
+
+  protected ByteBuffer buffer;
+  protected final long baseAddress;
+  /**
+   * Actual buffer position (+baseAddress) accounting is done by this. Buffer
+   * position is adjusted during refill and other places where required using
+   * this.
+   */
+  protected long addrPosition;
+  protected long addrLimit;
+
+  public ChannelBufferUnsafeInputStream(ReadableByteChannel channel) {
+    this(channel, ChannelBufferInputStream.DEFAULT_BUFFER_SIZE);
+  }
+
+  public ChannelBufferUnsafeInputStream(ReadableByteChannel channel,
+      int bufferSize) {
+    super(channel);
+    if (bufferSize <= 0) {
+      throw new IllegalArgumentException("invalid bufferSize=" + bufferSize);
+    }
+    this.buffer = allocateBuffer(bufferSize);
+    // force refill on first use
+    this.buffer.position(bufferSize);
+
+    try {
+      this.baseAddress = UnsafeHolder.getDirectBufferAddress(this.buffer);
+      resetBufferPositions();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "failed in creating an 'unsafe' buffered channel stream", e);
+    }
+  }
+
+  protected final void resetBufferPositions() {
+    this.addrPosition = this.baseAddress + this.buffer.position();
+    this.addrLimit = this.baseAddress + this.buffer.limit();
+  }
+
+  protected ByteBuffer allocateBuffer(int bufferSize) {
+    // use allocator which will restrict total allocated size
+    ByteBuffer buffer = DirectBufferAllocator.instance().allocateWithFallback(
+        bufferSize, "CHANNELINPUT");
+    // set the order to native explicitly to skip any byte order conversions
+    buffer.order(ByteOrder.nativeOrder());
+    return buffer;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read() throws IOException {
+    if (this.addrPosition >= this.addrLimit) {
+      if (refillBuffer(this.buffer, 1, null) <= 0) {
+        return -1;
+      }
+    }
+    return unsafe.getByte(null, this.addrPosition++) & 0xff;
+  }
+
+  private int read_(byte[] buf, int off, int len) throws IOException {
+    if (len == 1) {
+      final int b = read();
+      if (b != -1) {
+        buf[off] = (byte) b;
+        return 1;
+      } else {
+        return -1;
+      }
+    }
+
+    // first copy anything remaining from buffer
+    final int remaining = (int) (this.addrLimit - this.addrPosition);
+    if (len <= remaining) {
+      if (len > 0) {
+        unsafe.copyMemory(null, this.addrPosition, buf,
+            unsafe.arrayBaseOffset(byte[].class) + off, len);
+        this.addrPosition += len;
+        return len;
+      } else {
+        return 0;
+      }
+    }
+
+    // refill buffer once and read whatever available into buf;
+    // caller should invoke in a loop if buffer is still not full
+    if (remaining > 0) {
+      unsafe.copyMemory(null, this.addrPosition, buf,
+          unsafe.arrayBaseOffset(byte[].class) + off, remaining);
+      this.addrPosition += remaining;
+      return remaining;
+    }
+    final int bufBytes = refillBuffer(this.buffer, 1, null);
+    if (bufBytes > 0) {
+      if (len > bufBytes) {
+        len = bufBytes;
+      }
+      unsafe.copyMemory(null, this.addrPosition, buf,
+          unsafe.arrayBaseOffset(byte[].class) + off, len);
+      this.addrPosition += len;
+      return len;
+    } else {
+      return bufBytes;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read(byte[] buf) throws IOException {
+    return read_(buf, 0, buf.length);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read(byte[] buf,
+      int off, int len) throws IOException {
+    UnsafeHolder.checkBounds(buf.length, off, len);
+    return read_(buf, off, len);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int read(ByteBuffer dst) throws IOException {
+    // We will just use our ByteBuffer for the read. It might be possible
+    // to get slight performance advantage in using unsafe instead, but
+    // copying to ByteBuffer will not be efficient without reflection
+    // to get dst's native address in case it is a direct byte buffer.
+    // Avoiding the complication since the benefit will be very small
+    // in any case (and reflection cost may well offset that).
+    // We can use unsafe for a small perf benefit for heap byte buffers.
+
+    // adjust this buffer position first
+    this.buffer.position((int) (this.addrPosition - this.baseAddress));
+    try {
+      // now we are set to just call base class method
+      return super.readBuffered(dst, this.buffer);
+    } finally {
+      // finally reset the raw positions from buffer
+      resetBufferPositions();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int readInt() throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 4) {
+      refillBuffer(this.buffer, 4, "readInt: premature end of stream");
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition += 4;
+    if (UnsafeHolder.littleEndian) {
+      return Integer.reverseBytes(unsafe.getInt(null, addrPos));
+    } else {
+      return unsafe.getInt(null, addrPos);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int available() {
+    return (int) (this.addrLimit - this.addrPosition);
+  }
+
+  @Override
+  protected int refillBuffer(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    // adjust this buffer position first
+    channelBuffer.position((int) (this.addrPosition - this.baseAddress));
+    try {
+      return super.refillBuffer(channelBuffer, tryReadBytes, eofMessage);
+    } finally {
+      // adjust back position and limit
+      resetBufferPositions();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final boolean isOpen() {
+    return this.channel.isOpen();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() {
+    final ByteBuffer buffer = this.buffer;
+    if (buffer != null) {
+      this.addrPosition = this.addrLimit = 0;
+      this.buffer = null;
+      DirectBufferAllocator.instance().release(buffer);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java
new file mode 100644
index 0000000..a923d08
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/ChannelBufferUnsafeOutputStream.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.geode.internal.shared.ChannelBufferOutputStream;
+import org.apache.geode.internal.shared.OutputStreamChannel;
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+/**
+ * A somewhat more efficient implementation of {@link ChannelBufferOutputStream}
+ * using internal unsafe class (~30% in raw single byte write calls).
+ * Use {@link UnsafeHolder#newChannelBufferOutputStream} method to create
+ * either this or {@link ChannelBufferOutputStream} depending on availability.
+ * <p>
+ * NOTE: THIS CLASS IS NOT THREAD-SAFE BY DESIGN. IF IT IS USED CONCURRENTLY
+ * BY MULTIPLE THREADS THEN BAD THINGS CAN HAPPEN DUE TO UNSAFE MEMORY WRITES.
+ * <p>
+ * Note that the close() method of this class does not close the underlying
+ * channel.
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public class ChannelBufferUnsafeOutputStream extends OutputStreamChannel {
+  private static UnsafeWrapper unsafe = new UnsafeWrapper();
+  protected ByteBuffer buffer;
+  protected final long baseAddress;
+  /**
+   * Actual buffer position (+baseAddress) accounting is done by this. Buffer
+   * position is adjusted during refill and other places where required using
+   * this.
+   */
+  protected long addrPosition;
+  protected long addrLimit;
+
+  /**
+   * Some minimum buffer size, particularly for longs and encoding UTF strings
+   * efficiently. If reducing this, then consider the logic in
+   * {@link ChannelBufferUnsafeDataOutputStream#writeUTF(String)} carefully.
+   */
+  protected static final int MIN_BUFFER_SIZE = 32;
+
+  public ChannelBufferUnsafeOutputStream(WritableByteChannel channel) {
+    this(channel, ChannelBufferOutputStream.DEFAULT_BUFFER_SIZE);
+  }
+
+  public ChannelBufferUnsafeOutputStream(WritableByteChannel channel,
+      int bufferSize) {
+    super(channel);
+    this.baseAddress = allocateBuffer(bufferSize);
+    resetBufferPositions();
+  }
+
+  /**
+   * Get handle to the underlying ByteBuffer. ONLY TO BE USED BY TESTS.
+   */
+  public ByteBuffer getInternalBuffer() {
+    // set the current position
+    this.buffer.position(position());
+    return this.buffer;
+  }
+
+  protected final void resetBufferPositions() {
+    this.addrPosition = this.baseAddress + this.buffer.position();
+    this.addrLimit = this.baseAddress + this.buffer.limit();
+  }
+
+  protected long allocateBuffer(int bufferSize) {
+    // expect minimum bufferSize of 10 bytes
+    if (bufferSize < MIN_BUFFER_SIZE) {
+      throw new IllegalArgumentException(
+          "ChannelBufferUnsafeDataOutputStream: buffersize=" + bufferSize
+              + " too small (minimum " + MIN_BUFFER_SIZE + ')');
+    }
+    // use allocator which will restrict total allocated size
+    final ByteBuffer buffer = DirectBufferAllocator.instance().allocateWithFallback(
+        bufferSize, "CHANNELOUTPUT");
+    // set the order to native explicitly to skip any byte order conversions
+    buffer.order(ByteOrder.nativeOrder());
+    this.buffer = buffer;
+
+    try {
+      return UnsafeHolder.getDirectBufferAddress(buffer);
+    } catch (Exception e) {
+      releaseBuffer();
+      throw new RuntimeException(
+          "failed in creating an 'unsafe' buffered channel stream", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void write(int b) throws IOException {
+    putByte((byte) (b & 0xff));
+  }
+
+  protected final void write_(byte[] b, int off, int len) throws IOException {
+    if (len == 1) {
+      putByte(b[off]);
+      return;
+    }
+
+    while (len > 0) {
+      final long addrPos = this.addrPosition;
+      final int remaining = (int) (this.addrLimit - addrPos);
+      if (len <= remaining) {
+        unsafe.copyMemory(b, unsafe.arrayBaseOffset(byte[].class) + off,
+            null, addrPos, len);
+        this.addrPosition += len;
+        return;
+      } else {
+        // copy b to buffer and flush
+        if (remaining > 0) {
+          unsafe.copyMemory(b, unsafe.arrayBaseOffset(byte[].class) + off,
+              null, addrPos, remaining);
+          this.addrPosition += remaining;
+          len -= remaining;
+          off += remaining;
+        }
+        flushBufferBlocking(this.buffer);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void write(byte[] b) throws IOException {
+    write_(b, 0, b.length);
+  }
+
+  protected final void putByte(byte b) throws IOException {
+    if (this.addrPosition >= this.addrLimit) {
+      flushBufferBlocking(this.buffer);
+    }
+    unsafe.putByte(null, this.addrPosition++, b);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void write(byte[] b,
+      int off, int len) throws IOException {
+    UnsafeHolder.checkBounds(b.length, off, len);
+    write_(b, off, len);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final int write(ByteBuffer src) throws IOException {
+    // We will just use our ByteBuffer for the write. It might be possible
+    // to get slight performance advantage in using unsafe instead, but
+    // copying from source ByteBuffer will not be efficient without
+    // reflection to get src's native address in case it is a direct
+    // byte buffer. Avoiding the complication since the benefit will be
+    // very small in any case (and reflection cost may well offset that).
+
+    // adjust this buffer position first
+    this.buffer.position((int) (this.addrPosition - this.baseAddress));
+    // now we are actually set to just call base class method
+    try {
+      return super.writeBuffered(src, this.buffer);
+    } finally {
+      // finally reset the raw positions from buffer
+      resetBufferPositions();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void writeInt(int v) throws IOException {
+    long addrPos = this.addrPosition;
+    if ((this.addrLimit - addrPos) < 4) {
+      flushBufferBlocking(this.buffer);
+      addrPos = this.addrPosition;
+    }
+    this.addrPosition = putInt(addrPos, v);
+  }
+
+  public final int position() {
+    return (int) (this.addrPosition - this.baseAddress);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() throws IOException {
+    final ByteBuffer buffer;
+    if (this.addrPosition > this.baseAddress &&
+        (buffer = this.buffer) != null) {
+      flushBufferBlocking(buffer);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final boolean isOpen() {
+    return this.channel.isOpen();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    flush();
+    this.addrPosition = this.addrLimit = 0;
+    releaseBuffer();
+  }
+
+  protected final void releaseBuffer() {
+    final ByteBuffer buffer = this.buffer;
+    if (buffer != null) {
+      this.buffer = null;
+      DirectBufferAllocator.instance().release(buffer);
+    }
+  }
+
+  /**
+   * Close the underlying channel in addition to flushing/clearing the buffer.
+   */
+  public void closeChannel() throws IOException {
+    flush();
+    this.addrPosition = this.addrLimit = 0;
+    this.channel.close();
+    releaseBuffer();
+  }
+
+  protected void flushBufferBlocking(final ByteBuffer buffer)
+      throws IOException {
+    buffer.position(position());
+    buffer.flip();
+    try {
+      do {
+        writeBuffer(buffer, this.channel);
+      } while (buffer.hasRemaining());
+    } finally {
+      if (buffer.hasRemaining()) {
+        buffer.compact();
+      } else {
+        buffer.clear();
+      }
+      resetBufferPositions();
+    }
+  }
+
+  /** Write an integer in big-endian format on given off-heap address. */
+  protected static long putInt(long addrPos, final int v) {
+    if (UnsafeHolder.littleEndian) {
+      unsafe.putInt(null, addrPos, Integer.reverseBytes(v));
+    } else {
+      unsafe.putInt(null, addrPos, v);
+    }
+    return addrPos + 4;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java
new file mode 100644
index 0000000..d589cde
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/DirectBufferAllocator.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
+
+import org.apache.geode.internal.shared.BufferAllocator;
+
+/**
+ * Generic implementation of {@link BufferAllocator} for direct ByteBuffers
+ * using Java NIO API.
+ */
+public class DirectBufferAllocator extends BufferAllocator {
+
+  /**
+   * Overhead of allocation on off-heap memory is kept fixed at 8 even though
+   * actual overhead will be dependent on the malloc implementation.
+   */
+  public static final int DIRECT_OBJECT_OVERHEAD = 8;
+
+  /**
+   * The owner of direct buffers that are stored in Regions and tracked in UMM.
+   */
+  public static final String DIRECT_STORE_OBJECT_OWNER =
+      "GEODE_DIRECT_STORE_OBJECTS";
+
+  public static final String DIRECT_STORE_DATA_FRAME_OUTPUT =
+      "DIRECT_" + STORE_DATA_FRAME_OUTPUT;
+
+  private static final DirectBufferAllocator globalInstance =
+      new DirectBufferAllocator();
+
+  private static volatile DirectBufferAllocator instance = globalInstance;
+
+  public static DirectBufferAllocator instance() {
+    return instance;
+  }
+
+  public DirectBufferAllocator initialize() {
+    DirectBufferAllocator.setInstance(this);
+    return this;
+  }
+
+  public static synchronized void setInstance(DirectBufferAllocator allocator) {
+    instance = allocator;
+  }
+
+  public static synchronized void resetInstance() {
+    instance = globalInstance;
+  }
+
+  protected DirectBufferAllocator() {}
+
+  public RuntimeException lowMemoryException(String op, int required) {
+    return new RuntimeException();
+  }
+
+  public void changeOwnerToStorage(ByteBuffer buffer, int capacity,
+      BiConsumer<String, Object> changeOwner) {}
+
+  @Override
+  public ByteBuffer allocate(int size, String owner) {
+    return allocateForStorage(size);
+  }
+
+  @Override
+  public ByteBuffer allocateWithFallback(int size, String owner) {
+    try {
+      return allocateForStorage(size);
+    } catch (RuntimeException re) {
+      if (instance() != globalInstance) {
+        return globalInstance.allocateForStorage(size);
+      } else {
+        throw re;
+      }
+    }
+  }
+
+  @Override
+  public ByteBuffer allocateForStorage(int size) {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+    return buffer;
+  }
+
+  @Override
+  public void clearPostAllocate(ByteBuffer buffer) {
+    // clear till the capacity and not limit since former will be a factor
+    // of 8 and hence more efficient in Unsafe.setMemory
+    fill(buffer, (byte) 0, 0, buffer.capacity());
+  }
+
+  @Override
+  public Object baseObject(ByteBuffer buffer) {
+    return null;
+  }
+
+  @Override
+  public long baseOffset(ByteBuffer buffer) {
+    return UnsafeHolder.getDirectBufferAddress(buffer);
+  }
+
+  @Override
+  public ByteBuffer expand(ByteBuffer buffer, int required, String owner) {
+    assert required > 0 : "expand: unexpected required = " + required;
+
+    final int currentUsed = buffer.limit();
+    if (currentUsed + required > buffer.capacity()) {
+      final int newLength = BufferAllocator.expandedSize(currentUsed, required);
+      final ByteBuffer newBuffer = ByteBuffer.allocateDirect(newLength)
+          .order(buffer.order());
+      buffer.rewind();
+      newBuffer.put(buffer);
+      UnsafeHolder.releaseDirectBuffer(buffer);
+      newBuffer.rewind(); // position at start as per the contract of expand
+      return newBuffer;
+    } else {
+      buffer.limit(currentUsed + required);
+      return buffer;
+    }
+  }
+
+  @Override
+  public ByteBuffer fromBytesToStorage(byte[] bytes, int offset, int length) {
+    final ByteBuffer buffer = allocateForStorage(length);
+    buffer.put(bytes, offset, length);
+    // move to the start
+    buffer.rewind();
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer transfer(ByteBuffer buffer, String owner) {
+    if (buffer.isDirect()) {
+      return buffer;
+    } else {
+      return super.transfer(buffer, owner);
+    }
+  }
+
+  @Override
+  public boolean isDirect() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+    UnsafeHolder.releasePendingReferences();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java
new file mode 100644
index 0000000..686af0f
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/FreeMemory.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.shared.unsafe;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.geode.pdx.internal.unsafe.UnsafeWrapper;
+
+@SuppressWarnings("serial")
+public abstract class FreeMemory extends AtomicLong implements Runnable {
+
+  protected FreeMemory(long address) {
+    super(address);
+  }
+
+  protected final long tryFree() {
+    // try hard to ensure freeMemory call happens only once
+    final long address = get();
+    return (address != 0 && compareAndSet(address, 0L)) ? address : 0L;
+  }
+
+  protected abstract String objectName();
+
+  @Override
+  public void run() {
+    final long address = tryFree();
+    if (address != 0) {
+      new UnsafeWrapper().freeMemory(address);
+    }
+  }
+
+  public interface Factory {
+    FreeMemory newFreeMemory(long address, int size);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java
new file mode 100644
index 0000000..dfa5a81
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/shared/unsafe/UnsafeHolder.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+/*
+ * Changes for SnappyData distributed computational and data platform.
+ *
+ * Portions Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.shared.unsafe;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.geode.internal.shared.ChannelBufferInputStream;
+import org.apache.geode.internal.shared.ChannelBufferOutputStream;
+import org.apache.geode.internal.shared.InputStreamChannel;
+import org.apache.geode.internal.shared.OutputStreamChannel;
+
+/**
+ * Holder for static sun.misc.Unsafe instance and some convenience methods. Use
+ * other methods only if {@link UnsafeHolder#hasUnsafe()} returns true;
+ *
+ * @author swale
+ * @since gfxd 1.1
+ */
+public abstract class UnsafeHolder {
+
+  private static final class Wrapper {
+
+    static final sun.misc.Unsafe unsafe;
+    static final boolean unaligned;
+    static final Constructor<?> directBufferConstructor;
+    static final Field cleanerField;
+    static final Field cleanerRunnableField;
+    static final Object javaLangRefAccess;
+    static final Method handlePendingRefs;
+
+    static {
+      sun.misc.Unsafe v;
+      Constructor<?> dbConstructor;
+      Field cleaner;
+      Field runnableField = null;
+      try {
+        final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
+        // try using "theUnsafe" field
+        Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+        field.setAccessible(true);
+        v = (sun.misc.Unsafe) field.get(null);
+
+        // get the constructor of DirectByteBuffer that accepts a Runnable
+        Class<?> cls = Class.forName("java.nio.DirectByteBuffer",
+            false, systemLoader);
+        dbConstructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+        dbConstructor.setAccessible(true);
+
+        cleaner = cls.getDeclaredField("cleaner");
+        cleaner.setAccessible(true);
+
+        // search for the Runnable field in Cleaner
+        Class<?> runnableClass = Runnable.class;
+        Field[] fields = sun.misc.Cleaner.class.getDeclaredFields();
+        for (Field f : fields) {
+          if (runnableClass.isAssignableFrom(f.getType())) {
+            if (runnableField == null || f.getName().contains("thunk")) {
+              f.setAccessible(true);
+              runnableField = f;
+            }
+          }
+        }
+
+        Class<?> bitsClass = Class.forName("java.nio.Bits",
+            false, systemLoader);
+        Method m = bitsClass.getDeclaredMethod("unaligned");
+        m.setAccessible(true);
+        unaligned = Boolean.TRUE.equals(m.invoke(null));
+
+      } catch (LinkageError le) {
+        throw le;
+      } catch (Throwable t) {
+        throw new ExceptionInInitializerError(t);
+      }
+      if (v == null) {
+        throw new ExceptionInInitializerError("theUnsafe not found");
+      }
+      if (runnableField == null) {
+        throw new ExceptionInInitializerError(
+            "DirectByteBuffer cleaner thunk runnable field not found");
+      }
+      unsafe = v;
+      directBufferConstructor = dbConstructor;
+      cleanerField = cleaner;
+      cleanerRunnableField = runnableField;
+
+      Method m;
+      Object langRefAccess;
+      try {
+        m = sun.misc.SharedSecrets.class.getMethod("getJavaLangRefAccess");
+        m.setAccessible(true);
+        langRefAccess = m.invoke(null);
+        m = langRefAccess.getClass().getMethod("tryHandlePendingReference");
+        m.setAccessible(true);
+        m.invoke(langRefAccess);
+      } catch (Throwable ignored) {
+        langRefAccess = null;
+        m = null;
+      }
+      javaLangRefAccess = langRefAccess;
+      handlePendingRefs = m;
+    }
+
+    static void init() {}
+  }
+
+  private static final boolean hasUnsafe;
+  // Limit to the chunk copied per Unsafe.copyMemory call to allow for
+  // safepoint polling by JVM.
+  public static final boolean littleEndian =
+      ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+  static {
+    boolean v;
+    try {
+      Wrapper.init();
+      v = true;
+    } catch (LinkageError le) {
+      le.printStackTrace();
+      v = false;
+    }
+    hasUnsafe = v;
+  }
+
+  private UnsafeHolder() {
+    // no instance
+  }
+
+  public static boolean hasUnsafe() {
+    return hasUnsafe;
+  }
+
+  public static int getAllocationSize(int size) {
+    // round to word size
+    size = ((size + 7) >>> 3) << 3;
+    if (size > 0)
+      return size;
+    else
+      throw new BufferOverflowException();
+  }
+
+  public static ByteBuffer allocateDirectBuffer(int size,
+      FreeMemory.Factory factory) {
+    final int allocSize = getAllocationSize(size);
+    final ByteBuffer buffer = allocateDirectBuffer(
+        getUnsafe().allocateMemory(allocSize), allocSize, factory);
+    buffer.limit(size);
+    return buffer;
+  }
+
+  public static ByteBuffer allocateDirectBuffer(long address, int size,
+      FreeMemory.Factory factory) {
+    try {
+      ByteBuffer buffer = (ByteBuffer) Wrapper.directBufferConstructor
+          .newInstance(address, size);
+      if (factory != null) {
+        sun.misc.Cleaner cleaner = sun.misc.Cleaner.create(buffer,
+            factory.newFreeMemory(address, size));
+        Wrapper.cleanerField.set(buffer, cleaner);
+      }
+      return buffer;
+    } catch (Exception e) {
+      getUnsafe().throwException(e);
+      throw new IllegalStateException("unreachable");
+    }
+  }
+
+  public static long getDirectBufferAddress(ByteBuffer buffer) {
+    return ((sun.nio.ch.DirectBuffer) buffer).address();
+  }
+
+  // public static ByteBuffer reallocateDirectBuffer(ByteBuffer buffer,
+  // int newSize, Class<?> expectedClass, FreeMemory.Factory factory) {
+  // sun.nio.ch.DirectBuffer directBuffer = (sun.nio.ch.DirectBuffer)buffer;
+  // final long address = directBuffer.address();
+  // long newAddress = 0L;
+  //
+  // newSize = getAllocationSize(newSize);
+  // final sun.misc.Cleaner cleaner = directBuffer.cleaner();
+  // if (cleaner != null) {
+  // // reset the runnable to not free the memory and clean it up
+  // try {
+  // Object freeMemory = Wrapper.cleanerRunnableField.get(cleaner);
+  // if (expectedClass != null && (freeMemory == null ||
+  // !expectedClass.isInstance(freeMemory))) {
+  // throw new IllegalStateException("Expected class to be " +
+  // expectedClass.getName() + " in reallocate but was " +
+  // (freeMemory != null ? freeMemory.getClass().getName() : "null"));
+  // }
+  // // use the efficient realloc call if possible
+  // if ((freeMemory instanceof FreeMemory) &&
+  // ((FreeMemory)freeMemory).tryFree() != 0L) {
+  // newAddress = Wrapper.unsafe.reallocateMemory(address, newSize);
+  // }
+  // } catch (IllegalAccessException e) {
+  // // fallback to full copy
+  // }
+  // }
+  // if (newAddress == 0L) {
+  // if (expectedClass != null) {
+  // throw new IllegalStateException("Expected class to be " +
+  // expectedClass.getName() + " in reallocate but was non-runnable");
+  // }
+  // newAddress = getUnsafe().allocateMemory(newSize);
+  // Platform.copyMemory(null, address, null, newAddress,
+  // Math.min(newSize, buffer.limit()));
+  // }
+  // // clean only after copying is done
+  // if (cleaner != null) {
+  // cleaner.clean();
+  // cleaner.clear();
+  // }
+  // return allocateDirectBuffer(newAddress, newSize, factory)
+  // .order(buffer.order());
+  // }
+
+  // /**
+  // * Change the runnable field of Cleaner using given factory. The "to"
+  // * argument specifies that target Runnable type that factory will produce.
+  // * If the existing Runnable already matches "to" then its a no-op.
+  // * <p>
+  // * The provided {@link BiConsumer} is used to apply any action before actually
+  // * changing the runnable field with the boolean argument indicating whether
+  // * the current field matches "from" or if it is something else.
+  // */
+  // public static void changeDirectBufferCleaner(
+  // ByteBuffer buffer, int size, Class<? extends FreeMemory> from,
+  // Class<? extends FreeMemory> to, FreeMemory.Factory factory,
+  // final BiConsumer<String, Object> changeOwner) throws IllegalAccessException {
+  // sun.nio.ch.DirectBuffer directBuffer = (sun.nio.ch.DirectBuffer)buffer;
+  // final sun.misc.Cleaner cleaner = directBuffer.cleaner();
+  // if (cleaner != null) {
+  // // change the runnable
+  // final Field runnableField = Wrapper.cleanerRunnableField;
+  // Object runnable = runnableField.get(cleaner);
+  // // skip if it already matches the target Runnable type
+  // if (!to.isInstance(runnable)) {
+  // if (changeOwner != null) {
+  // if (from.isInstance(runnable)) {
+  // changeOwner.accept(((FreeMemory)runnable).objectName(), runnable);
+  // } else {
+  // changeOwner.accept(null, runnable);
+  // }
+  // }
+  // Runnable newFree = factory.newFreeMemory(directBuffer.address(), size);
+  // runnableField.set(cleaner, newFree);
+  // }
+  // } else {
+  // throw new IllegalAccessException(
+  // "ByteBuffer without a Cleaner cannot be marked for storage");
+  // }
+  // }
+
+  /**
+   * Release explicitly assuming passed ByteBuffer is a direct one. Avoid using
+   * this directly rather use BufferAllocator.allocate/release where possible.
+   */
+  public static void releaseDirectBuffer(ByteBuffer buffer) {
+    sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
+    if (cleaner != null) {
+      cleaner.clean();
+      cleaner.clear();
+    }
+    buffer.rewind().limit(0);
+  }
+
+  public static void releasePendingReferences() {
+    // commented code intended to be invoked by reflection for platforms
+    // that may not have the requisite classes (e.g. Mac default JDK)
+    /*
+     * final sun.misc.JavaLangRefAccess refAccess =
+     * sun.misc.SharedSecrets.getJavaLangRefAccess();
+     * while (refAccess.tryHandlePendingReference()) ;
+     */
+    final Method handlePendingRefs = Wrapper.handlePendingRefs;
+    if (handlePendingRefs != null) {
+      try {
+        // retry while helping enqueue pending Cleaner Reference objects
+        // noinspection StatementWithEmptyBody
+        while ((Boolean) handlePendingRefs.invoke(Wrapper.javaLangRefAccess));
+      } catch (Exception ignored) {
+        // ignore any exceptions in releasing pending references
+      }
+    }
+  }
+
+  public static sun.misc.Unsafe getUnsafe() {
+    return Wrapper.unsafe;
+  }
+
+  public static boolean tryMonitorEnter(Object obj, boolean checkSelf) {
+    if (checkSelf && Thread.holdsLock(obj)) {
+      return false;
+    } else if (!getUnsafe().tryMonitorEnter(obj)) {
+      // try once more after a small wait
+      LockSupport.parkNanos(100L);
+      if (!getUnsafe().tryMonitorEnter(obj)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static void monitorEnter(Object obj) {
+    getUnsafe().monitorEnter(obj);
+  }
+
+  public static void monitorExit(Object obj) {
+    getUnsafe().monitorExit(obj);
+  }
+
+  @SuppressWarnings("resource")
+  public static InputStreamChannel newChannelBufferInputStream(
+      ReadableByteChannel channel, int bufferSize) throws IOException {
+    return (hasUnsafe
+        ? new ChannelBufferUnsafeInputStream(channel, bufferSize)
+        : new ChannelBufferInputStream(channel, bufferSize));
+  }
+
+  @SuppressWarnings("resource")
+  public static OutputStreamChannel newChannelBufferOutputStream(
+      WritableByteChannel channel, int bufferSize) throws IOException {
+    return (hasUnsafe
+        ? new ChannelBufferUnsafeOutputStream(channel, bufferSize)
+        : new ChannelBufferOutputStream(channel, bufferSize));
+  }
+
+  // @SuppressWarnings("resource")
+  // public static InputStreamChannel newChannelBufferFramedInputStream(
+  // ReadableByteChannel channel, int bufferSize) throws IOException {
+  // return (hasUnsafe
+  // ? new ChannelBufferUnsafeFramedInputStream(channel, bufferSize)
+  // : new ChannelBufferFramedInputStream(channel, bufferSize));
+  // }
+  //
+  // @SuppressWarnings("resource")
+  // public static OutputStreamChannel newChannelBufferFramedOutputStream(
+  // WritableByteChannel channel, int bufferSize) throws IOException {
+  // return (hasUnsafe
+  // ? new ChannelBufferUnsafeFramedOutputStream(channel, bufferSize)
+  // : new ChannelBufferFramedOutputStream(channel, bufferSize));
+  // }
+
+  /**
+   * Checks that the range described by {@code offset} and {@code size}
+   * doesn't exceed {@code arrayLength}.
+   */
+  public static void checkBounds(int arrayLength, int offset, int len) {
+    if ((offset | len) < 0 || offset > arrayLength ||
+        arrayLength - offset < len) {
+      throw new ArrayIndexOutOfBoundsException("Array index out of range: " +
+          "length=" + arrayLength + " offset=" + offset + " length=" + len);
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
index 550def8..24e426a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/BaseMsgStreamer.java
@@ -53,4 +53,10 @@ public interface BaseMsgStreamer {
    * @throws IOException on exception
    */
   void close() throws IOException;
+
+
+  /**
+   * Called to free up resources used by this streamer after the streamer has produced its message.
+   */
+  void release();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
index abb7fdb..5c4024c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Buffers.java
@@ -27,7 +27,8 @@ public class Buffers {
   /**
    * A list of soft references to byte buffers.
    */
-  private static final ConcurrentLinkedQueue bufferQueue = new ConcurrentLinkedQueue();
+  private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+      new ConcurrentLinkedQueue<>();
 
   /**
    * Should only be called by threads that have currently acquired send permission.
@@ -47,14 +48,14 @@ public class Buffers {
     if (TCPConduit.useDirectBuffers) {
       IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
                                                                             // set
-      BBSoftReference ref = (BBSoftReference) bufferQueue.poll();
+      BBSoftReference ref = bufferQueue.poll();
       while (ref != null) {
         ByteBuffer bb = ref.getBB();
         if (bb == null) {
           // it was garbage collected
           int refSize = ref.consumeSize();
           if (refSize > 0) {
-            if (ref.getSend()) { // fix bug 46773
+            if (ref.getSend()) {
               stats.incSenderBufferSize(-refSize, true);
             } else {
               stats.incReceiverBufferSize(-refSize, true);
@@ -68,7 +69,7 @@ public class Buffers {
           // wasn't big enough so put it back in the queue
           Assert.assertTrue(bufferQueue.offer(ref));
           if (alreadySeen == null) {
-            alreadySeen = new IdentityHashMap<BBSoftReference, BBSoftReference>();
+            alreadySeen = new IdentityHashMap<>();
           }
           if (alreadySeen.put(ref, ref) != null) {
             // if it returns non-null then we have already seen this item
@@ -77,7 +78,7 @@ public class Buffers {
             break;
           }
         }
-        ref = (BBSoftReference) bufferQueue.poll();
+        ref = bufferQueue.poll();
       }
       result = ByteBuffer.allocateDirect(size);
     } else {
@@ -116,14 +117,14 @@ public class Buffers {
     }
   }
 
-  public static void initBufferStats(DMStats stats) { // fixes 46773
+  public static void initBufferStats(DMStats stats) {
     if (TCPConduit.useDirectBuffers) {
       @SuppressWarnings("unchecked")
-      Iterator<BBSoftReference> it = (Iterator<BBSoftReference>) bufferQueue.iterator();
+      Iterator<BBSoftReference> it = bufferQueue.iterator();
       while (it.hasNext()) {
         BBSoftReference ref = it.next();
         if (ref.getBB() != null) {
-          if (ref.getSend()) { // fix bug 46773
+          if (ref.getSend()) {
             stats.incSenderBufferSize(ref.getSize(), true);
           } else {
             stats.incReceiverBufferSize(ref.getSize(), true);
@@ -163,7 +164,7 @@ public class Buffers {
     }
 
     public ByteBuffer getBB() {
-      return (ByteBuffer) super.get();
+      return super.get();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectExceptions.java
old mode 100755
new mode 100644
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f234ee7..7c2c7a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -20,6 +20,7 @@ import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -31,6 +32,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ClosedSelectorException;
@@ -44,10 +46,13 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
+import org.apache.geode.SerializationException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.distributed.DistributedMember;
@@ -81,6 +86,7 @@ import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.tcp.MsgReader.Header;
 import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
 
 /**
  * Connection is a socket holder that sends and receives serialized message objects. A Connection
@@ -107,6 +113,12 @@ public class Connection implements Runnable {
   public static final int MSG_HEADER_BYTES = 7;
 
   /**
+   * The default wait to use when waiting to read/write a channel
+   * (when there is no selector to signal)
+   */
+  public static final long PARK_NANOS_FOR_READ_WRITE = 100L;
+
+  /**
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
@@ -162,6 +174,10 @@ public class Connection implements Runnable {
     }
   }
 
+  public boolean isPooled() {
+    return this.pooled;
+  }
+
   private int getP2PConnectTimeout() {
     if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
       return P2P_CONNECT_TIMEOUT;
@@ -211,8 +227,23 @@ public class Connection implements Runnable {
   /** the non-NIO output stream */
   OutputStream output;
 
+  /**
+   * The NIO stream message stream reader for readAck. The "processNIOStream"
+   * thread uses its own local inputStream for the case of readAcks which is
+   * spawned just for doing the handshake and then the thread terminates
+   * while readAck will wait for that initialization to happen (so there
+   * is no race in reading from channel between the two threads).
+   */
+  private MsgChannelDestreamer ackInputStream;
+
+  /** Set to true when in message dispatch processing. */
+  private boolean inDispatch;
+
+  /** Set to true when this connection is a pooled one. */
+  private final boolean pooled;
+
   /** output stream/channel lock */
-  private final Object outLock = new Object();
+  private final StoppableReentrantLock outLock;
 
   /** the ID string of the conduit (for logging) */
   String conduitIdStr;
@@ -395,10 +426,10 @@ public class Connection implements Runnable {
   boolean preserveOrder = false;
 
   /** number of messages sent on this connection */
-  private long messagesSent;
+  private AtomicLong messagesSent = new AtomicLong();
 
   /** number of messages received on this connection */
-  private long messagesReceived;
+  private AtomicLong messagesReceived = new AtomicLong();
 
   /** unique ID of this connection (remote if isReceiver==true) */
   private volatile long uniqueId;
@@ -536,7 +567,9 @@ public class Connection implements Runnable {
     }
     this.conduit = t.getConduit();
     this.isReceiver = true;
+    this.pooled = false;
     this.owner = t;
+    this.outLock = new StoppableReentrantLock(t.getConduit().getCancelCriterion());
     this.socket = socket;
     this.conduitIdStr = owner.getConduit().getSocketId().toString();
     this.handshakeRead = false;
@@ -665,7 +698,7 @@ public class Connection implements Runnable {
       throw new IOException(
           String.format(
               "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
-              new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)}));
+              HANDSHAKE_VERSION, ver));
     }
     return ver;
   }
@@ -886,6 +919,12 @@ public class Connection implements Runnable {
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
     final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+
+    // NIO stream expects network big-endian byte order
+    if (useNIOStream()) {
+      connectHandshake.buffer.order(ByteOrder.BIG_ENDIAN);
+    }
+
     /*
      * Note a byte of zero is always written because old products serialized a member id with always
      * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
@@ -917,7 +956,7 @@ public class Connection implements Runnable {
     nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
-  private void handshakeStream() throws IOException {
+  private void handshakeOldIO() throws IOException {
     waitForAddressCompletion();
 
     this.output = getSocket().getOutputStream();
@@ -962,7 +1001,7 @@ public class Connection implements Runnable {
     if (useNIO()) {
       handshakeNio();
     } else {
-      handshakeStream();
+      handshakeOldIO();
     }
 
     startReader(connTable); // this reader only reads the handshake and then exits
@@ -979,7 +1018,7 @@ public class Connection implements Runnable {
    */
   protected static Connection createSender(final MembershipManager mgr, final ConnectionTable t,
       final boolean preserveOrder, final DistributedMember remoteAddr, final boolean sharedResource,
-      final long startTime, final long ackTimeout, final long ackSATimeout)
+      final boolean pooled, final long startTime, final long ackTimeout, final long ackSATimeout)
       throws IOException, DistributedSystemDisconnectedException {
     boolean warningPrinted = false;
     boolean success = false;
@@ -1063,7 +1102,7 @@ public class Connection implements Runnable {
         // create connection
         try {
           conn = null;
-          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
+          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource, pooled);
         } catch (javax.net.ssl.SSLHandshakeException se) {
           // no need to retry if certificates were rejected
           throw se;
@@ -1183,7 +1222,8 @@ public class Connection implements Runnable {
    * must accept us We will almost always send messages; small acks are received.
    */
   private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
-      boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
+      boolean sharedResource, boolean pooled)
+      throws IOException, DistributedSystemDisconnectedException {
 
     // initialize a socket upfront. So that the
     InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
@@ -1194,7 +1234,9 @@ public class Connection implements Runnable {
     this.conduit = t.getConduit();
     this.isReceiver = false;
     this.owner = t;
+    this.outLock = new StoppableReentrantLock(t.getConduit().getCancelCriterion());
     this.sharedResource = sharedResource;
+    this.pooled = pooled;
     this.preserveOrder = preserveOrder;
     setRemoteAddr(remoteAddr);
     this.conduitIdStr = this.owner.getConduit().getSocketId().toString();
@@ -1208,7 +1250,60 @@ public class Connection implements Runnable {
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
-    if (useNIO()) {
+    if (useNIOStream()) {
+      SocketChannel channel = SocketChannel.open();
+      final Socket socket = channel.socket();
+      this.owner.addConnectingSocket(socket, addr.getAddress());
+      final int connectTime = getP2PConnectTimeout();
+      try {
+        setSendBufferSize(socket);
+        // disable Nagle since we are already buffering and a flush
+        // at this layer will always be followed by a read (either on this
+        // channel or some other with ReplyProcessor21)
+        socket.setTcpNoDelay(true);
+        // Non-blocking mode to write only as much as possible in one round.
+        // MsgChannelStreamer will move to writing to other servers,
+        // if remaining, for best performance.
+        if (pooled) {
+          channel.configureBlocking(false);
+          channel.connect(addr);
+          final long connectTimeNanos = connectTime * 1000000L;
+          long start = 0L;
+          while (!channel.finishConnect()) {
+            if (start == 0L && connectTimeNanos > 0) {
+              start = System.nanoTime();
+            }
+            LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+            if (connectTimeNanos > 0 &&
+                (System.nanoTime() - start) > connectTimeNanos) {
+              throw new ConnectException(
+                  "Attempt to connect timed out after " + connectTime + "ms");
+            }
+          }
+        } else {
+          // configure as blocking channel for non-pooled connections
+          channel.configureBlocking(true);
+          socket.connect(addr, connectTime);
+        }
+      } catch (NullPointerException | IllegalStateException e) {
+        // bug #44469: for some reason NIO throws runtime exceptions
+        // instead of an IOException on timeouts
+        ConnectException c =
+            new ConnectException("Attempt to connect timed out after " + connectTime + "ms");
+        c.initCause(e);
+        // prevent a hot loop by sleeping a little bit
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+        throw c;
+      } finally {
+        this.owner.removeConnectingSocket(socket);
+      }
+      this.socket = socket;
+
+    } else if (useNIO()) {
       SocketChannel channel = SocketChannel.open();
       this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
       try {
@@ -1672,7 +1767,9 @@ public class Connection implements Runnable {
     ConnectionTable.threadWantsSharedResources();
     makeReaderThread(this.isReceiver);
     try {
-      if (useNIO()) {
+      if (useNIOStream()) {
+        processNIOStream();
+      } else if (useNIO()) {
         runNioReader();
       } else {
         runOioReader();
@@ -1932,6 +2029,10 @@ public class Connection implements Runnable {
 
   private void closeAllMsgDestreamers() {
     synchronized (this.destreamerLock) {
+      final MsgChannelDestreamer inputStream = this.ackInputStream;
+      if (inputStream != null) {
+        inputStream.close();
+      }
       if (this.idleMsgDestreamer != null) {
         this.idleMsgDestreamer.close();
         this.idleMsgDestreamer = null;
@@ -2315,7 +2416,7 @@ public class Connection implements Runnable {
                   }
                   // } else {
                   // ConnectionTable.threadWantsSharedResources();
-                  // logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
+                  // logger.debug("thread-owned receiver with domino count of " + dominoNumber + "
                   // will prefer shared sockets");
                 }
                 this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
@@ -2528,7 +2629,7 @@ public class Connection implements Runnable {
         }
       }
       if (cacheContentChanges) {
-        messagesSent++;
+        messagesSent.incrementAndGet();
       }
     } finally {
       accessed();
@@ -3281,6 +3382,10 @@ public class Connection implements Runnable {
           } finally {
             stats.endSocketWrite(true, start, amtWritten, 0);
             // this.writerThread = null;
+            if (amtWritten == 0) {
+              // wait for a bit before retrying
+              LockSupport.parkNanos(PARK_NANOS_FOR_READ_WRITE);
+            }
           }
         } while (buffer.remaining() > 0);
       } // synchronized
@@ -3305,10 +3410,10 @@ public class Connection implements Runnable {
   /**
    * stateLock is used to synchronize state changes.
    */
-  private final Object stateLock = new Object();
+  final Object stateLock = new Object();
 
   /** for timeout processing, this is the current state of the connection */
-  private byte connectionState = STATE_IDLE;
+  byte connectionState = STATE_IDLE;
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
@@ -3349,8 +3454,22 @@ public class Connection implements Runnable {
     MsgReader msgReader = null;
     DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
+    CancelCriterion cancelCriterion = owner.getConduit().getCancelCriterion();
     try {
-      if (useNIO()) {
+      if (useNIOStream()) {
+        // use the normal message reader with fast dispatch route
+        synchronized (this.destreamerLock) {
+          // create a shared MsgChannelDestreamer on first use
+          MsgChannelDestreamer inputStream = this.ackInputStream;
+          if (inputStream == null) {
+            inputStream = new MsgChannelDestreamer(this,
+                getSocket().getChannel(), owner.getConduit().tcpBufferSize);
+            this.ackInputStream = inputStream;
+          }
+          readAndDispatchMessage(inputStream, processor, stats, cancelCriterion);
+          return;
+        }
+      } else if (useNIO()) {
         msgReader = new NIOMsgReader(this, version);
       } else {
         msgReader = new OioMsgReader(this, version);
@@ -3374,20 +3493,7 @@ public class Connection implements Runnable {
         releaseMsgDestreamer(header.getNioMessageId(), destreamer);
         len = destreamer.size();
       }
-      // I'd really just like to call dispatchMessage here. However,
-      // that call goes through a bunch of checks that knock about
-      // 10% of the performance. Since this direct-ack stuff is all
-      // about performance, we'll skip those checks. Skipping them
-      // should be legit, because we just sent a message so we know
-      // the member is already in our view, etc.
-      ClusterDistributionManager dm = (ClusterDistributionManager) owner.getDM();
-      msg.setBytesRead(len);
-      msg.setSender(remoteAddr);
-      stats.incReceivedMessages(1L);
-      stats.incReceivedBytes(msg.getBytesRead());
-      stats.incMessageChannelTime(msg.resetTimestamp());
-      msg.process(dm, processor);
-      // dispatchMessage(msg, len, false);
+      fastDispatchMessage(msg, len, processor, stats);
     } catch (MemberShunnedException e) {
       // do nothing
     } catch (SocketTimeoutException timeout) {
@@ -3407,10 +3513,10 @@ public class Connection implements Runnable {
       throw new ConnectionException(
           String.format("Unable to read direct ack because: %s", e));
     } catch (ConnectionException e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      cancelCriterion.checkCancelInProgress(e);
       throw e;
     } catch (Exception e) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+      cancelCriterion.checkCancelInProgress(e);
       if (!isSocketClosed()) {
         logger.fatal("ack read exception", e);
       }
@@ -3438,6 +3544,19 @@ public class Connection implements Runnable {
     }
   }
 
+  private void fastDispatchMessage(final ReplyMessage msg,
+      final int bytesRead, final DirectReplyProcessor processor,
+      final DMStats stats) {
+    DistributionManager dm = owner.getDM();
+    msg.setBytesRead(bytesRead);
+    msg.setSender(remoteAddr);
+    stats.incReceivedMessages(1L);
+    stats.incReceivedBytes(bytesRead);
+    stats.incMessageChannelTime(msg.resetTimestamp());
+    msg.process(dm, processor);
+  }
+
+
   /**
    * processes the current NIO buffer. If there are complete messages in the buffer, they are
    * deserialized and passed to TCPConduit for further processing
@@ -3517,29 +3636,15 @@ public class Connection implements Runnable {
                   throw td;
                 } catch (VirtualMachineError err) {
                   SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
                   throw err;
                 } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
                   SystemFailure.checkFailure();
                   logger.fatal("Throwable dispatching message", t);
                 }
               } catch (VirtualMachineError err) {
                 SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
                 throw err;
               } catch (Throwable t) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
                 SystemFailure.checkFailure();
                 sendFailureReply(ReplyProcessor21.getMessageRPId(),
                     "Error deserializing message", t,
@@ -3602,15 +3707,8 @@ public class Connection implements Runnable {
                 this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
               } catch (VirtualMachineError err) {
                 SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
                 throw err;
               } catch (Throwable ex) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
                 SystemFailure.checkFailure();
                 this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
                 this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
@@ -3641,15 +3739,8 @@ public class Connection implements Runnable {
                   throw td;
                 } catch (VirtualMachineError err) {
                   SystemFailure.initiateFailure(err);
-                  // If this ever returns, rethrow the error. We're poisoned
-                  // now, so don't let this thread continue.
                   throw err;
                 } catch (Throwable t) {
-                  // Whenever you catch Error or Throwable, you must also
-                  // catch VirtualMachineError (see above). However, there is
-                  // _still_ a possibility that you are dealing with a cascading
-                  // error condition, so you also need to check to see if the JVM
-                  // is still usable:
                   SystemFailure.checkFailure();
                   logger.fatal("Throwable dispatching message", t);
                 }
@@ -3690,15 +3781,8 @@ public class Connection implements Runnable {
                 throw td;
               } catch (VirtualMachineError err) {
                 SystemFailure.initiateFailure(err);
-                // If this ever returns, rethrow the error. We're poisoned
-                // now, so don't let this thread continue.
                 throw err;
               } catch (Throwable t) {
-                // Whenever you catch Error or Throwable, you must also
-                // catch VirtualMachineError (see above). However, there is
-                // _still_ a possibility that you are dealing with a cascading
-                // error condition, so you also need to check to see if the JVM
-                // is still usable:
                 SystemFailure.checkFailure();
                 logger.fatal("Throwable deserializing P2P handshake reply",
                     t);
@@ -3780,10 +3864,6 @@ public class Connection implements Runnable {
                   // buffer.
                   setSendBufferSize(this.socket);
                 }
-                // String name = owner.getDM().getConfig().getName();
-                // if (name == null) {
-                // name = "pid="+OSProcess.getId();
-                // }
                 setThreadName(dominoNumber);
               } catch (Exception e) {
                 this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
@@ -3853,6 +3933,514 @@ public class Connection implements Runnable {
     }
   }
 
+  private void processNIOStream() {
+    // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
+    SocketChannel channel;
+    try {
+      Socket socket = getSocket();
+      channel = socket.getChannel();
+      socket.setTcpNoDelay(true);
+      channel.configureBlocking(true);
+    } catch (ClosedChannelException e) {
+      // bug 37693: the channel was asynchronously closed. Our work
+      // is done.
+      try {
+        requestClose("channel closing: " + e);
+      } catch (Exception ignore) {
+      }
+      return; // exit loop and thread
+    } catch (IOException ex) {
+      if (stopped || owner.getConduit().getCancelCriterion()
+          .cancelInProgress() != null) {
+        try {
+          requestClose("shutting down");
+        } catch (Exception ignore) {
+        }
+        return; // bug37520: exit loop (and thread)
+      }
+      logger.warn("failed to set channel to blocking mode: " + ex);
+      this.readerShuttingDown = true;
+      try {
+        requestClose("failed to set channel to blocking mode: " + ex);
+      } catch (Exception ignore) {
+      }
+      return;
+    }
+
+    if (!stopped) {
+      logger.debug("Starting " + p2pReaderName());
+    }
+    // we should not change the state of the connection if we are a handshake
+    // reader thread as there is a race between this thread and the application
+    // thread doing direct ack
+    // fix for #40869
+    boolean isHandShakeReader = false;
+    MsgChannelDestreamer inputStream = null;
+    try {
+      inputStream = new MsgChannelDestreamer(
+          this, channel, this.owner.getConduit().tcpBufferSize);
+      for (;;) {
+        if (stopped) {
+          break;
+        }
+        if (SystemFailure.getFailure() != null) {
+          // Allocate no objects here!
+          Socket s = this.socket;
+          if (s != null) {
+            try {
+              s.close();
+            } catch (IOException e) {
+              // don't care
+            }
+          }
+          SystemFailure.checkFailure(); // throws
+        }
+        if (this.owner.getConduit().getCancelCriterion()
+            .cancelInProgress() != null) {
+          break;
+        }
+
+        synchronized (stateLock) {
+          connectionState = Connection.STATE_IDLE;
+        }
+        try {
+          // no locking required here since only one thread will ever
+          // do processing (for readAck case this thread will terminate
+          // after handshake since "isReceiver" will be false)
+          processMessage(inputStream);
+          if (!this.isReceiver
+              && (this.handshakeRead || this.handshakeCancelled)) {
+            if (logger.isDebugEnabled()) {
+              if (this.handshakeRead) {
+                logger.debug(p2pReaderName() +
+                    " handshake has been read " + this);
+              } else {
+                logger.debug(p2pReaderName() +
+                    " handshake has been cancelled " + this);
+              }
+            }
+            isHandShakeReader = true;
+            // Once we have read the handshake the reader can go away
+            break;
+          }
+        } catch (CancelException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug(p2pReaderName() + " Terminated <" + this
+                + "> due to cancellation:", e);
+          }
+          this.readerShuttingDown = true;
+          try {
+            requestClose("cache closed in channel read: " + e);
+          } catch (Exception ignored) {
+          }
+          return;
+        } catch (ClosedChannelException | EOFException e) {
+          this.readerShuttingDown = true;
+          try {
+            requestClose("channel closing: " + e);
+          } catch (Exception ignored) {
+          }
+          return;
+        } catch (IOException e) {
+          if (!isSocketClosed()
+              // needed for Solaris jdk 1.4.2_08
+              && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
+            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+              logger.debug(p2pReaderName() + " io exception for " + this, e);
+            }
+            if (e.getMessage().contains(
+                "interrupted by a call to WSACancelBlockingCall")) {
+              logger.warn(p2pReaderName() +
+                  " received unexpected WSACancelBlockingCall exception, " +
+                  "which may result in a hang - bug 45675");
+            }
+          }
+          this.readerShuttingDown = true;
+          try {
+            requestClose("I/O exception in channel read: " + e);
+          } catch (Exception ignored) {
+          }
+          return;
+
+        } catch (Throwable t) {
+          Error err;
+          if (t instanceof Error && SystemFailure.isJVMFailureError(
+              err = (Error) t)) {
+            SystemFailure.initiateFailure(err);
+            throw err;
+          }
+          SystemFailure.checkFailure();
+
+          // bug 37101
+          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+          if (!stopped && !isSocketClosed()) {
+            logger.warn("Exception in channel read", t);
+          }
+          this.readerShuttingDown = true;
+          try {
+            requestClose("exception in channel read: " + t);
+          } catch (Exception ignored) {
+          }
+          return;
+        }
+      } // for
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+
+      if (!isHandShakeReader) {
+        synchronized (stateLock) {
+          connectionState = STATE_IDLE;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug(p2pReaderName() + " runNioReader terminated "
+            + " id=" + conduitIdStr + " from " + remoteAddr);
+      }
+    }
+  }
+
+  private boolean validateMessageType(byte messageType) {
+    if (validMsgType(messageType)) {
+      return true;
+    } else {
+      logger.fatal("Unknown P2P message type: " + messageType, new Exception("stack trace"));
+      this.readerShuttingDown = true;
+      requestClose("Unknown P2P message type: " + messageType);
+      return false;
+    }
+  }
+
+  private boolean processHandshake(MsgChannelDestreamer input)
+      throws IOException {
+    // read the header in handshake
+    int messageLength = input.readInt();
+    calcHdrVersion(messageLength);
+    byte messageType = input.readByte();
+    // skip message ID
+    input.skipBytes(2);
+    directAck = (messageType & DIRECT_ACK_BIT) != 0;
+    if (directAck) {
+      // logger.info("DEBUG: msg from " + getRemoteAddress() + " is direct ack" );
+      messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+    }
+    // Following validation fixes bug 31145
+    if (!validateMessageType(messageType)) {
+      return false;
+    }
+    if (!this.isReceiver) {
+      try {
+        this.replyCode = input.readUnsignedByte();
+        // logger.info("DEBUG: replyCode=" + this.replyCode);
+        if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
+          this.asyncDistributionTimeout = input.readInt();
+          this.asyncQueueTimeout = input.readInt();
+          this.asyncMaxQueueSize = (long) input.readInt() * (1024 * 1024);
+          if (this.asyncDistributionTimeout != 0 && logger.isInfoEnabled()) {
+            logger.info("{} async configuration received {}.",
+                p2pReaderName(),
+                " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+                    + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                    + " asyncMaxQueueSize="
+                    + (this.asyncMaxQueueSize / (1024 * 1024)));
+          }
+          // read the product version ordinal for on-the-fly serialization
+          // transformations (for rolling upgrades)
+          this.remoteVersion = Version.readVersion(input, true);
+        }
+      } catch (Exception e) {
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+        logger.fatal("Error deserializing handshake reply", e);
+        this.readerShuttingDown = true;
+        requestClose("Errpr deserializing handshake reply");
+        return false;
+      } catch (ThreadDeath td) {
+        throw td;
+      } catch (Error err) {
+        if (SystemFailure.isJVMFailureError(err)) {
+          SystemFailure.initiateFailure(err);
+          throw err;
+        }
+        SystemFailure.checkFailure();
+        logger.fatal("Error deserializing handshake reply", err);
+        this.readerShuttingDown = true;
+        requestClose("Error deserializing handshake reply");
+        return false;
+      }
+      if (this.replyCode != REPLY_CODE_OK &&
+          this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
+        String err =
+            "Unknown handshake reply code: %s nioMessageLength: %s";
+        Object[] errArgs = new Object[] {Integer.valueOf(this.replyCode),
+            Integer.valueOf(nioMessageLength)};
+        if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+          logger.debug(
+              String.format(err, errArgs) + " (peer probably departed ungracefully)");
+        } else {
+          logger.fatal(err, errArgs);
+        }
+        this.readerShuttingDown = true;
+        requestClose(String.format(err, errArgs));
+        return false;
+      }
+      notifyHandshakeWaiter(true);
+    } else {
+      try {
+        byte b = input.readByte();
+        if (b != 0) {
+          throw new IllegalStateException(
+              String.format(
+                  "Detected old version (pre 5.0.1) of GemFire or non-GemFire during handshake due to initial byte being %s",
+                  new Byte(b)));
+        }
+        byte handshakeByte = input.readByte();
+        if (handshakeByte != HANDSHAKE_VERSION) {
+          throw new IllegalStateException(
+              String.format(
+                  "Detected wrong version of GemFire product during handshake. Expected %s but found %s",
+
+                  new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handshakeByte)}));
+        }
+        InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(input);
+        setRemoteAddr(remote);
+        this.sharedResource = input.readBoolean();
+        this.preserveOrder = input.readBoolean();
+        this.uniqueId = input.readLong();
+        // read the product version ordinal for on-the-fly serialization
+        // transformations (for rolling upgrades)
+        this.remoteVersion = Version.readVersion(input, true);
+        int dominoNumber = 0;
+        dominoNumber = input.readInt();
+        if (this.sharedResource) {
+          dominoNumber = 0;
+        }
+        dominoCount.set(dominoNumber);
+        if (!this.sharedResource) {
+          if (tipDomino()) {
+            logger.info(
+                "thread owned receiver forcing itself to send on thread owned sockets");
+          } else if (dominoNumber < 2) {
+            ConnectionTable.threadWantsOwnResources();
+            logger.debug("thread-owned receiver with domino count of " +
+                dominoNumber + " will prefer sending on thread-owned sockets");
+          } else {
+            ConnectionTable.threadWantsSharedResources();
+            logger.debug("thread-owned receiver with domino count of " +
+                dominoNumber + " will prefer shared sockets");
+          }
+        }
+        Thread.currentThread().setName("P2P message reader for " +
+            this.remoteAddr + " " + (this.sharedResource ? "" : "un") +
+            "shared " + (this.preserveOrder ? "" : "un") + "ordered uid=" +
+            uniqueId + (dominoNumber > 0 ? (" dom #" + dominoNumber) : ""));
+      } catch (Exception e) {
+        // bug 37101
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+        logger.fatal("Error deserializing p2p handshake message", e);
+        this.readerShuttingDown = true;
+        requestClose("Error deserializing p2p handshake message");
+        return false;
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("P2P handshake remote address is " + this.remoteAddr +
+            (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+      }
+      try {
+        String authInit = System.getProperty(
+            DistributionConfigImpl.SECURITY_SYSTEM_PREFIX +
+                DistributionConfig.SECURITY_PEER_AUTH_INIT_NAME);
+        boolean isSecure = authInit != null && authInit.length() != 0;
+
+        if (isSecure) {
+          if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
+            sendOKHandshakeReply(); // fix for bug 33224
+            notifyHandshakeWaiter(true);
+          } else {
+            // ARB: check if we need notifyHandshakeWaiter() call.
+            notifyHandshakeWaiter(false);
+            logger.warn("Timeout during membership check in " + p2pReaderName());
+            return false;
+          }
+        } else {
+          sendOKHandshakeReply(); // fix for bug 33224
+          try {
+            notifyHandshakeWaiter(true);
+          } catch (Exception e) {
+            logger.warn("Uncaught exception from listener", e);
+          }
+        }
+      } catch (IOException ex) {
+        final String err = "Failed sending p2p handshake reply";
+        if (logger.isDebugEnabled()) {
+          logger.debug(err, ex);
+        }
+        this.readerShuttingDown = true;
+        requestClose(err + ": " + ex);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reads the next message from the channel stream and passes it
+   * to TCPConduit for further processing.
+   * <p>
+   * The "inLock" must be held when invoking this method.
+   */
+  private void processMessage(final MsgChannelDestreamer input)
+      throws ConnectionException, IOException {
+    final TCPConduit conduit = this.owner.getConduit();
+    final CancelCriterion cancelCriterion = conduit.getCancelCriterion();
+    final DMStats stats = conduit.getStats();
+
+    if (!connected) {
+      return;
+    }
+    cancelCriterion.checkCancelInProgress(null);
+
+    if (!handshakeRead) {
+      // read HANDSHAKE with old header
+      processHandshake(input);
+      // continue with next message processing
+      return;
+    }
+
+    try {
+      readAndDispatchMessage(input, null, stats, cancelCriterion);
+    } catch (IOException | ConnectionException e) {
+      // throw back connection exceptions to be handled by caller
+      throw e;
+    } catch (Throwable t) {
+      Error err;
+      if (t instanceof Error && SystemFailure.isJVMFailureError(
+          err = (Error) t)) {
+        SystemFailure.initiateFailure(err);
+        throw err;
+      }
+      SystemFailure.checkFailure();
+
+      // #49309
+      if (!(t instanceof CancelException)) {
+        final String reason = cancelCriterion.cancelInProgress();
+        Throwable cancelledEx = null;
+        if (reason != null) {
+          cancelledEx = cancelCriterion.generateCancelledException(t);
+          if (cancelledEx != null) {
+            t = cancelledEx;
+          }
+        }
+        if (cancelledEx != null) {
+          t = cancelledEx;
+        } else {
+          // check if CancelException is wrapped inside
+          Throwable cause = t;
+          while ((cause = cause.getCause()) != null) {
+            if (cause instanceof CancelException) {
+              t = cause;
+              break;
+            }
+          }
+        }
+      }
+
+      final String error = inDispatch ? "Uncaught exception while dispatching message"
+          : "Error deserializing message";
+      sendFailureReply(ReplyProcessor21.getMessageRPId(),
+          error, t, directAck);
+      if (t instanceof ThreadDeath) {
+        throw (ThreadDeath) t;
+      }
+      if (t instanceof CancelException) {
+        if (!inDispatch || !(t instanceof CacheClosedException)) {
+          // Just log a message if we had trouble processing
+          // due to CacheClosedException; see bug 43543
+          throw (CancelException) t;
+        }
+      } else if (!inDispatch) {
+        // connection must be closed if there was trouble during deserialization
+        // else there can be dangling data on the connection causing subsequent
+        // reads to fail and processing to hang on the sender side (SNAP-1488)
+        logger.fatal(error, t);
+        if (t instanceof RuntimeException) {
+          throw (RuntimeException) t;
+        } else if (t instanceof Error) {
+          throw (Error) t;
+        } else {
+          throw new SerializationException(t.getMessage(), t);
+        }
+      }
+      logger.fatal(error, t);
+    } finally {
+      ReplyProcessor21.clearMessageRPId();
+    }
+  }
+
+  private void readAndDispatchMessage(final MsgChannelDestreamer input,
+      final DirectReplyProcessor directProcessor, final DMStats stats,
+      final CancelCriterion cancelCriterion) throws ConnectionException,
+      IOException, ClassNotFoundException {
+
+    byte messageType = input.readByte();
+    directAck = (messageType & DIRECT_ACK_BIT) != 0;
+    if (directAck) {
+      messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+    }
+    // Following validation fixes bug 31145
+    if (directProcessor == null && !validateMessageType(messageType)) {
+      return;
+    }
+
+    // mark that a new message is started primarily for getting
+    // total message length and writing that to stats etc
+    input.startNewMessage();
+
+    input.setRemoteVersion(remoteVersion);
+    inDispatch = false;
+    DistributionMessage msg;
+
+    ReplyProcessor21.initMessageRPId();
+    // add serialization stats
+    long startSer = stats.startMsgDeserialization();
+    msg = (DistributionMessage) InternalDataSerializer.readDSFID(input);
+    stats.endMsgDeserialization(startSer);
+    /*
+     * (streaming model can read next message on channel immediately)
+     * final int remaining = input.available();
+     * if (remaining != 0) {
+     * logger.warning(LocalizedStrings
+     * .Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
+     * new Object[]{msg, remaining});
+     * // move buffer point to the end in any case else will be cascading errors
+     * input.skipBytes(remaining);
+     * }
+     */
+    final int messageLength = input.getLastMessageLength();
+    inDispatch = true;
+    if (directProcessor == null) {
+      try {
+        if (!dispatchMessage(msg, messageLength, directAck)) {
+          directAck = false;
+        }
+        accessed();
+      } catch (MemberShunnedException e) {
+        directAck = false; // don't respond (bug39117)
+      } catch (Exception de) {
+        cancelCriterion.checkCancelInProgress(de);
+        logger.fatal("Error dispatching message", de);
+      }
+    } else {
+      // pass through exceptions to caller
+      fastDispatchMessage((ReplyMessage) msg, messageLength,
+          directProcessor, stats);
+    }
+  }
+
+  public final void incMessagesReceived() {
+    this.messagesReceived.incrementAndGet();
+  }
+
   private void setThreadName(int dominoNumber) {
     Thread.currentThread().setName("P2P message reader for " + this.remoteAddr + " "
         + (this.sharedResource ? "" : "un") + "shared" + " " + (this.preserveOrder ? "" : "un")
@@ -3899,7 +4487,7 @@ public class Connection implements Runnable {
       return true;
     } finally {
       if (msg.containsRegionContentChange()) {
-        messagesReceived++;
+        messagesReceived.incrementAndGet();
       }
     }
   }
@@ -3908,6 +4496,10 @@ public class Connection implements Runnable {
     return this.conduit;
   }
 
+  protected StoppableReentrantLock getOutLock() {
+    return outLock;
+  }
+
   protected Socket getSocket() throws SocketException {
     // fix for bug 37286
     Socket result = this.socket;
@@ -3988,14 +4580,14 @@ public class Connection implements Runnable {
    * answers the number of messages received by this connection
    */
   protected long getMessagesReceived() {
-    return messagesReceived;
+    return messagesReceived.get();
   }
 
   /**
    * answers the number of messages sent on this connection
    */
   protected long getMessagesSent() {
-    return messagesSent;
+    return messagesSent.get();
   }
 
   public void acquireSendPermission() throws ConnectionException {
@@ -4008,23 +4600,28 @@ public class Connection implements Runnable {
       return;
     }
     boolean interrupted = false;
-    try {
-      for (;;) {
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-        try {
-          this.senderSem.acquire();
-          break;
-        } catch (InterruptedException ex) {
-          interrupted = true;
+    final boolean useNIOStream = useNIOStream();
+    if (!useNIOStream) {
+      try {
+        for (;;) {
+          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+          try {
+            this.senderSem.acquire();
+            break;
+          } catch (InterruptedException ex) {
+            interrupted = true;
+          }
+        } // for
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
         }
-      } // for
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
       }
     }
     if (!this.connected) {
-      this.senderSem.release();
+      if (!useNIOStream) {
+        this.senderSem.release();
+      }
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
       throw new ConnectionException(
           "connection is closed");
@@ -4032,7 +4629,7 @@ public class Connection implements Runnable {
   }
 
   public void releaseSendPermission() {
-    if (isReaderThread()) {
+    if (isReaderThread() || getConduit().useNIOStream()) {
       return;
     }
     this.senderSem.release();
@@ -4073,4 +4670,12 @@ public class Connection implements Runnable {
     }
     return this.useNIO;
   }
+
+  boolean useNIOStream() {
+    return this.owner.getConduit().useNIOStream();
+  }
+
+  void incMessagesSent() {
+    messagesSent.incrementAndGet();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 8bf3b3a..2638b39 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -32,11 +32,15 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -58,7 +62,7 @@ import org.apache.geode.internal.net.SocketCloser;
  * @since GemFire 2.1
  */
 public class ConnectionTable {
-  private static final Logger logger = LogService.getLogger();
+  static final Logger logger = LogService.getLogger();
 
   /** warning when descriptor limit reached */
   private static boolean ulimitWarningIssued;
@@ -72,7 +76,8 @@ public class ConnectionTable {
    * Used for messages whose order must be preserved Only connections used for sending messages, and
    * receiving acks, will be put in this map.
    */
-  protected final Map orderedConnectionMap = new ConcurrentHashMap();
+  protected final ConcurrentMap<InternalDistributedMember, Object> orderedConnectionMap =
+      new ConcurrentHashMap<>();
 
   /**
    * ordered connections local to this thread. Note that accesses to the resulting map must be
@@ -100,13 +105,14 @@ public class ConnectionTable {
    * threadOrderedConnMap. The value is an ArrayList since we can have any number of connections
    * with the same key.
    */
-  private ConcurrentMap threadConnectionMap;
+  private ConcurrentMap<InternalDistributedMember, List> threadConnectionMap;
 
   /**
    * Used for all non-ordered messages. Only connections used for sending messages, and receiving
    * acks, will be put in this map.
    */
-  protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+  protected final ConcurrentMap<InternalDistributedMember, Object> unorderedConnectionMap =
+      new ConcurrentHashMap();
 
   /**
    * Used for all accepted connections. These connections are read only; we never send messages,
@@ -144,13 +150,51 @@ public class ConnectionTable {
    *
    * TODO this assumes no more than one instance is created at a time?
    */
-  private static final AtomicReference lastInstance = new AtomicReference();
+  private static final AtomicReference<ConnectionTable> lastInstance = new AtomicReference<>();
 
   /**
    * A set of sockets that are in the process of being connected
    */
   private Map connectingSockets = new HashMap();
 
+  private static final class ConnKey {
+    final DistributedMember member;
+    final long startTime;
+    final long ackTimeout;
+    final long ackSATimeout;
+
+    ConnKey(DistributedMember member) {
+      this(member, 0, 0, 0);
+    }
+
+    ConnKey(DistributedMember member, long startTime, long ackTimeout,
+        long ackSATimeout) {
+      this.member = member;
+      this.startTime = startTime;
+      this.ackTimeout = ackTimeout;
+      this.ackSATimeout = ackSATimeout;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof ConnKey && this.member.equals(((ConnKey) obj).member);
+    }
+
+    @Override
+    public int hashCode() {
+      return this.member.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return this.member.toString();
+    }
+  }
+
+  /** The connection pool used for sending messages keyed by members. */
+  private final QueryKeyedObjectPool<ConnKey, Connection> connectionPool;
+
+
   /**
    * Cause calling thread to share communication resources with other threads.
    */
@@ -192,11 +236,69 @@ public class ConnectionTable {
         ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
-    this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
+
+    if (!conduit.useNIOStream()) {
+      this.threadConnectionMap = new ConcurrentHashMap<>();
+      this.connectionPool = null;
+      return;
+    }
+    this.threadConnectionMap = null; // no thread-local maps with NIOStream
+    // The factory for creating new Connections for connectionPool.
+    KeyedPooledObjectFactory<ConnKey, Connection> connectionFactory;
+    connectionFactory = new KeyedPooledObjectFactory<ConnKey, Connection>() {
+      @Override
+      public PooledObject<Connection> makeObject(ConnKey key) throws Exception {
+        final TCPConduit owner = ConnectionTable.this.owner;
+        Connection newConn = Connection.createSender(owner.getMembershipManager(),
+            ConnectionTable.this, true /* preserveOrder */, key.member,
+            false /* shared */,
+            true /* pooled */, key.startTime, key.ackTimeout, key.ackSATimeout);
+        if (logger.isDebugEnabled()) {
+          logger.debug("ConnectionTable: created a pooled ordered connection: " +
+              newConn);
+        }
+        owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);
+        return new DefaultPooledObject<>(newConn);
+      }
+
+      @Override
+      public void destroyObject(ConnKey key,
+          PooledObject<Connection> p) throws Exception {
+        closeCon("Communications are shutting down", p.getObject());
+      }
+
+      @Override
+      public boolean validateObject(ConnKey key, PooledObject<Connection> p) {
+        return !p.getObject().isClosing();
+      }
+
+      @Override
+      public void activateObject(ConnKey key,
+          PooledObject<Connection> p) throws Exception {}
+
+      @Override
+      public void passivateObject(ConnKey key,
+          PooledObject<Connection> p) throws Exception {}
+    };
+    this.connectionPool = new QueryKeyedObjectPool<>(connectionFactory,
+        conduit.getCancelCriterion());
+    final int numConnections = Math.max(ClusterDistributionManager.MAX_PR_THREADS_SET,
+        // SNAP-1682
+        Math.max(32, Math.min(64, ClusterDistributionManager.MAX_PR_THREADS)));
+    this.connectionPool.setMaxTotalPerKey(numConnections);
+    this.connectionPool.setMaxIdlePerKey(numConnections);
+    this.connectionPool.setTestOnBorrow(true);
+    this.connectionPool.setTestOnReturn(true);
+    final int connectionTimeout = owner.idleConnectionTimeout;
+    this.connectionPool.setTimeBetweenEvictionRunsMillis(connectionTimeout);
+    // default idle-timeout for a connection is 5 minutes
+    this.connectionPool.setMinEvictableIdleTimeMillis(connectionTimeout * 5);
   }
 
+
+
   private Executor createThreadPoolForIO(boolean conserveSockets) {
     if (conserveSockets) {
       return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
@@ -287,7 +389,7 @@ public class ConnectionTable {
     Connection con = null;
     try {
       con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id,
-          sharedResource, startTime, ackThreshold, ackSAThreshold);
+          sharedResource, (connectionPool != null), startTime, ackThreshold, ackSAThreshold);
       this.owner.getStats().incSenders(sharedResource, preserveOrder);
     } finally {
       // our connection failed to notify anyone waiting for our pending con
@@ -373,7 +475,8 @@ public class ConnectionTable {
       throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
-    final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
+    final ConcurrentMap<InternalDistributedMember, Object> m =
+        preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
 
     PendingConnection pc = null; // new connection, if needed
     Object mEntry = null; // existing connection (if we don't create a new one)
@@ -389,7 +492,7 @@ public class ConnectionTable {
       }
       if (mEntry == null) {
         pc = new PendingConnection(preserveOrder, id);
-        m.put(id, pc);
+        m.put((InternalDistributedMember) id, pc);
       }
     } // synchronized
 
@@ -425,10 +528,17 @@ public class ConnectionTable {
     return result;
   }
 
+  private void checkClosing() {
+    owner.getCancelCriterion().checkCancelInProgress(null);
+    if (this.closed) {
+      throw new DistributedSystemDisconnectedException("Connection table is closed");
+    }
+  }
+
   /**
    * Must be looking for an ordered connection that this thread owns
    *
-   * @param id stub on which to create the connection
+   * @param id member on which to create the connection
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
    * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
@@ -437,6 +547,19 @@ public class ConnectionTable {
    */
   Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+    if (this.connectionPool != null) {
+      try {
+        return this.connectionPool.borrowObject(
+            new ConnKey(id, startTime, ackTimeout, ackSATimeout));
+      } catch (IOException | RuntimeException e) {
+        checkClosing();
+        throw e;
+      } catch (Exception e) {
+        checkClosing();
+        throw new IllegalStateException(e);
+      }
+    }
+
     Connection result = null;
 
     // Look for result in the thread local
@@ -474,7 +597,7 @@ public class ConnectionTable {
 
     // OK, we have to create a new connection.
     result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */,
-        id, false /* shared */, startTime, ackTimeout, ackSATimeout);
+        id, false, false, startTime, ackTimeout, ackSATimeout);
     if (logger.isDebugEnabled()) {
       logger.debug("ConnectionTable: created an ordered connection: {}", result);
     }
@@ -498,7 +621,7 @@ public class ConnectionTable {
 
       // Since it's a concurrent map, we just try to put it and then
       // return whichever we got.
-      Object o = this.threadConnectionMap.putIfAbsent(id, al);
+      Object o = this.threadConnectionMap.putIfAbsent((InternalDistributedMember) id, al);
       if (o != null) {
         al = (ArrayList) o;
       }
@@ -518,6 +641,18 @@ public class ConnectionTable {
     return result;
   }
 
+  public final void releasePooledConnection(Connection conn) {
+    if (conn.isPooled() && this.connectionPool != null) {
+      try {
+        this.connectionPool.returnObject(new ConnKey(conn.getRemoteAddress()), conn);
+      } catch (RuntimeException re) {
+        // log any exception in returning to pool and move on
+        logger.warn("Unexpected exception in returning connection to pool " + re);
+      }
+    }
+  }
+
+
   /** schedule an idle-connection timeout task */
   private void scheduleIdleTimeout(Connection conn) {
     if (conn == null) {
@@ -701,6 +836,9 @@ public class ConnectionTable {
         m.clear();
       }
     }
+    if (connectionPool != null) {
+      connectionPool.close();
+    }
     this.socketCloser.close();
   }
 
@@ -784,12 +922,17 @@ public class ConnectionTable {
       }
     }
     if (!needsRemoval) {
-      ConcurrentMap cm = this.threadConnectionMap;
+      ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
       if (cm != null) {
-        ArrayList al = (ArrayList) cm.get(memberID);
+        List al = cm.get(memberID);
         needsRemoval = al != null && al.size() > 0;
       }
     }
+    ConnKey connKey = null;
+    if (this.connectionPool != null) {
+      connKey = new ConnKey(memberID);
+      needsRemoval = this.connectionPool.getNumTotal(connKey) > 0;
+    }
 
     if (needsRemoval) {
       InternalDistributedMember remoteAddress = null;
@@ -809,9 +952,9 @@ public class ConnectionTable {
       }
 
       {
-        ConcurrentMap cm = this.threadConnectionMap;
+        ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
         if (cm != null) {
-          ArrayList al = (ArrayList) cm.remove(memberID);
+          List al = cm.remove(memberID);
           if (al != null) {
             synchronized (al) {
               for (Iterator it = al.iterator(); it.hasNext();) {
@@ -827,6 +970,15 @@ public class ConnectionTable {
         }
       }
 
+      // close all connections for the given address
+      if (this.connectionPool != null) {
+        this.connectionPool.clear(connKey);
+        this.connectionPool.foreachObject(connKey, c -> {
+          closeCon(reason, c);
+          return true;
+        });
+      }
+
       // close any sockets that are in the process of being connected
       Set toRemove = new HashSet();
       synchronized (connectingSockets) {
@@ -889,7 +1041,7 @@ public class ConnectionTable {
   }
 
   /** check to see if there are still any receiver threads for the given end-point */
-  protected boolean hasReceiversFor(DistributedMember endPoint) {
+  boolean hasReceiversFor(DistributedMember endPoint) {
     synchronized (this.receivers) {
       for (Iterator it = receivers.iterator(); it.hasNext();) {
         Connection con = (Connection) it.next();
@@ -901,10 +1053,11 @@ public class ConnectionTable {
     return false;
   }
 
-  private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
+  private static void removeFromThreadConMap(ConcurrentMap<InternalDistributedMember, List> cm,
+      DistributedMember memberId,
       Connection c) {
     if (cm != null) {
-      ArrayList al = (ArrayList) cm.get(stub);
+      List al = cm.get(memberId);
       if (al != null) {
         synchronized (al) {
           al.remove(c);
@@ -914,9 +1067,10 @@ public class ConnectionTable {
   }
 
   protected void removeThreadConnection(DistributedMember stub, Connection c) {
-    /*
-     * if (this.closed) { return; }
-     */
+    // no thread-locals when using NIOStream connection pooling
+    if (connectionPool != null) {
+      return;
+    }
     removeFromThreadConMap(this.threadConnectionMap, stub, c);
     Map m = (Map) this.threadOrderedConnMap.get();
     if (m != null) {
@@ -954,9 +1108,7 @@ public class ConnectionTable {
    *
    * @see SystemFailure#loadEmergencyClasses()
    */
-  public static void loadEmergencyClasses() {
-    // don't go any further, Frodo!
-  }
+  public static void loadEmergencyClasses() {}
 
   /**
    * Clears lastInstance. Does not yet close underlying sockets, but probably not strictly
@@ -965,7 +1117,7 @@ public class ConnectionTable {
    * @see SystemFailure#emergencyClose()
    */
   public static void emergencyClose() {
-    ConnectionTable ct = (ConnectionTable) lastInstance.get();
+    ConnectionTable ct = lastInstance.get();
     if (ct == null) {
       return;
     }
@@ -973,6 +1125,9 @@ public class ConnectionTable {
   }
 
   public void removeAndCloseThreadOwnedSockets() {
+    if (connectionPool != null) {
+      return;
+    }
     Map m = (Map) this.threadOrderedConnMap.get();
     if (m != null) {
       // Static cleanup may intervene; we MUST synchronize.
@@ -991,7 +1146,7 @@ public class ConnectionTable {
   }
 
   public static void releaseThreadsSockets() {
-    ConnectionTable ct = (ConnectionTable) lastInstance.get();
+    ConnectionTable ct = lastInstance.get();
     if (ct == null) {
       return;
     }
@@ -1007,9 +1162,20 @@ public class ConnectionTable {
    */
   protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
 
-    ConcurrentMap cm = this.threadConnectionMap;
+    if (this.connectionPool != null) {
+      this.connectionPool.foreachObject(new ConnKey(member), conn -> {
+        if (!conn.isSharedResource() && conn.getOriginatedHere()
+            && conn.getPreserveOrder()) {
+          result.put(conn.getUniqueId(), conn.getMessagesSent());
+        }
+        return true;
+      });
+      return;
+    }
+
+    ConcurrentMap<InternalDistributedMember, List> cm = this.threadConnectionMap;
     if (cm != null) {
-      ArrayList al = (ArrayList) cm.get(member);
+      List al = cm.get(member);
       if (al != null) {
         synchronized (al) {
           al = new ArrayList(al);
@@ -1075,6 +1241,7 @@ public class ConnectionTable {
     return this.owner.getDM();
   }
 
+
   // public boolean isShuttingDown() {
   // return this.owner.isShuttingDown();
   // }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 06b60a9..15f4900 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.io.NotSerializableException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -60,10 +61,15 @@ class DirectReplySender implements ReplySender {
       logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}", msg,
           conn.getRemoteAddress());
     }
-    ArrayList<Connection> conns = new ArrayList<Connection>(1);
+    List<Connection> conns = new ArrayList<>(1);
     conns.add(conn);
-    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+    BaseMsgStreamer ms = null;
     try {
+      if (conn.useNIOStream()) {
+        ms = MsgChannelStreamer.create(conns, msg, false, DUMMY_STATS);
+      } else {
+        ms = MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+      }
       ms.writeMessage();
       ConnectExceptions ce = ms.getConnectExceptions();
       if (ce != null && !ce.getMembers().isEmpty()) {
@@ -81,7 +87,9 @@ class DirectReplySender implements ReplySender {
           "Unknown error serializing message", ex);
     } finally {
       try {
-        ms.close();
+        if (ms != null) {
+          ms.close();
+        }
       } catch (IOException e) {
         throw new InternalGemFireException("Unknown error serializing message", e);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java
new file mode 100644
index 0000000..ba57e42
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelDestreamer.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.shared.unsafe.ChannelBufferUnsafeDataInputStream;
+
+/**
+ * Direct streaming buffered reads from TCP channels.
+ */
+public final class MsgChannelDestreamer
+    extends ChannelBufferUnsafeDataInputStream
+    implements VersionedDataStream {
+
+  private final Connection conn;
+  private Version remoteVersion;
+  private long newMessageStart;
+  private boolean newMessage;
+
+  MsgChannelDestreamer(Connection conn, SocketChannel channel, int bufferSize) {
+    super(channel, bufferSize);
+    this.conn = conn;
+  }
+
+  void setRemoteVersion(Version version) {
+    this.remoteVersion = version;
+  }
+
+  void startNewMessage() {
+    this.newMessageStart = this.bytesRead;
+    this.newMessage = true;
+  }
+
+  int getLastMessageLength() {
+    return (int) (this.bytesRead - this.newMessageStart);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Version getVersion() {
+    return this.remoteVersion;
+  }
+
+  @Override
+  protected int refillBuffer(final ByteBuffer channelBuffer,
+      final int tryReadBytes, final String eofMessage) throws IOException {
+    final byte originalState;
+    synchronized (conn.stateLock) {
+      originalState = conn.connectionState;
+      if (originalState == Connection.STATE_IDLE) {
+        conn.connectionState = Connection.STATE_READING;
+      }
+    }
+    final int numBytes = super.refillBuffer(channelBuffer,
+        tryReadBytes, eofMessage);
+    if (originalState == Connection.STATE_IDLE) {
+      synchronized (conn.stateLock) {
+        conn.connectionState = Connection.STATE_IDLE;
+      }
+    }
+    return numBytes;
+  }
+
+  @Override
+  protected int readIntoBuffer(ByteBuffer buffer) throws IOException {
+    if (!conn.connected) {
+      throw new ClosedChannelException();
+    }
+    int numBytes = super.readIntoBuffer(buffer);
+    if (numBytes > 0) {
+      conn.getConduit().getStats().incMessagesBeingReceived(
+          this.newMessage, numBytes);
+      this.newMessage = false;
+    }
+    return numBytes;
+  }
+
+  @Override
+  public long getParkNanosMax() {
+    // increased timeout to enable detection of failed sockets
+    return 90000000000L;
+  }
+
+  @Override
+  protected int readIntoBufferNoWait(ByteBuffer buffer)
+      throws IOException {
+    return readIntoBuffer(buffer);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java
new file mode 100644
index 0000000..ca91bb3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgChannelStreamer.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.shared.unsafe.ChannelBufferUnsafeDataOutputStream;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
+
+/**
+ * A {@link BaseMsgStreamer} implementation that directly writes to the channels
+ * in a buffered manner actually streaming data without any chunking etc.
+ */
+public final class MsgChannelStreamer extends ChannelBufferUnsafeDataOutputStream
+    implements BaseMsgStreamer, VersionedDataStream {
+
+  private final DistributionMessage msg;
+  private final boolean directReply;
+  private final List<Connection> connections;
+  private final StoppableReentrantLock[] locks;
+  private final DMStats stats;
+  private final Version remoteVersion;
+
+  /**
+   * Any exceptions that happen during sends.
+   */
+  private ConnectExceptions ce;
+
+  private static final Comparator<Connection> idCompare = (c1, c2) -> {
+    final long id1 = c1.getUniqueId();
+    final long id2 = c2.getUniqueId();
+    return (id1 < id2) ? -1 : ((id1 == id2) ? 0 : 1);
+  };
+
+  private MsgChannelStreamer(List<Connection> connections,
+      Connection firstConn, DistributionMessage msg, boolean directReply,
+      DMStats stats, Version remoteVersion) throws SocketException {
+    // apply the VM.maxDirectMemory() limit for these on-the-fly streamers
+    super(firstConn.getSocket().getChannel(),
+        firstConn.getConduit().tcpBufferSize);
+    this.msg = msg;
+    this.directReply = directReply;
+    final int numConnections = connections.size();
+    // keep the connections in order sorted by localPort for consistent locking
+    // of shared connections
+    if (numConnections > 1) {
+      connections.sort(idCompare);
+    }
+    this.connections = connections;
+    this.locks = new StoppableReentrantLock[numConnections];
+    this.stats = stats;
+    this.remoteVersion = remoteVersion;
+  }
+
+  private static ArrayList<Connection> singleConnection(Connection conn) {
+    ArrayList<Connection> list = new ArrayList<>(1);
+    list.add(conn);
+    return list;
+  }
+
+  public static BaseMsgStreamer create(List<Connection> connections,
+      final DistributionMessage msg, final boolean directReply,
+      DMStats stats) throws SocketException {
+    // if all connections have the same version then use one MsgChannelStreamer
+    Connection firstConn = connections.get(0);
+    Version remoteVersion = firstConn.remoteVersion;
+    boolean hasSameVersions = true;
+    final int numConnections = connections.size();
+    // choose firstConn for locking using some criteria (e.g. min localPort)
+    for (int i = 1; i < numConnections; i++) {
+      Connection conn = connections.get(i);
+      if (conn.getSocket().getLocalPort() < firstConn.getSocket()
+          .getLocalPort()) {
+        firstConn = conn;
+      }
+      Version connVersion = conn.remoteVersion;
+      if (remoteVersion == null) {
+        if (connVersion == null || connVersion.equals(Version.CURRENT)) {
+          continue;
+        }
+      } else if (remoteVersion.equals(connVersion)) {
+        continue;
+      }
+      hasSameVersions = false;
+      break;
+    }
+    if (hasSameVersions) {
+      return new MsgChannelStreamer(connections, firstConn, msg,
+          directReply, stats, remoteVersion);
+    } else {
+      // create list of streamers (separate even if there are common versions
+      // to keep things simple as this case itself is a rare one)
+      ArrayList<BaseMsgStreamer> streamers = new ArrayList<>(numConnections);
+      for (int i = 0; i < numConnections; i++) {
+        Connection conn = connections.get(i);
+        streamers.add(new MsgChannelStreamer(singleConnection(conn), conn,
+            msg, directReply, stats, conn.remoteVersion));
+      }
+      return new MsgStreamerList(streamers);
+    }
+  }
+
+  @Override
+  public void reserveConnections(long startTime, long ackTimeout,
+      long ackSDTimeout) {
+    final List<Connection> connections = this.connections;
+    final int numConnections = connections.size();
+    for (int i = 0; i < numConnections; i++) {
+      Connection conn = connections.get(i);
+      conn.setInUse(true, startTime, ackTimeout, ackSDTimeout, connections);
+      if (ackTimeout > 0) {
+        conn.scheduleAckTimeouts();
+      }
+    }
+  }
+
+  @Override
+  public List<?> getSentConnections() {
+    return this.connections;
+  }
+
+  @Override
+  public ConnectExceptions getConnectExceptions() {
+    return this.ce;
+  }
+
+  private void acquireLocks() {
+    final List<Connection> connections = this.connections;
+    final int numConnections = connections.size();
+    for (int i = 0; i < numConnections; i++) {
+      final StoppableReentrantLock lock = connections.get(i).getOutLock();
+      lock.lock();
+      // assign only after successful acquisition of the lock
+      this.locks[i] = lock;
+    }
+  }
+
+  private void releaseLocks() {
+    final int numLocks = this.locks.length;
+    for (int i = 0; i < numLocks; i++) {
+      this.locks[i].unlock();
+      this.locks[i] = null;
+    }
+  }
+
+  @Override
+  public int writeMessage() throws IOException {
+    final long startNumBytes = this.bytesWritten;
+    final DMStats stats = this.stats;
+    acquireLocks();
+    try {
+      long start = stats.startMsgSerialization();
+      byte msgType = Connection.NORMAL_MSG_TYPE;
+      if (directReply) {
+        msgType |= Connection.DIRECT_ACK_BIT;
+      }
+      putByte(msgType);
+      InternalDataSerializer.writeDSFID(msg, this);
+      stats.endMsgSerialization(start);
+      flush();
+    } finally {
+      releaseLocks();
+    }
+    if (msg.containsRegionContentChange()) {
+      final List<Connection> connections = this.connections;
+      final int numConnections = connections.size();
+      for (int i = 0; i < numConnections; i++) {
+        connections.get(i).incMessagesSent();
+      }
+    }
+    return (int) (this.bytesWritten - startNumBytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // flush should already done in normal course but for exception
+    // cases discard whatever is left in the buffer
+    this.addrPosition = this.addrLimit = 0;
+    releaseBuffer();
+  }
+
+  @Override
+  public void release() {
+    // just clear the buffer
+    this.buffer.clear();
+    resetBufferPositions();
+  }
+
+  @Override
+  public Version getVersion() {
+    return this.remoteVersion;
+  }
+
+  private int writeBufferToChannel(final ByteBuffer buffer,
+      final Connection conn, final SocketChannel channel,
+      final boolean flush) {
+    final DMStats stats = this.stats;
+    final boolean origSocketInUse = conn.socketInUse;
+    byte originalState = -1;
+    try {
+      if (!conn.connected) {
+        throw new ConnectionException("not connected to " + conn.getRemoteAddress());
+      }
+      synchronized (conn.stateLock) {
+        originalState = conn.connectionState;
+        conn.connectionState = Connection.STATE_SENDING;
+      }
+      conn.socketInUse = true;
+      if (!conn.isSharedResource()) {
+        stats.incTOSentMsg();
+      }
+
+      long startLock = stats.startSocketLock();
+      stats.endSocketLock(startLock);
+      long start = stats.startSocketWrite(true);
+      int numWritten = 0;
+      try {
+        if (flush) {
+          do {
+            numWritten += super.writeBuffer(buffer, channel);
+          } while (buffer.hasRemaining());
+        } else {
+          numWritten += super.writeBufferNoWait(buffer, channel);
+        }
+        return numWritten;
+      } finally {
+        stats.endSocketWrite(true, start, numWritten, 0);
+      }
+
+    } catch (IOException ex) {
+      if (this.ce == null)
+        this.ce = new ConnectExceptions();
+      this.ce.addFailure(conn.getRemoteAddress(), ex);
+      conn.closeForReconnect("closing due to unexpected IOException");
+      return -1;
+    } catch (ConnectionException ex) {
+      if (this.ce == null)
+        this.ce = new ConnectExceptions();
+      this.ce.addFailure(conn.getRemoteAddress(), ex);
+      conn.closeForReconnect("closing due to unexpected ConnetionException");
+      return -1;
+    } finally {
+      conn.accessed();
+      conn.socketInUse = origSocketInUse;
+      synchronized (conn.stateLock) {
+        conn.connectionState = originalState;
+      }
+    }
+  }
+
+  @Override
+  protected int writeBuffer(final ByteBuffer buffer,
+      WritableByteChannel channel) throws IOException {
+    final List<Connection> connections = this.connections;
+    final int numConnections = connections.size();
+    final int bufferPosition = buffer.position();
+    final int bufferSize = buffer.limit() - bufferPosition;
+    if (numConnections == 1) {
+      Connection conn = connections.get(0);
+      if (writeBufferToChannel(buffer, conn, conn.getSocket().getChannel(),
+          true) < 0) {
+        connections.remove(0);
+      }
+      return bufferSize;
+    }
+
+    ArrayList<Connection> pendingConnections = null;
+    IntArrayList pendingWritten = null;
+    int numWritten;
+    // write to all the channels
+    for (int i = numConnections - 1; i >= 0; i--) {
+      Connection conn = connections.get(i);
+      SocketChannel socketChannel = conn.getSocket().getChannel();
+      // rewind buffer to the start
+      buffer.position(bufferPosition);
+      if ((numWritten = writeBufferToChannel(buffer, conn,
+          socketChannel, false)) >= 0) {
+        if (numWritten < bufferSize) {
+          // put into pending list and go on to next channel
+          if (pendingConnections == null) {
+            int initSize = numConnections >>> 1;
+            pendingConnections = new ArrayList<>(initSize);
+            pendingWritten = new IntArrayList(initSize);
+          }
+          pendingConnections.add(conn);
+          pendingWritten.add(bufferPosition + numWritten);
+        }
+      } else {
+        connections.remove(i);
+      }
+    }
+    // now process the pending channels in a blocking manner
+    if (pendingConnections != null) {
+      processPendingChannels(buffer, pendingConnections, pendingWritten);
+    }
+
+    // position the buffer at the end indicating everything was written
+    buffer.position(buffer.limit());
+
+    this.bytesWritten += bufferSize;
+    return bufferSize;
+  }
+
+  private void processPendingChannels(ByteBuffer buffer,
+      ArrayList<Connection> pendingConnections, IntArrayList written)
+      throws IOException {
+    final int numChannels = pendingConnections.size();
+    for (int i = 0; i < numChannels; i++) {
+      final Connection conn = pendingConnections.get(i);
+      // move buffer to appropriate position
+      buffer.position(written.getInt(i));
+      if (writeBufferToChannel(buffer, conn, conn.getSocket().getChannel(),
+          true) < 0) {
+        this.connections.remove(conn);
+      }
+    }
+  }
+
+  @Override
+  public long getParkNanosMax() {
+    // never throw timeout here
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  protected int writeBufferNoWait(ByteBuffer buffer,
+      WritableByteChannel channel) throws IOException {
+    // all writes are blocking for messages
+    return writeBuffer(buffer, channel);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 600d967..b374066 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -31,7 +31,7 @@ import org.apache.geode.internal.ObjToByteArraySerializer;
  *
  */
 public class MsgOutputStream extends OutputStream implements ObjToByteArraySerializer {
-  private final ByteBuffer buffer;
+  final ByteBuffer buffer;
 
   /**
    * The caller of this constructor is responsible for managing the allocated instance.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index a09d2f2..9598024 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -90,10 +90,7 @@ public class MsgStreamer extends OutputStream
   private long serStartTime;
   private final boolean directReply;
 
-  /**
-   * Called to free up resources used by this streamer after the streamer has produced its message.
-   */
-  protected void release() {
+  public void release() {
     MsgIdGenerator.release(this.msgId);
     this.buffer.clear();
     this.overflowBuf = null;
@@ -171,8 +168,8 @@ public class MsgStreamer extends OutputStream
       } else {
         // if there is a versioned stream created, then split remaining
         // connections to unversioned stream
-        final ArrayList<MsgStreamer> streamers =
-            new ArrayList<MsgStreamer>(versionToConnMap.size() + 1);
+        final List<BaseMsgStreamer> streamers =
+            new ArrayList<>(versionToConnMap.size() + 1);
         final int sendBufferSize = firstCon.getSendBufferSize();
         if (numCons > numVersioned) {
           // allocating list of numCons size so that as the result of
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 5a51af0..b17829c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -34,11 +34,11 @@ public class MsgStreamerList implements BaseMsgStreamer {
   private static final Logger logger = LogService.getLogger();
 
   /**
-   * List of {@link MsgStreamer}s encapsulated by this MsgStreamerList.
+   * List of {@link BaseMsgStreamer}s encapsulated by this MsgStreamerList.
    */
-  private final List<MsgStreamer> streamers;
+  private final List<BaseMsgStreamer> streamers;
 
-  MsgStreamerList(List<MsgStreamer> streamers) {
+  MsgStreamerList(List<BaseMsgStreamer> streamers) {
     this.streamers = streamers;
   }
 
@@ -47,7 +47,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
    */
   @Override
   public void reserveConnections(long startTime, long ackTimeout, long ackSDTimeout) {
-    for (MsgStreamer streamer : this.streamers) {
+    for (BaseMsgStreamer streamer : this.streamers) {
       streamer.reserveConnections(startTime, ackTimeout, ackSDTimeout);
     }
   }
@@ -60,7 +60,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
     int result = 0;
     RuntimeException ex = null;
     IOException ioex = null;
-    for (MsgStreamer streamer : this.streamers) {
+    for (BaseMsgStreamer streamer : this.streamers) {
       if (ex != null) {
         streamer.release();
         // TODO: shouldn't we call continue here?
@@ -100,7 +100,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
   @Override
   public List<?> getSentConnections() {
     List<Object> sentCons = Collections.emptyList();
-    for (MsgStreamer streamer : this.streamers) {
+    for (BaseMsgStreamer streamer : this.streamers) {
       if (sentCons.size() == 0) {
         sentCons = (List<Object>) streamer.getSentConnections();
       } else {
@@ -116,7 +116,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
   @Override
   public ConnectExceptions getConnectExceptions() {
     ConnectExceptions ce = null;
-    for (MsgStreamer streamer : this.streamers) {
+    for (BaseMsgStreamer streamer : this.streamers) {
       if (ce == null) {
         ce = streamer.getConnectExceptions();
       } else {
@@ -141,7 +141,7 @@ public class MsgStreamerList implements BaseMsgStreamer {
   public void close() throws IOException {
     // only throw the first exception and try to close all
     IOException ex = null;
-    for (MsgStreamer m : this.streamers) {
+    for (BaseMsgStreamer m : this.streamers) {
       try {
         m.close();
       } catch (IOException e) {
@@ -157,4 +157,11 @@ public class MsgStreamerList implements BaseMsgStreamer {
       throw ex;
     }
   }
+
+  @Override
+  public void release() {
+    for (BaseMsgStreamer m : this.streamers) {
+      m.release();
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java
new file mode 100644
index 0000000..192a33e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/QueryKeyedObjectPool.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2018 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+package org.apache.geode.internal.tcp;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Predicate;
+
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+
+import org.apache.geode.CancelCriterion;
+
+/**
+ * A configurable <code>KeyedObjectPool</code> implementation.
+ * <p>
+ * This is extends common-pool2 GenericKeyedObjectPool and differs only
+ * in additional methods to query objects by key.
+ *
+ * @param <K> The type of keys maintained by this pool.
+ * @param <T> Type of element pooled in this pool.
+ *
+ * @see GenericObjectPool
+ * @see GenericKeyedObjectPool
+ */
+public final class QueryKeyedObjectPool<K, T>
+    extends GenericKeyedObjectPool<K, T> {
+
+  /*
+   * The "poolMap" field from the parent class obtained using reflection.
+   */
+  private final ConcurrentHashMap<?, ?> subPoolMap;
+
+  /**
+   * The "keyLock" field from the parent class obtained using reflection.
+   */
+  private final ReadWriteLock globalKeyLock;
+
+  /**
+   * Check for cancellation with this in borrowObject.
+   */
+  private final CancelCriterion cancelCriterion;
+
+  /**
+   * The "allObjects" field from ObjectDequeue class obtained using reflection.
+   */
+  private volatile Field allObjectsField;
+
+  /**
+   * Create a new <code>QueryKeyedObjectPool</code> using defaults from
+   * {@link GenericKeyedObjectPoolConfig}.
+   *
+   * @param factory the factory to be used to create entries
+   * @param cancelCriterion to check for cancellation of borrowObject
+   */
+  public QueryKeyedObjectPool(KeyedPooledObjectFactory<K, T> factory,
+      CancelCriterion cancelCriterion) {
+    this(factory, new GenericKeyedObjectPoolConfig(), cancelCriterion);
+  }
+
+  /**
+   * Create a new <code>QueryKeyedObjectPool</code> using a specific
+   * configuration.
+   *
+   * @param factory the factory to be used to create entries
+   * @param config The configuration to use for this pool instance.
+   *        The configuration is used by value. Subsequent
+   *        changes to the configuration object will not be
+   *        reflected in the pool.
+   * @param cancelCriterion to check for cancellation of borrowObject
+   */
+  @SuppressWarnings("WeakerAccess")
+  public QueryKeyedObjectPool(KeyedPooledObjectFactory<K, T> factory,
+      GenericKeyedObjectPoolConfig config, CancelCriterion cancelCriterion) {
+    super(factory, config);
+
+    try {
+      final Class<?> superClass = getClass().getSuperclass();
+      Field f = superClass.getDeclaredField("poolMap");
+      f.setAccessible(true);
+      this.subPoolMap = (ConcurrentHashMap<?, ?>) f.get(this);
+
+      f = superClass.getDeclaredField("keyLock");
+      f.setAccessible(true);
+      this.globalKeyLock = (ReadWriteLock) f.get(this);
+
+      this.cancelCriterion = cancelCriterion;
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Failed to initialize QueryKeyedObjectPool", e);
+    }
+  }
+
+  private Map<?, ?> getAllObjectsFromDeque(Object deque) {
+    try {
+      Field allObjects = this.allObjectsField;
+      if (allObjects == null) {
+        synchronized (this) {
+          allObjects = this.allObjectsField;
+          if (allObjects == null) {
+            Field f = deque.getClass().getDeclaredField("allObjects");
+            f.setAccessible(true);
+            this.allObjectsField = allObjects = f;
+          }
+        }
+      }
+      return (Map<?, ?>) allObjects.get(deque);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to read pool queue", e);
+    }
+  }
+
+  @Override
+  public T borrowObject(K key, long borrowMaxWaitMillis) throws Exception {
+    // An issue with GenericKeyedObjectPool: it increments the createdCount
+    // first, then checks if it exceeds maxTotalPerKey. It can happen due to
+    // concurrency then that one of the threads goes into a wait due to that
+    // even though connections have not actually been created yet. The creates
+    // could all fail due to remote node going down, but then those threads
+    // that started to wait will still keep waiting (SNAP-1508, SNAP-1509).
+    // Hence this override does it with smaller timeout in a loop.
+    if (borrowMaxWaitMillis < 0) {
+      borrowMaxWaitMillis = Integer.MAX_VALUE;
+    }
+    // retries at max 1 second intervals
+    final long loopWaitMillis = Math.min(1000L, borrowMaxWaitMillis);
+    long startTime = 0L;
+    while (true) {
+      try {
+        return super.borrowObject(key, loopWaitMillis);
+      } catch (NoSuchElementException nse) {
+        // check for cancellation
+        this.cancelCriterion.checkCancelInProgress(nse);
+        // retry till borrowMaxWaitMillis
+        long currentTime = System.currentTimeMillis();
+        if (startTime == 0L) {
+          // first time assume it failed after loopWaitMillis to
+          // optimistically avoid a currentTimeMillis call in first loop
+          startTime = currentTime - loopWaitMillis;
+        }
+        if ((currentTime - startTime) >= borrowMaxWaitMillis) {
+          throw nse;
+        }
+      }
+    }
+  }
+
+  public int getNumTotal(K key) {
+    final Object objectDeque = subPoolMap.get(key);
+    if (objectDeque != null) {
+      return getAllObjectsFromDeque(objectDeque).size();
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Apply the given function on each object for a key both idle (waiting
+   * to be borrowed) and active (currently borrowed).
+   */
+  public void foreachObject(K key, Predicate<T> predicate) {
+    Lock readLock = globalKeyLock.readLock();
+    readLock.lock();
+    try {
+      Object queue = subPoolMap.get(key);
+      if (queue != null) {
+        for (Object p : getAllObjectsFromDeque(queue).values()) {
+          @SuppressWarnings("unchecked")
+          final PooledObject<T> pooledObject = (PooledObject<T>) p;
+          if (!predicate.test(pooledObject.getObject())) {
+            break;
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 0057847..f69b742 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.tcp;
 
 import java.io.IOException;
-import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -117,6 +116,11 @@ public class TCPConduit implements Runnable {
   private static boolean USE_NIO;
 
   /**
+   * Use the new streaming using NIO disabling chunking.
+   */
+  private static boolean USE_NIO_STREAM;
+
+  /**
    * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
    */
   static boolean useDirectBuffers;
@@ -150,6 +154,7 @@ public class TCPConduit implements Runnable {
     useSSL = Boolean.getBoolean("p2p.useSSL");
     // only use nio if not SSL
     USE_NIO = !useSSL && !Boolean.getBoolean("p2p.oldIO");
+    USE_NIO_STREAM = USE_NIO && !Boolean.getBoolean("p2p.disableNIOStream");
     // only use direct buffers if we are using nio
     useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
     LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
@@ -279,26 +284,6 @@ public class TCPConduit implements Runnable {
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
 
     this.useNIO = USE_NIO;
-    if (this.useNIO) {
-      InetAddress addr = address;
-      if (addr == null) {
-        try {
-          addr = SocketCreator.getLocalHost();
-        } catch (java.net.UnknownHostException e) {
-          throw new ConnectionException("Unable to resolve localHost address", e);
-        }
-      }
-      // JDK bug 6230761 - NIO can't be used with IPv6 on Windows
-      if (addr instanceof Inet6Address) {
-        String os = System.getProperty("os.name");
-        if (os != null) {
-          if (os.indexOf("Windows") != -1) {
-            this.useNIO = false;
-          }
-        }
-      }
-    }
-
     startAcceptor();
   }
 
@@ -841,6 +826,16 @@ public class TCPConduit implements Runnable {
   }
 
   /**
+   * Return true if NIO classes with buffered streaming is being used
+   * for the server socket. This is different from "useNIO" in that it
+   * streams messages directly to sockets as well as reads in streaming
+   * manner using buffered DataOutput/Input streams rather than chunking.
+   */
+  public boolean useNIOStream() {
+    return USE_NIO_STREAM;
+  }
+
+  /**
    * gets the address of this conduit's ServerSocket endpoint
    */
   public InetSocketAddress getSocketId() {
@@ -894,6 +889,7 @@ public class TCPConduit implements Runnable {
     for (;;) {
       stopper.checkCancelInProgress(null);
       boolean interrupted = Thread.interrupted();
+      boolean returningCon = false;
       try {
         // If this is the second time through this loop, we had
         // problems. Tear down the connection so that it gets
@@ -944,7 +940,10 @@ public class TCPConduit implements Runnable {
               conn.closeForReconnect("closing before retrying");
             } catch (CancelException ex) {
               throw ex;
-            } catch (Exception ex) {
+            } catch (Exception ignored) {
+            } finally {
+              releasePooledConnection(conn);
+              conn = null;
             }
           }
         } // not first time in loop
@@ -970,10 +969,14 @@ public class TCPConduit implements Runnable {
                 logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn,
                     conn.hashCode());
               }
-              conn.closeOldConnection("closing old connection");
-              conn = null;
-              retryForOldConnection = true;
-              debugRetry = true;
+              try {
+                conn.closeOldConnection("closing old connection");
+              } finally {
+                releasePooledConnection(conn);
+                conn = null;
+                retryForOldConnection = true;
+                debugRetry = true;
+              }
             }
           } while (retryForOldConnection);
           if (debugRetry && logger.isDebugEnabled()) {
@@ -1064,8 +1067,13 @@ public class TCPConduit implements Runnable {
             logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
           }
         }
+        returningCon = true;
         return conn;
       } finally {
+        // need to return unused connections to pool
+        if (!returningCon && conn != null) {
+          releasePooledConnection(conn);
+        }
         if (interrupted) {
           Thread.currentThread().interrupt();
         }
@@ -1073,6 +1081,13 @@ public class TCPConduit implements Runnable {
     } // for(;;)
   }
 
+  public final void releasePooledConnection(Connection conn) {
+    final ConnectionTable conTable = this.conTable;
+    if (conTable != null) {
+      conTable.releasePooledConnection(conn);
+    }
+  }
+
   @Override
   public String toString() {
     return "" + id;
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
index b548938..153eb5b 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/unsafe/UnsafeWrapper.java
@@ -195,4 +195,9 @@ public class UnsafeWrapper {
   public void setMemory(long addr, long size, byte v) {
     this.unsafe.setMemory(addr, size, v);
   }
+
+  public void setMemory(Object obj, long offset, long size, byte value) {
+    this.unsafe.setMemory(obj, offset, size, value);
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 075b252..6163b94 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -52,6 +52,7 @@ public class ConnectionJUnitTest {
     DistributionManager distMgr = mock(DistributionManager.class);
     MembershipManager membership = mock(MembershipManager.class);
     TCPConduit conduit = mock(TCPConduit.class);
+    when(conduit.useNIOStream()).thenReturn(false);
 
     // mock the connection table and conduit
 
diff --git a/geode-core/src/test/resources/expected-pom.xml b/geode-core/src/test/resources/expected-pom.xml
index 9b62224..5cd5bf3 100644
--- a/geode-core/src/test/resources/expected-pom.xml
+++ b/geode-core/src/test/resources/expected-pom.xml
@@ -66,6 +66,12 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-pool2</artifactId>
+      <version>2.4.2</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <version>2.6</version>
diff --git a/gradle/dependency-versions.properties b/gradle/dependency-versions.properties
index 0ad0625..3578d4f 100644
--- a/gradle/dependency-versions.properties
+++ b/gradle/dependency-versions.properties
@@ -32,6 +32,7 @@ commons-io.version = 2.6
 commons-lang3.version = 3.8.1
 commons-logging.version = 1.2
 commons-modeler.version = 2.0.1
+commons-pool2.version = 2.4.2
 commons-validator.version = 1.6
 HikariCP.version = 3.2.0
 derby.version = 10.14.2.0