You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/14 22:30:27 UTC

[17/18] incubator-geode git commit: GEODE-679 Explore removing SocketIOWithTimeout and other classes related to FD soft leak

GEODE-679 Explore removing SocketIOWithTimeout and other classes related to FD soft leak

SocketUtils was added to avoid a file descriptor "leak" caused by the use of NIO socket
channel selectors.  This was spurred by a HADOOP JIRA ticket that claimed that
sun.misc.Cleaners were being used to close selectors (see
https://issues.apache.org/jira/browse/HADOOP-4346).  I have verified that Cleaners are
no longer used to close selectors and that SocketUtils is not making any difference in
the number of file descriptors created in servers using NIO selectors using the (recently
deleted) FDDUnitTest with modifications to force clients to close their connections to
the server.

So, this change-set removes SocketUtils and associated classes, reverting all modifications
made to introduce it in GemFire 8.0 to use direct method invokations on sockets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7b3c8cb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7b3c8cb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7b3c8cb4

Branch: refs/heads/feature/GEODE-1162
Commit: 7b3c8cb42abc19e62829aa8662a37415afc50b75
Parents: 5c89fab
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Apr 14 09:31:13 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Apr 14 09:34:07 2016 -0700

----------------------------------------------------------------------
 .../client/internal/CloseConnectionOp.java      |   0
 .../cache/client/internal/ConnectionImpl.java   |   8 +-
 .../gemfire/internal/SocketCreator.java         |  17 +-
 .../gemfire/internal/SocketIOWithTimeout.java   | 491 -------------------
 .../gemfire/internal/SocketInputStream.java     | 181 -------
 .../gemfire/internal/SocketInputWrapper.java    |  93 ----
 .../gemfire/internal/SocketOutputStream.java    | 174 -------
 .../gemstone/gemfire/internal/SocketUtils.java  | 220 ---------
 .../internal/cache/GemFireCacheImpl.java        |  14 +-
 .../cache/tier/sockets/AcceptorImpl.java        |   3 +-
 .../cache/tier/sockets/CacheClientNotifier.java | 105 +---
 .../cache/tier/sockets/CacheClientUpdater.java  |   5 +-
 .../internal/cache/tier/sockets/HandShake.java  |  13 +-
 .../internal/cache/tier/sockets/Message.java    |   3 +-
 .../cache/tier/sockets/ServerConnection.java    |   5 +-
 .../tier/sockets/ServerHandShakeProcessor.java  |  26 +-
 .../tier/sockets/command/CloseConnection.java   |   0
 .../gemfire/internal/shared/NativeCalls.java    |   2 +-
 .../gemfire/internal/tcp/Connection.java        |   5 +-
 19 files changed, 52 insertions(+), 1313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
old mode 100644
new mode 100755
index 5016d67..c20b318
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
@@ -35,14 +35,10 @@ import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunct
 import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
 import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.tier.Acceptor;
 import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
@@ -109,8 +105,8 @@ public class ConnectionImpl implements Connection {
     verifySocketBufferSize(socketBufferSize, theSocket.getSendBufferSize(), "send");
     
     theSocket.setSoTimeout(handShakeTimeout);
-    out = SocketUtils.getOutputStream(theSocket);//theSocket.getOutputStream();
-    in = SocketUtils.getInputStream(theSocket);//theSocket.getInputStream();
+    out = theSocket.getOutputStream();
+    in = theSocket.getInputStream();
     this.status = handShake.greet(this, location, communicationMode);
     commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize, theSocket);
     if (sender != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index acdfbc7..9b7e5d7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -19,8 +19,6 @@ package com.gemstone.gemfire.internal;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.Inet4Address;
 import java.net.Inet6Address;
@@ -995,13 +993,7 @@ public class SocketCreator {
           if (optionalWatcher != null) {
             optionalWatcher.beforeConnect(socket);
           }
-          if (timeout > 0) {
-            SocketUtils.connect(socket, sockaddr, timeout);
-          }
-          else {
-            SocketUtils.connect(socket, sockaddr, 0);
-
-          }
+          socket.connect(sockaddr, Math.max(timeout,0));
           configureClientSSLSocket( socket );
           return socket;
         } 
@@ -1024,12 +1016,7 @@ public class SocketCreator {
             if (optionalWatcher != null) {
               optionalWatcher.beforeConnect(socket);
             }
-          if (timeout > 0) {
-            SocketUtils.connect(socket, sockaddr, timeout);
-            }
-            else {
-              SocketUtils.connect(socket, sockaddr, 0);
-            }
+            socket.connect(sockaddr, Math.max(timeout,0));
           }
           return socket;
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
deleted file mode 100644
index 9fbead8..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketIOWithTimeout.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Pivotal Additions:
- * Using a ConcurrentHashMap with a LinkedBlockingDeque instead
- * Added a cleanup thread
- * Modifications to trimIdleSelector
- * 
- */
-
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * This supports input and output streams for a socket channels. 
- * These streams can have a timeout.
- */
-public abstract class SocketIOWithTimeout {
-  
-  private static final Logger logger = LogService.getLogger();
-  
-  private SelectableChannel channel;
-  private long timeout;
-  private boolean closed = false;
-  
-  /*Pivotal Change to final*/
-  private static final SelectorPool selector = new SelectorPool();
-  
-  /*Pivotal Addition*/
-  //in seconds, the cleanup thread will first mark at the SELECTOR_TIME_OUT interval and
-  //close any selectors that have been marked on the next pass through
-  //This means that it will take approximately twice as long as SELECTOR_TIME_OUT to actually close
-  //an unused selector
-  private static final long SELECTOR_TIME_OUT = Long.getLong("gemfire.SELECTOR_TIME_OUT", 120L); 
-  
-  private static ScheduledExecutorService cleanUpExecutor = startSelectorCleanUpThread();
-
-  /*End Pivotal Addition*/
-  
-  /* A timeout value of 0 implies wait for ever. 
-   * We should have a value of timeout that implies zero wait.. i.e. 
-   * read or write returns immediately.
-   * 
-   * This will set channel to non-blocking.
-   */
-  SocketIOWithTimeout(SelectableChannel channel, long timeout) 
-                                                 throws IOException {
-    checkChannelValidity(channel);
-    
-    this.channel = channel;
-    this.timeout = timeout;
-    // Set non-blocking
-    channel.configureBlocking(false);
-  }
-  
-  void close() {
-    closed = true;
-  }
-
-  boolean isOpen() {
-    return !closed && channel.isOpen();
-  }
-
-  SelectableChannel getChannel() {
-    return channel;
-  }
-  
-  /** 
-   * Utility function to check if channel is ok.
-   * Mainly to throw IOException instead of runtime exception
-   * in case of mismatch. This mismatch can occur for many runtime
-   * reasons.
-   */
-  static void checkChannelValidity(Object channel) throws IOException {
-    if (channel == null) {
-      /* Most common reason is that original socket does not have a channel.
-       * So making this an IOException rather than a RuntimeException.
-       */
-      throw new IOException("Channel is null. Check " +
-                            "how the channel or socket is created.");
-    }
-    
-    if (!(channel instanceof SelectableChannel)) {
-      throw new IOException("Channel should be a SelectableChannel");
-    }    
-  }
-  
-  /**
-   * Performs actual IO operations. This is not expected to block.
-   *  
-   * @param buf
-   * @return number of bytes (or some equivalent). 0 implies underlying
-   *         channel is drained completely. We will wait if more IO is 
-   *         required.
-   * @throws IOException
-   */
-  abstract int performIO(ByteBuffer buf) throws IOException;  
-  
-  /**
-   * Performs one IO and returns number of bytes read or written.
-   * It waits up to the specified timeout. If the channel is 
-   * not read before the timeout, SocketTimeoutException is thrown.
-   * 
-   * @param buf buffer for IO
-   * @param ops Selection Ops used for waiting. Suggested values: 
-   *        SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
-   *        writing. 
-   *        
-   * @return number of bytes read or written. negative implies end of stream.
-   * @throws IOException
-   */
-  int doIO(ByteBuffer buf, int ops) throws IOException {
-    
-    /* For now only one thread is allowed. If user want to read or write
-     * from multiple threads, multiple streams could be created. In that
-     * case multiple threads work as well as underlying channel supports it.
-     */
-    if (!buf.hasRemaining()) {
-      throw new IllegalArgumentException("Buffer has no data left.");
-      //or should we just return 0?
-    }
-
-    while (buf.hasRemaining()) {
-      if (closed) {
-        return -1;
-      }
-
-      try {
-        int n = performIO(buf);
-        if (n != 0) {
-          // successful io or an error.
-          return n;
-        }
-      } catch (IOException e) {
-        if (!channel.isOpen()) {
-          closed = true;
-        }
-        throw e;
-      }
-
-      //now wait for socket to be ready.
-      int count = 0;
-      try {
-        count = selector.select(channel, ops, timeout);  
-      } catch (IOException e) { //unexpected IOException.
-        closed = true;
-        throw e;
-      } 
-
-      if (count == 0) {
-        throw new SocketTimeoutException(timeoutExceptionString(channel,
-                                                                timeout, ops));
-      }
-      // otherwise the socket should be ready for io.
-    }
-    
-    return 0; // does not reach here.
-  }
-  
-  /**
-   * The contract is similar to {@link SocketChannel#connect(SocketAddress)} 
-   * with a timeout.
-   * 
-   * @see SocketChannel#connect(SocketAddress)
-   * 
-   * @param channel - this should be a {@link SelectableChannel}
-   * @param endpoint
-   * @throws IOException
-   */
-  static void connect(SocketChannel channel, 
-                      SocketAddress endpoint, int timeout) throws IOException {
-    
-    boolean blockingOn = channel.isBlocking();
-    if (blockingOn) {
-      channel.configureBlocking(false);
-    }
-    
-    try { 
-      if (channel.connect(endpoint)) {
-        return;
-      }
-
-      long timeoutLeft = timeout;
-      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
-      
-      while (true) {
-        // we might have to call finishConnect() more than once
-        // for some channels (with user level protocols)
-        
-        int ret = selector.select((SelectableChannel)channel, 
-                                  SelectionKey.OP_CONNECT, timeoutLeft);
-        
-        if (ret > 0 && channel.finishConnect()) {
-          return;
-        }
-        
-        if (ret == 0 ||
-            (timeout > 0 &&  
-              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
-          throw new SocketTimeoutException(
-                    timeoutExceptionString(channel, timeout, 
-                                           SelectionKey.OP_CONNECT));
-        }
-      }
-    } catch (IOException e) {
-      // javadoc for SocketChannel.connect() says channel should be closed.
-      try {
-        channel.close();
-      } catch (IOException ignored) {}
-      throw e;
-    } finally {
-      if (blockingOn && channel.isOpen()) {
-        channel.configureBlocking(true);
-      }
-    }
-  }
-  
-  /**
-   * This is similar to doIO(ByteBuffer, int)} except that it
-   * does not perform any I/O. It just waits for the channel to be ready
-   * for I/O as specified in ops.
-   * 
-   * @param ops Selection Ops used for waiting
-   * 
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  void waitForIO(int ops) throws IOException {
-    
-    if (selector.select(channel, ops, timeout) == 0) {
-      throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
-                                                              ops)); 
-    }
-  }
-
-  public void setTimeout(long timeoutMs) {
-    this.timeout = timeoutMs;
-  }
-    
-  private static String timeoutExceptionString(SelectableChannel channel,
-                                               long timeout, int ops) {
-    
-    String waitingFor;
-    switch(ops) {
-    
-    case SelectionKey.OP_READ :
-      waitingFor = "read"; break;
-      
-    case SelectionKey.OP_WRITE :
-      waitingFor = "write"; break;      
-      
-    case SelectionKey.OP_CONNECT :
-      waitingFor = "connect"; break;
-      
-    default :
-      waitingFor = "" + ops;  
-    }
-    
-    return timeout + " millis timeout while " +
-           "waiting for channel to be ready for " + 
-           waitingFor + ". ch : " + channel;    
-  }
-  
-  public static ScheduledExecutorService startSelectorCleanUpThread() {
-    ScheduledExecutorService cleanUpExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-      public Thread newThread(final Runnable r) {
-        Thread result = new Thread(r, "selector-pool-cleanup");
-        result.setDaemon(true);
-        return result;
-      }
-    });
-    cleanUpExecutor.scheduleAtFixedRate(new Runnable(){
-        public void run() {
-          selector.trimIdleSelectors();
-        }
-      }, SELECTOR_TIME_OUT, SELECTOR_TIME_OUT, TimeUnit.SECONDS);
-    return cleanUpExecutor;
-  }
-  
-  public static void stopSelectorCleanUpThread() {
-    if (cleanUpExecutor != null) {
-      cleanUpExecutor.shutdownNow();
-    }
-  }
-  /**
-   * This maintains a pool of selectors. These selectors are closed
-   * once they are idle (unused) for a few seconds.
-   */
-  private static class SelectorPool {
-    
-    private static class SelectorInfo {
-      Selector              selector;
-      /**Pivotal Addition**/
-      LinkedBlockingDeque<SelectorInfo> deque;
-      volatile boolean markForClean = false;
-      /**End Pivotal Addition**/
- 
-      void close() {
-        if (selector != null) {
-          try {
-            selector.close();
-          } catch (IOException e) {
-            logger.warn("Unexpected exception while closing selector : ", e);
-          }
-        }
-      }    
-    }
-    
-    private final ConcurrentHashMap<SelectorProvider, LinkedBlockingDeque<SelectorInfo>> providerList = new ConcurrentHashMap<SelectorProvider, LinkedBlockingDeque<SelectorInfo>>();
-    
-    /**
-     * Waits on the channel with the given timeout using one of the 
-     * cached selectors. It also removes any cached selectors that are
-     * idle for a few seconds.
-     * 
-     * @param channel
-     * @param ops
-     * @param timeout
-     * @throws IOException
-     */
-    int select(SelectableChannel channel, int ops, long timeout) 
-                                                   throws IOException {
-     
-      SelectorInfo info = get(channel);
-      
-      SelectionKey key = null;
-      int ret = 0;
-      
-      try {
-        while (true) {
-          long start = (timeout == 0) ? 0 : System.currentTimeMillis();
-
-          key = channel.register(info.selector, ops);
-          ret = info.selector.select(timeout);
-          
-          if (ret != 0) {
-            return ret;
-          }
-          
-          /* Sometimes select() returns 0 much before timeout for 
-           * unknown reasons. So select again if required.
-           */
-          if (timeout > 0) {
-            timeout -= System.currentTimeMillis() - start;
-            if (timeout <= 0) {
-              return 0;
-            }
-          }
-          
-          if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedIOException("Interruped while waiting for " +
-                                             "IO on channel " + channel +
-                                             ". " + timeout + 
-                                             " millis timeout left.");
-          }
-        }
-      } finally {
-        if (key != null) {
-          key.cancel();
-        }
-        
-        //clear the canceled key.
-        try {
-          info.selector.selectNow();
-        } catch (IOException e) {
-          logger.info("Unexpected Exception while clearing selector : ", e); // TODO:WTF: why is this info level??
-          // don't put the selector back.
-          info.close();
-          return ret; 
-        }
-        
-        release(info);
-      }
-    }
-
-    /**
-     * Takes one selector from end of LRU list of free selectors.
-     * If there are no selectors awailable, it creates a new selector.
-     * 
-     * @param channel 
-     * @throws IOException
-     */
-    private SelectorInfo get(SelectableChannel channel) 
-                                                         throws IOException {
-      SelectorProvider provider = channel.provider();
-      
-      /**Pivotal Change**/
-      LinkedBlockingDeque<SelectorInfo> deque = providerList.get(provider);
-      if (deque == null) {
-        deque = new LinkedBlockingDeque<SelectorInfo>();
-        LinkedBlockingDeque<SelectorInfo> presentValue = providerList.putIfAbsent(provider, deque); 
-        if (presentValue != null && deque != presentValue) {
-          deque = presentValue;
-        }
-      } 
-      /**poll instead of check empty**/       
-      
-      SelectorInfo selInfo = deque.pollFirst(); 
-      if (selInfo != null) {
-        selInfo.markForClean = false;
-      } else {
-        Selector selector = provider.openSelector();
-        selInfo = new SelectorInfo();
-        selInfo.selector = selector;
-        selInfo.deque = deque;
-      }
-      
-      /**end Pivotal Change**/
-      return selInfo;
-    }
-    
-    /**
-     * puts selector back at the end of LRU list of free selectos.
-     * 
-     * @param info
-     */
-    private void release(SelectorInfo info) {
-      /**Pivotal Addition **/
-      info.deque.addFirst(info);
-      /**End Pivotal Addition **/
-    }
-    
-    private void trimIdleSelectors() {
-      Iterator<LinkedBlockingDeque<SocketIOWithTimeout.SelectorPool.SelectorInfo>> poolIterator = providerList.values().iterator();
-      while (poolIterator.hasNext()) {
-        LinkedBlockingDeque<SelectorInfo> selectorPool = poolIterator.next();
-        trimSelectorPool(selectorPool);
-      }
-    }
-        
-    private void trimSelectorPool(LinkedBlockingDeque<SelectorInfo> selectorPool) {
-      SelectorInfo selectorInfo = selectorPool.peekLast();
-      //iterate backwards and remove any selectors that have been marked
-      //once we hit a selector that has yet to be marked, we can then mark the remaining
-      while (selectorInfo != null && selectorInfo.markForClean) {
-        selectorInfo = selectorPool.pollLast();
-        //check the flag again just to be sure
-        if (selectorInfo.markForClean ) {
-          selectorInfo.close();
-        }
-        else {
-          selectorPool.addFirst(selectorInfo);
-        }
-        selectorInfo = selectorPool.peekLast();
-      }
-      
-      //Mark all the selectors
-      Iterator<SelectorInfo> selectorIterator = selectorPool.iterator();
-      while (selectorIterator.hasNext()) {
-        selectorIterator.next().markForClean = true;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
deleted file mode 100644
index 430294a..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputStream.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Pivotal Additions:
- * Removed the usage of import org.apache.hadoop.classification.InterfaceAudience
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * This implements an input stream that can have a timeout while reading.
- * This sets non-blocking flag on the socket channel.
- * So after create this object, read() on 
- * {@link Socket#getInputStream()} and write() on 
- * {@link Socket#getOutputStream()} for the associated socket will throw 
- * IllegalBlockingModeException. 
- * Please use {@link SocketOutputStream} for writing.
- */
-//@InterfaceAudience.LimitedPrivate("HDFS")
-public class SocketInputStream extends InputStream
-                               implements ReadableByteChannel {
-
-  private Reader reader;
-
-  private static class Reader extends SocketIOWithTimeout {
-    ReadableByteChannel channel;
-    
-    Reader(ReadableByteChannel channel, long timeout) throws IOException {
-      super((SelectableChannel)channel, timeout);
-      this.channel = channel;
-    }
-    
-    @Override
-    int performIO(ByteBuffer buf) throws IOException {
-      return channel.read(buf);
-    }
-  }
-  
-  /**
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @param channel 
-   *        Channel for reading, should also be a {@link SelectableChannel}.
-   *        The channel will be configured to be non-blocking.
-   * @param timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketInputStream(ReadableByteChannel channel, long timeout)
-                                                        throws IOException {
-    SocketIOWithTimeout.checkChannelValidity(channel);
-    reader = new Reader(channel, timeout);
-  }
-
-  /**
-   * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
-   * 
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @param timeout timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketInputStream(Socket socket, long timeout) 
-                                         throws IOException {
-    this(socket.getChannel(), timeout);
-  }
-  
-  /**
-   * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
-   * :<br><br>
-   * 
-   * Create a new input stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @throws IOException
-   */
-  public SocketInputStream(Socket socket) throws IOException {
-    this(socket.getChannel(), socket.getSoTimeout());
-  }
-  
-  @Override
-  public int read() throws IOException {
-    /* Allocation can be removed if required.
-     * probably no need to optimize or encourage single byte read.
-     */
-    byte[] buf = new byte[1];
-    int ret = read(buf, 0, 1);
-    if (ret > 0) {
-      return (int)(buf[0] & 0xff);
-    }
-    if (ret != -1) {
-      // unexpected
-      throw new IOException("Could not read from stream");
-    }
-    return ret;
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return read(ByteBuffer.wrap(b, off, len));
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    /* close the channel since Socket.getInputStream().close()
-     * closes the socket.
-     */
-    reader.channel.close();
-    reader.close();
-  }
-
-  /**
-   * Returns underlying channel used by inputstream.
-   * This is useful in certain cases like channel for 
-   * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
-   */
-  public ReadableByteChannel getChannel() {
-    return reader.channel; 
-  }
-  
-  //ReadableByteChannel interface
-    
-  @Override
-  public boolean isOpen() {
-    return reader.isOpen();
-  }
-    
-  @Override
-  public int read(ByteBuffer dst) throws IOException {
-    return reader.doIO(dst, SelectionKey.OP_READ);
-  }
-  
-  /**
-   * waits for the underlying channel to be ready for reading.
-   * The timeout specified for this stream applies to this wait.
-   * 
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  public void waitForReadable() throws IOException {
-    reader.waitForIO(SelectionKey.OP_READ);
-  }
-
-  public void setTimeout(long timeoutMs) {
-    reader.setTimeout(timeoutMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
deleted file mode 100644
index 0bc0ecd..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketInputWrapper.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Removed Preconditions and classifications
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.FilterInputStream;
-
-import java.io.InputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.channels.ReadableByteChannel;
-
-//import org.apache.hadoop.classification.InterfaceAudience;
-//import org.apache.hadoop.classification.InterfaceStability;
-
-//import com.google.common.base.Preconditions;
-
-/**
- * A wrapper stream around a socket which allows setting of its timeout. If the
- * socket has a channel, this uses non-blocking IO via the package-private
- * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by
- * setting the underlying socket timeout itself.
- */
-/*
-@InterfaceAudience.LimitedPrivate("HDFS")
-@InterfaceStability.Unstable
-*/
-public class SocketInputWrapper extends FilterInputStream {
-  private final Socket socket;
-  private final boolean hasChannel;
-
-  SocketInputWrapper(Socket s, InputStream is) {
-    super(is);
-    this.socket = s;
-    this.hasChannel = s.getChannel() != null;
-//    if (hasChannel) {
-//      Preconditions.checkArgument(is instanceof SocketInputStream,
-//          "Expected a SocketInputStream when there is a channel. " +
-//          "Got: %s", is);
-//    }
-  }
-
-  /**
-   * Set the timeout for reads from this stream.
-   * 
-   * Note: the behavior here can differ subtly depending on whether the
-   * underlying socket has an associated Channel. In particular, if there is no
-   * channel, then this call will affect the socket timeout for <em>all</em>
-   * readers of this socket. If there is a channel, then this call will affect
-   * the timeout only for <em>this</em> stream. As such, it is recommended to
-   * only create one {@link SocketInputWrapper} instance per socket.
-   * 
-   * @param timeoutMs
-   *          the new timeout, 0 for no timeout
-   * @throws SocketException
-   *           if the timeout cannot be set
-   */
-  public void setTimeout(long timeoutMs) throws SocketException {
-    if (hasChannel) {
-      ((SocketInputStream)in).setTimeout(timeoutMs);
-    } else {
-      socket.setSoTimeout((int)timeoutMs);
-    }
-  }
-
-  /**
-   * @return an underlying ReadableByteChannel implementation.
-   * @throws IllegalStateException if this socket does not have a channel
-   */
-  public ReadableByteChannel getReadableByteChannel() {
-//    Preconditions.checkState(hasChannel,
-//        "Socket %s does not have a channel",
-//        this.socket);
-    return (SocketInputStream)in;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
deleted file mode 100644
index 412635c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketOutputStream.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Removed classifications
- * Removed method transferToFully
- * 
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * This implements an output stream that can have a timeout while writing.
- * This sets non-blocking flag on the socket channel.
- * So after creating this object , read() on 
- * {@link Socket#getInputStream()} and write() on 
- * {@link Socket#getOutputStream()} on the associated socket will throw 
- * llegalBlockingModeException.
- * Please use {@link SocketInputStream} for reading.
- */
-public class SocketOutputStream extends OutputStream 
-                                implements WritableByteChannel {                                
-  
-  private Writer writer;
-  
-  private static class Writer extends SocketIOWithTimeout {
-    WritableByteChannel channel;
-    
-    Writer(WritableByteChannel channel, long timeout) throws IOException {
-      super((SelectableChannel)channel, timeout);
-      this.channel = channel;
-    }
-    
-    @Override
-    int performIO(ByteBuffer buf) throws IOException {
-      return channel.write(buf);
-    }
-  }
-  
-  /**
-   * Create a new ouput stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @param channel 
-   *        Channel for writing, should also be a {@link SelectableChannel}.  
-   *        The channel will be configured to be non-blocking.
-   * @param timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketOutputStream(WritableByteChannel channel, long timeout) 
-                                                         throws IOException {
-    SocketIOWithTimeout.checkChannelValidity(channel);
-    writer = new Writer(channel, timeout);
-  }
-  
-  /**
-   * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
-   * 
-   * Create a new ouput stream with the given timeout. If the timeout
-   * is zero, it will be treated as infinite timeout. The socket's
-   * channel will be configured to be non-blocking.
-   * 
-   * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
-   *  
-   * @param socket should have a channel associated with it.
-   * @param timeout timeout timeout in milliseconds. must not be negative.
-   * @throws IOException
-   */
-  public SocketOutputStream(Socket socket, long timeout) 
-                                         throws IOException {
-    this(socket.getChannel(), timeout);
-  }
-  
-  @Override
-  public void write(int b) throws IOException {
-    /* If we need to, we can optimize this allocation.
-     * probably no need to optimize or encourage single byte writes.
-     */
-    byte[] buf = new byte[1];
-    buf[0] = (byte)b;
-    write(buf, 0, 1);
-  }
-  
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
-    while (buf.hasRemaining()) {
-      try {
-        if (write(buf) < 0) {
-          throw new IOException("The stream is closed");
-        }
-      } catch (IOException e) {
-        /* Unlike read, write can not inform user of partial writes.
-         * So will close this if there was a partial write.
-         */
-        if (buf.capacity() > buf.remaining()) {
-          writer.close();
-        }
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    /* close the channel since Socket.getOuputStream().close() 
-     * closes the socket.
-     */
-    writer.channel.close();
-    writer.close();
-  }
-
-  /**
-   * Returns underlying channel used by this stream.
-   * This is useful in certain cases like channel for 
-   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}
-   */
-  public WritableByteChannel getChannel() {
-    return writer.channel; 
-  }
-
-  //WritableByteChannle interface 
-  
-  @Override
-  public boolean isOpen() {
-    return writer.isOpen();
-  }
-
-  @Override
-  public int write(ByteBuffer src) throws IOException {
-    return writer.doIO(src, SelectionKey.OP_WRITE);
-  }
-  
-  /**
-   * waits for the underlying channel to be ready for writing.
-   * The timeout specified for this stream applies to this wait.
-   *
-   * @throws SocketTimeoutException 
-   *         if select on the channel times out.
-   * @throws IOException
-   *         if any other I/O error occurs. 
-   */
-  public void waitForWritable() throws IOException {
-    writer.waitForIO(SelectionKey.OP_WRITE);
-  }
-
-  public void setTimeout(int timeoutMs) {
-    writer.setTimeout(timeoutMs);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
deleted file mode 100644
index 36eaf04..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketUtils.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * Pivotal Additions:
- * Flag to enable/disable selector pooling for test purposes
- * 
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.SocketChannel;
-
-public class SocketUtils {
-
-  //used for testing
-  public static boolean USE_SELECTOR_POOLING = Boolean.valueOf(System.getProperty("gemfire.useSelectorPooling", "true")).booleanValue();
-  /**
-   * This is a drop-in replacement for 
-   * {@link Socket#connect(SocketAddress, int)}.
-   * In the case of normal sockets that don't have associated channels, this 
-   * just invokes <code>socket.connect(endpoint, timeout)</code>. If 
-   * <code>socket.getChannel()</code> returns a non-null channel,
-   * connect is implemented using Hadoop's selectors. This is done mainly
-   * to avoid Sun's connect implementation from creating thread-local 
-   * selectors, since Hadoop does not have control on when these are closed
-   * and could end up taking all the available file descriptors.
-   * 
-   * @see java.net.Socket#connect(java.net.SocketAddress, int)
-   * 
-   * @param socket
-   * @param address the remote address
-   * @param timeout timeout in milliseconds
-   */
-  public static void connect(Socket socket,
-      SocketAddress address,
-      int timeout) throws IOException {
-    connect(socket, address, null, timeout);
-  }
-  
-  /**
-   * Like SocketUtils.connect(Socket, SocketAddress, int) but
-   * also takes a local address and port to bind the socket to. 
-   * 
-   * @param socket
-   * @param endpoint the remote address
-   * @param localAddr the local address to bind the socket to
-   * @param timeout timeout in milliseconds
-   */
-  public static void connect(Socket socket, 
-                             SocketAddress endpoint,
-                             SocketAddress localAddr,
-                             int timeout) throws IOException {
-    if (socket == null || endpoint == null || timeout < 0) {
-      throw new IllegalArgumentException("Illegal argument for connect()");
-    }
-    SocketChannel ch = socket.getChannel();
-    
-    if (localAddr != null) {
-      socket.bind(localAddr);
-    }
-
-    try {
-      if (ch == null) {
-        // let the default implementation handle it.
-        socket.connect(endpoint, timeout);
-      } else {
-        if (USE_SELECTOR_POOLING) {
-          SocketIOWithTimeout.connect(ch, endpoint, timeout);
-        }
-        else {
-          socket.connect(endpoint, timeout);
-        }
-
-      }
-    } catch (SocketTimeoutException ste) {
-      throw new IOException(ste.getMessage());
-    }
-
-    /*
-     Pivotal Change: due to ticket #50734
-    // There is a very rare case allowed by the TCP specification, such that
-    // if we are trying to connect to an endpoint on the local machine,
-    // and we end up choosing an ephemeral port equal to the destination port,
-    // we will actually end up getting connected to ourself (ie any data we
-    // send just comes right back). This is only possible if the target
-    // daemon is down, so we'll treat it like connection refused.
-    if (socket.getLocalPort() == socket.getPort() &&
-        socket.getLocalAddress().equals(socket.getInetAddress())) {
-      socket.close();
-      throw new ConnectException(
-        "Localhost targeted connection resulted in a loopback. " +
-        "No daemon is listening on the target port.");
-    }
-    */
-  }
-  
-  /**
-   * Same as <code>getInputStream(socket, socket.getSoTimeout()).</code>
-   * <br><br>
-   * 
-   * @see #getInputStream(Socket, long)
-   */
-  public static InputStream getInputStream(Socket socket) 
-                                           throws IOException {
-    return getInputStream(socket, socket.getSoTimeout());
-  }
-
-  /**
-   * Return a {@link SocketInputWrapper} for the socket and set the given
-   * timeout. If the socket does not have an associated channel, then its socket
-   * timeout will be set to the specified value. Otherwise, a
-   * {@link SocketInputStream} will be created which reads with the configured
-   * timeout.
-   * 
-   * Any socket created using socket factories returned by {@link #SocketUtils},
-   * must use this interface instead of {@link Socket#getInputStream()}.
-   * 
-   * In general, this should be called only once on each socket: see the note
-   * in {@link SocketInputWrapper#setTimeout(long)} for more information.
-   *
-   * @see Socket#getChannel()
-   * 
-   * @param socket
-   * @param timeout timeout in milliseconds. zero for waiting as
-   *                long as necessary.
-   * @return SocketInputWrapper for reading from the socket.
-   * @throws IOException
-   */
-  /*Pivotal Addition
-   * Return type changed to InputStream instead of SocketInputWrapper
-   * Returning the regular inputstream if a channel is not present and does
-   * not wrap that around an input wrapper
-   */
-  public static InputStream getInputStream(Socket socket, long timeout) 
-                                           throws IOException {
-    if (socket.getChannel() == null || ! USE_SELECTOR_POOLING) {
-      return socket.getInputStream();
-    }
-    else {
-      SocketInputWrapper w = new SocketInputWrapper(socket, new SocketInputStream(socket));
-      w.setTimeout(timeout);
-      return w;
-    }
-  }
-  
-  /**
-   * Same as getOutputStream(socket, 0). Timeout of zero implies write will
-   * wait until data is available.<br><br>
-   * 
-   * From documentation for {@link #getOutputStream(Socket, long)} : <br>
-   * Returns OutputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketOutputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the write will wait until 
-   * data is available.<br><br>
-   * 
-   * Any socket created using socket factories returned by {@link SocketUtils},
-   * must use this interface instead of {@link Socket#getOutputStream()}.
-   * 
-   * @see #getOutputStream(Socket, long)
-   * 
-   * @param socket
-   * @return OutputStream for writing to the socket.
-   * @throws IOException
-   */  
-  public static OutputStream getOutputStream(Socket socket) 
-                                             throws IOException {
-    return getOutputStream(socket, 0);
-  }
-  
-  /**
-   * Returns OutputStream for the socket. If the socket has an associated
-   * SocketChannel then it returns a 
-   * {@link SocketOutputStream} with the given timeout. If the socket does not
-   * have a channel, {@link Socket#getOutputStream()} is returned. In the later
-   * case, the timeout argument is ignored and the write will wait until 
-   * data is available.<br><br>
-   * 
-   * Any socket created using socket factories returned by {@link SocketUtils},
-   * must use this interface instead of {@link Socket#getOutputStream()}.
-   * 
-   * @see Socket#getChannel()
-   * 
-   * @param socket
-   * @param timeout timeout in milliseconds. This may not always apply. zero
-   *        for waiting as long as necessary.
-   * @return OutputStream for writing to the socket.
-   * @throws IOException   
-   */
-  public static OutputStream getOutputStream(Socket socket, long timeout) 
-                                             throws IOException {
-    if (socket.getChannel() == null || !USE_SELECTOR_POOLING) {
-      return socket.getOutputStream();      
-    }
-    else {
-      return new SocketOutputStream(socket, timeout); 
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 05bc838..cc9727b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,6 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ClassPathLoader;
 import com.gemstone.gemfire.internal.JarDeployer;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketIOWithTimeout;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
@@ -2072,9 +2071,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           // ignore
         }
 
-          GatewaySenderAdvisor advisor = null;
-          for (GatewaySender sender : this.getAllGatewaySenders()) {
-            try {
+        GatewaySenderAdvisor advisor = null;
+        for (GatewaySender sender : this.getAllGatewaySenders()) {
+          try {
             sender.stop();
             advisor = ((AbstractGatewaySender) sender).getSenderAdvisor();
             if (advisor != null) {
@@ -2083,10 +2082,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
               }
               advisor.close();
             }
-            } catch (CancelException ce) {
-            }
+          } catch (CancelException ce) {
           }
-          ParallelGatewaySenderQueue.cleanUpStatics(null);
+        }
+        ParallelGatewaySenderQueue.cleanUpStatics(null);
 
         destroyGatewaySenderLockService();
 
@@ -2350,7 +2349,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       SequenceLoggerImpl.signalCacheClose();
       SystemFailure.signalCacheClose();
       
-      SocketIOWithTimeout.stopSelectorCleanUpThread();
     } // static synchronization on GemFireCache.class
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
index 5b20e86..eeb611e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
@@ -74,7 +74,6 @@ import com.gemstone.gemfire.distributed.internal.LonerDistributionManager;
 import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
 import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.cache.BucketAdvisor;
 import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
@@ -1483,7 +1482,7 @@ public class AcceptorImpl extends Acceptor implements Runnable
       }
     } else {
       s.setSoTimeout(this.acceptTimeout);
-      communicationMode = (byte)SocketUtils.getInputStream(s).read();//getInputStream().read();
+      communicationMode = (byte)s.getInputStream().read();
       if (logger.isTraceEnabled()) {
         logger.trace("read communications mode(2) ", communicationMode);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 3178b8d..1ba2294 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -17,101 +17,23 @@
 
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.Instantiator;
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheEvent;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.InterestRegistrationEvent;
-import com.gemstone.gemfire.cache.InterestRegistrationListener;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.RegionExistsException;
-import com.gemstone.gemfire.cache.UnsupportedVersionException;
+import com.gemstone.gemfire.*;
+import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
 import com.gemstone.gemfire.cache.query.CqException;
 import com.gemstone.gemfire.cache.query.Query;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
-import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.distributed.internal.DM;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.MessageWithReply;
-import com.gemstone.gemfire.distributed.internal.ReplyMessage;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.internal.ClassLoadUtil;
-import com.gemstone.gemfire.internal.DummyStatisticsFactory;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.InternalInstantiator;
-import com.gemstone.gemfire.internal.SocketCloser;
-import com.gemstone.gemfire.internal.SocketUtils;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.VersionedDataInputStream;
-import com.gemstone.gemfire.internal.VersionedDataOutputStream;
-import com.gemstone.gemfire.internal.cache.ClientServerObserver;
-import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
-import com.gemstone.gemfire.internal.cache.ClientRegionEventImpl;
-import com.gemstone.gemfire.internal.cache.CacheServerImpl;
-import com.gemstone.gemfire.internal.cache.CacheClientStatus;
-import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor;
-import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.cache.Conflatable;
-import com.gemstone.gemfire.internal.cache.DistributedRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.FilterProfile;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl;
+import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.internal.*;
+import com.gemstone.gemfire.internal.cache.*;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalCacheEvent;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.RegionEventImpl;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerMap;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
-import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
-import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
-import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.ha.*;
 import com.gemstone.gemfire.internal.cache.tier.Acceptor;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -122,6 +44,15 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.security.AccessControl;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
 import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.*;
+import java.lang.reflect.Method;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Class <code>CacheClientNotifier</code> works on the server and manages
@@ -314,8 +245,8 @@ public class CacheClientNotifier {
   {
     // Since no remote ports were specified in the message, wait for them.
     long startTime = this._statistics.startTime();
-    DataInputStream dis = new DataInputStream(SocketUtils.getInputStream(socket));//socket.getInputStream());
-    DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(socket));//socket.getOutputStream());
+    DataInputStream dis = new DataInputStream(socket.getInputStream());
+    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 
     // Read the client version
     short clientVersionOrdinal = Version.readOrdinal(dis);
@@ -607,7 +538,7 @@ public class CacheClientNotifier {
     // is attempted to be registered or authentication fails.
     try {
       DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(
-          SocketUtils.getOutputStream(socket)));//socket.getOutputStream()));
+          socket.getOutputStream()));
       // write the message type, message length and the error message (if any)
       writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
index 3e43b69..8968f62 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -71,7 +71,6 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.ClientServerObserver;
@@ -328,8 +327,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater,
 
       // set the timeout for the handshake
       mySock.setSoTimeout(handshakeTimeout);
-      tmpOut = SocketUtils.getOutputStream(mySock);
-      tmpIn = SocketUtils.getInputStream(mySock);
+      tmpOut = mySock.getOutputStream();
+      tmpIn = mySock.getInputStream();
 
       if (isDebugEnabled) {
         logger.debug("Initialized server-to-client socket with send buffer size: {} bytes and receive buffer size: {} bytes", mySock.getSendBufferSize(), mySock.getReceiveBufferSize());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
old mode 100644
new mode 100755
index 2ea6ca0..909b133
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
@@ -80,7 +80,6 @@ import com.gemstone.gemfire.internal.ClassLoadUtil;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.VersionedDataInputStream;
 import com.gemstone.gemfire.internal.VersionedDataOutputStream;
@@ -257,7 +256,7 @@ public class HandShake implements ClientHandShake
       try {
         soTimeout = sock.getSoTimeout();
         sock.setSoTimeout(timeout);
-        InputStream is = SocketUtils.getInputStream(sock);//sock.getInputStream();
+        InputStream is = sock.getInputStream();
         int valRead =  is.read();
         //this.code =  (byte)is.read();
         if (valRead == -1) {
@@ -269,7 +268,7 @@ public class HandShake implements ClientHandShake
         }
         try {
           DataInputStream dis = new DataInputStream(is);
-          DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
+          DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
           this.clientReadTimeout = dis.readInt();
           if (clientVersion.compareTo(Version.CURRENT) < 0) {
             // versioned streams allow object serialization code to deal with older clients
@@ -1272,8 +1271,8 @@ public class HandShake implements ClientHandShake
     try {
       ServerQueueStatus serverQStatus = null;
       Socket sock = conn.getSocket();
-      DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
-      final InputStream in = SocketUtils.getInputStream(sock);//sock.getInputStream();
+      DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
+      final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
       DistributedMember member = getDistributedMember(sock);
       // if running in a loner system, use the new port number in the ID to 
@@ -1378,8 +1377,8 @@ public class HandShake implements ClientHandShake
       AuthenticationFailedException, ServerRefusedConnectionException, ClassNotFoundException {
     ServerQueueStatus sqs = null;
     try {
-      DataOutputStream dos = new DataOutputStream(SocketUtils.getOutputStream(sock));//sock.getOutputStream());
-      final InputStream in = SocketUtils.getInputStream(sock);//sock.getInputStream());
+      DataOutputStream dos = new DataOutputStream(sock.getOutputStream());
+      final InputStream in = sock.getInputStream();
       DataInputStream dis = new DataInputStream(in);
       DistributedMember member = getDistributedMember(sock);
       if (!this.multiuserSecureMode) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index bfe382c..94b4953 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -32,7 +32,6 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.SerializationException;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
@@ -1036,7 +1035,7 @@ public class Message  {
   public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
     this.sockCh = socket.getChannel();
     if (this.sockCh == null) {
-      setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats);
+      setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats);
     } else {
       setComms(socket, null, null,  bb, msgStats);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
old mode 100644
new mode 100755
index 1dd2562..e608db3
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerConnection.java
@@ -27,7 +27,6 @@ import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.security.Principal;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -42,13 +41,11 @@ import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.client.internal.AbstractOp;
 import com.gemstone.gemfire.cache.client.internal.Connection;
-import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
@@ -630,7 +627,7 @@ public class ServerConnection implements Runnable {
   private boolean acceptHandShake(byte epType, int qSize)
   {
     try {
-      this.handshake.accept(SocketUtils.getOutputStream(theSocket), SocketUtils.getInputStream(this.theSocket)//this.theSocket
+      this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream()
           , epType, qSize, this.communicationMode,
           this.principal);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
index 07185b8..867f397 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ServerHandShakeProcessor.java
@@ -17,20 +17,6 @@
 
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.security.Principal;
-import java.util.Properties;
-
-import org.apache.logging.log4j.Logger;
-
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.IncompatibleVersionException;
 import com.gemstone.gemfire.cache.UnsupportedVersionException;
@@ -40,7 +26,6 @@ import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.VersionedDataStream;
 import com.gemstone.gemfire.internal.cache.tier.Acceptor;
@@ -52,6 +37,15 @@ import com.gemstone.gemfire.internal.security.AuthorizeRequest;
 import com.gemstone.gemfire.internal.security.AuthorizeRequestPP;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
 import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import org.apache.logging.log4j.Logger;
+
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.security.Principal;
+import java.util.Properties;
 
 /**
  * A <code>ServerHandShakeProcessor</code> verifies the client's version compatibility with server.
@@ -422,7 +416,7 @@ public class ServerHandShakeProcessor {
     try {
       soTimeout = socket.getSoTimeout();
       socket.setSoTimeout(timeout);
-      InputStream is = SocketUtils.getInputStream(socket);//socket.getInputStream();
+      InputStream is = socket.getInputStream();
       short clientVersionOrdinal = Version.readOrdinalFromInputStream(is);
       if (clientVersionOrdinal == -1) {
         throw new EOFException(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CloseConnection.java
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
old mode 100644
new mode 100755
index d9dd826..f0fc27a
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/shared/NativeCalls.java
@@ -169,7 +169,7 @@ public abstract class NativeCalls {
       throw new UnsupportedOperationException(ex);
     }
 
-    // first try using SocketInputStream
+    // first try using FileInputStream
     if (sockStream instanceof FileInputStream) {
       try {
         fd = ((FileInputStream)sockStream).getFD();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b3c8cb4/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
old mode 100644
new mode 100755
index bf44cd3..6e949b2
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -74,7 +74,6 @@ import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.SocketCreator;
-import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.Version;
@@ -1279,7 +1278,7 @@ public class Connection implements Runnable {
         int connectTime = getP2PConnectTimeout();; 
 
         try {
-          SocketUtils.connect(channel.socket(), addr, connectTime);
+          channel.socket().connect(addr, connectTime);
         } catch (NullPointerException e) {
           // bug #45044 - jdk 1.7 sometimes throws an NPE here
           ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
@@ -1330,7 +1329,7 @@ public class Connection implements Runnable {
         s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
         setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
         setSendBufferSize(s);
-        SocketUtils.connect(s, addr, 0);
+        s.connect(addr, 0);
       }
     }
     if (logger.isDebugEnabled()) {