You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:23:17 UTC

[21/94] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 74660da,0000000..988ca33
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@@ -1,4153 -1,0 +1,4154 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.tcp;
 +
 +import java.io.BufferedInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InterruptedIOException;
 +import java.io.OutputStream;
 +import java.net.ConnectException;
 +import java.net.Inet6Address;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.net.SocketException;
 +import java.net.SocketTimeoutException;
 +import java.nio.ByteBuffer;
 +import java.nio.channels.CancelledKeyException;
 +import java.nio.channels.ClosedChannelException;
 +import java.nio.channels.ClosedSelectorException;
 +import java.nio.channels.SocketChannel;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.Semaphore;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.CancelException;
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.cache.CacheClosedException;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 +import com.gemstone.gemfire.distributed.internal.ConflationKey;
 +import com.gemstone.gemfire.distributed.internal.DM;
 +import com.gemstone.gemfire.distributed.internal.DMStats;
 +import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
 +import com.gemstone.gemfire.distributed.internal.DistributionManager;
 +import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 +import com.gemstone.gemfire.distributed.internal.DistributionStats;
 +import com.gemstone.gemfire.distributed.internal.ReplyException;
 +import com.gemstone.gemfire.distributed.internal.ReplyMessage;
 +import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 +import com.gemstone.gemfire.distributed.internal.ReplySender;
 +import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 +import com.gemstone.gemfire.i18n.StringId;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.ByteArrayDataInput;
 +import com.gemstone.gemfire.internal.DSFIDFactory;
 +import com.gemstone.gemfire.internal.InternalDataSerializer;
 +import com.gemstone.gemfire.internal.SocketCreator;
 +import com.gemstone.gemfire.internal.SocketUtils;
 +import com.gemstone.gemfire.internal.SystemTimer;
 +import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 +import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.tcp.MsgReader.Header;
 +import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore;
 +
 +/** <p>Connection is a socket holder that sends and receives serialized
 +    message objects.  A Connection may be closed to preserve system
 +    resources and will automatically be reopened when it's needed.</p>
 +
 +    @author Bruce Schuchardt
 +    @since 2.0
 +
 +*/
 +
 +public class Connection implements Runnable {
 +  private static final Logger logger = LogService.getLogger();
 +
 +  private static final int INITIAL_CAPACITY = Integer.getInteger("p2p.readerBufferSize", 32768).intValue();
 +  private static int P2P_CONNECT_TIMEOUT;
 +  private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false; 
 +  
 +  public final static int NORMAL_MSG_TYPE = 0x4c;
 +  public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
 +  public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
 +  public final static int DIRECT_ACK_BIT = 0x20;
 +  //We no longer support early ack
 +  //public final static int EARLY_ACK_BIT = 0x10;
 +
 +  public static final int MSG_HEADER_SIZE_OFFSET = 0;
 +  public static final int MSG_HEADER_TYPE_OFFSET = 4;
 +  public static final int MSG_HEADER_ID_OFFSET = 5;
 +  public static final int MSG_HEADER_BYTES = 7;
 +
 +  /**
 +   * Small buffer used for send socket buffer on receiver connections
 +   * and receive buffer on sender connections.
 +   */
 +  public final static int SMALL_BUFFER_SIZE = Integer.getInteger("gemfire.SMALL_BUFFER_SIZE",4096).intValue();
 +
 +  /** counter to give connections a unique id */
 +  private static AtomicLong idCounter = new AtomicLong(1);
 +
 +  /** string used as the reason for initiating suspect processing */
 +  public static final String INITIATING_SUSPECT_PROCESSING = "member unexpectedly shut down shared, unordered connection";
 +
 +  /** the table holding this connection */
 +  final ConnectionTable owner;
 +  
 +  /** Set to false once run() is terminating. Using this instead of Thread.isAlive  
 +    * as the reader thread may be a pooled thread.
 +    */ 
 +  private volatile boolean isRunning = false; 
 +
 +  /** true if connection is a shared resource that can be used by more than one thread */
 +  private boolean sharedResource;
 +
 +  /**
 +   * Reports whether this connection is a shared resource, i.e. one that can be
 +   * used by more than one thread.
 +   *
 +   * @return the current value of the sharedResource flag
 +   */
 +  public final boolean isSharedResource() {
 +    final boolean shared = this.sharedResource;
 +    return shared;
 +  }
 +  
 +  /** The idle timeout timer task for this connection */
 +  private SystemTimerTask idleTask;
 +  
 +  /**
 +   * Returns the domino depth recorded for the calling thread: the number of
 +   * unshared reader threads between this thread and the original
 +   * non-reader-thread.
 +   * E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) -> reader(domino=3)
 +   *
 +   * @return the value held in the calling thread's dominoCount thread-local
 +   */
 +  public static int getDominoCount() {
 +    final Integer depth = dominoCount.get();
 +    return depth.intValue();
 +  }
 +
 +  private final static ThreadLocal isReaderThread = new ThreadLocal();
 +  /** Marks the calling thread as a reader thread. */
 +  public final static void makeReaderThread() {
 +    isReaderThread.set(Boolean.TRUE);
 +  }
 +  /**
 +   * Records in the calling thread's thread-local whether it is a reader thread.
 +   *
 +   * @param v true to mark the thread as a reader thread, false to unmark it
 +   */
 +  private final static void makeReaderThread(boolean v) {
 +    final Boolean flag = Boolean.valueOf(v);
 +    isReaderThread.set(flag);
 +  }
 +  /**
 +   * Tells whether the calling thread has been marked as a reader thread.
 +   *
 +   * @return false if nothing was ever recorded for this thread, otherwise the
 +   *         recorded flag
 +   */
 +  public final static boolean isReaderThread() {
 +    final Object marker = isReaderThread.get();
 +    return marker != null && ((Boolean)marker).booleanValue();
 +  }
 +
 +  private int getP2PConnectTimeout() {
 +    if(IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
 +      return P2P_CONNECT_TIMEOUT;
 +    String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
 +    if(connectTimeoutStr != null) {
 +      P2P_CONNECT_TIMEOUT =  Integer.parseInt(connectTimeoutStr);
 +    }
 +    else {      
 +      P2P_CONNECT_TIMEOUT = 6*this.owner.owner.getDM().getConfig().getMemberTimeout();
 +    }
 +    IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
 +    return P2P_CONNECT_TIMEOUT;     
 +  }
 +  /**
 +   * If true then readers for thread owned sockets will send all messages on thread owned senders.
 +   * Even normally unordered msgs get sent on TO socks.
 +   */
 +  private static final boolean DOMINO_THREAD_OWNED_SOCKETS = Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
 +  private final static ThreadLocal isDominoThread = new ThreadLocal();
 +  /**
 +   * If DOMINO_THREAD_OWNED_SOCKETS is enabled, marks the calling thread as a
 +   * domino thread, i.e. one that wants to send ALL messages on thread-owned sockets.
 +   *
 +   * @return true if the thread was marked, false if the feature is disabled
 +   */
 +  public final static boolean tipDomino() {
 +    if (!DOMINO_THREAD_OWNED_SOCKETS) {
 +      return false;
 +    }
 +    // ask for thread-owned resources first, then remember that this thread is a domino
 +    ConnectionTable.threadWantsOwnResources();
 +    isDominoThread.set(Boolean.TRUE);
 +    return true;
 +  }
 +  /**
 +   * Tells whether the calling thread has been marked as a domino thread.
 +   *
 +   * @return false if nothing was ever recorded for this thread, otherwise the
 +   *         recorded flag
 +   */
 +  public final static boolean isDominoThread() {
 +    final Object marker = isDominoThread.get();
 +    return marker != null && ((Boolean)marker).booleanValue();
 +  }
 +
 +  /** the socket entrusted to this connection */
 +  private final Socket socket;
 +
 +  /** the non-NIO output stream */
 +  OutputStream output;
 +
 +  /** output stream/channel lock */
 +  private final Object outLock = new Object();
 +
 +  /** the ID string of the conduit (for logging) */
 +  String conduitIdStr;
 +
 +  /** Identifies the java group member on the other side of the connection. */
 +  InternalDistributedMember remoteAddr;
 +
 +  /**
 +   * Identifies the version of the member on the other side of the connection.
 +   */
 +  Version remoteVersion;
 +
 +  /**
 +   * True if this connection was accepted by a listening socket.
 +   * This makes it a receiver.
 +   * False if this connection was explicitly created by a connect call.
 +   * This makes it a sender.
 +   */
 +  private final boolean isReceiver;
 +  
 +  /**
 +   * count of how many unshared p2p-readers removed from the original action this
 +   * thread is.  For instance, server-connection -> owned p2p reader (count 0)
 +   * -> owned p2p reader (count 1) -> owned p2p reader (count 2).  This shows
 +   * up in thread names as "DOM #x" (domino #x)
 +   */
 +  private static ThreadLocal<Integer> dominoCount = new ThreadLocal<Integer>() {
 +    @Override
 +    protected Integer initialValue() {
 +      return 0;
 +    }};
 +
 +//  /**
 +//   * name of sender thread thread.  Useful in finding out why a reader
 +//   * thread was created.  Add sending of the name in handshakes and
 +//   * add it to the name of the reader thread (the code is there but commented out)
 +//   */
 +//  private String senderName = null;
 +  
 +  // If we are a sender then we want to know if the receiver on the
 +  // other end is willing to have its messages queued. The following
 +  // four "async" inst vars come from his handshake response.
 +  /**
 +   * How long to wait if receiver will not accept a message before we
 +   * go into queue mode.
 +   * @since 4.2.2
 +   */
 +  private int asyncDistributionTimeout = 0;
 +  /**
 +   * How long to wait,
 +   * with the receiver not accepting any messages,
 +   * before kicking the receiver out of the distributed system.
 +   * Ignored if asyncDistributionTimeout is zero.
 +   * @since 4.2.2
 +   */
 +  private int asyncQueueTimeout = 0;
 +  /**
 +   * How much queued data we can have,
 +   * with the receiver not accepting any messages,
 +   * before kicking the receiver out of the distributed system.
 +   * Ignored if asyncDistributionTimeout is zero.
 +   * Canonicalized to bytes (property file has it as megabytes).
 +   * @since 4.2.2
 +   */
 +  private long asyncMaxQueueSize = 0;
 +  /**
 +   * True if an async queue is already being filled.
 +   */
 +  private volatile boolean asyncQueuingInProgress = false;
 +
 +  /**
 +   * Maps ConflatedKey instances to ConflatedKey instance.
 +   * Note that even though the key and value for an entry in the map
 +   * will always be "equal" they will not always be "==".
 +   */
 +  private final Map conflatedKeys = new HashMap();
 +
 +  //private final Queue outgoingQueue = new LinkedBlockingQueue();
 +
 +
 +  // NOTE: LinkedBlockingQueue has a bug in which removes from the queue
 +  // cause future offer to increase the size without adding anything to the queue.
 +  // So I've changed from this backport class to a java.util.LinkedList
 +  private final LinkedList outgoingQueue = new LinkedList();
 +
 +  /**
 +   * Number of bytes in the outgoingQueue.
 +   * Used to control capacity.
 +   */
 +  private long queuedBytes = 0;
 +
 +  /** used for async writes */
 +  Thread pusherThread;
 +
 +  /**
 +   * The maximum number of concurrent senders sending a message to a single recipient.
 +   */
 +  private final static int MAX_SENDERS = Integer.getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL).intValue();
 +  /**
 +   * This semaphore is used to throttle how many threads will try to do sends on
 +   * this connection concurrently. A thread must acquire this semaphore before it
 +   * is allowed to start serializing its message.
 +   */
 +  private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS);
 +
 +  /** Set to true once the handshake has been read */
 +  volatile boolean handshakeRead = false;
 +  volatile boolean handshakeCancelled = false;
 +
 +  private volatile int replyCode = 0;
 +
 +  private static final byte REPLY_CODE_OK = (byte)69;
 +  private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte)70;
 +
 +  private final Object handshakeSync = new Object();
 +
 +  /** message reader thread */
 +  private volatile Thread readerThread;
 +
 +//  /**
 +//   * When a thread owns the outLock and is writing to the socket, it must
 +//   * be placed in this variable so that it can be interrupted should the
 +//   * socket need to be closed.
 +//   */
 +//  private volatile Thread writerThread;
 +
 +  /** whether the reader thread is, or should be, running */
 +  volatile boolean stopped = true;
 +
 +  /** set to true once a close begins */
 +  private final AtomicBoolean closing = new AtomicBoolean(false);
 +
 +  volatile boolean readerShuttingDown = false;
 +
 +  /** whether the socket is connected */
 +  volatile boolean connected = false;
 +
 +  /**
 +   * Set to true once a connection finishes its constructor
 +   */
 +  volatile boolean finishedConnecting = false;
 +
 +  volatile boolean accessed = true;
 +  volatile boolean socketInUse = false;
 +  volatile boolean timedOut = false;
 +  
 +  /**
 +   * task for detecting ack timeouts and issuing alerts
 +   */
 +  private SystemTimer.SystemTimerTask ackTimeoutTask;
 +
 +  // State for ackTimeoutTask: transmissionStartTime, ackWaitTimeout, ackSATimeout, ackConnectionGroup, ackThreadName
 +  
 +  /** millisecond clock at the time message transmission started, if doing
 +   *  forced-disconnect processing */
 +  long transmissionStartTime;
 +  
 +  /** ack wait timeout - if socketInUse, use this to trigger SUSPECT processing */
 +  private long ackWaitTimeout;
 +
 +  /** ack severe alert timeout - if socketInUse, use this to send alert */
 +  private long ackSATimeout;
 +  
 +  /**
 +   * other connections participating in the current transmission.  we notify
 +   * them if ackSATimeout expires to keep all members from generating alerts
 +   * when only one is slow 
 +   */
 +  List ackConnectionGroup;
 +  
 +  /** name of thread that we're currently performing an operation in (may be null) */
 +  String ackThreadName;
 +  
 +
 +  /** the buffer used for NIO message receipt */
 +  ByteBuffer nioInputBuffer;
 +
 +  /** the position of the next message's content */
 +//  int nioMessageStart;
 +
 +  /** the length of the next message to be dispatched */
 +  int nioMessageLength;
 +//  byte nioMessageVersion;
 +
 +  /** the type of message being received */
 +  byte nioMessageType;
 +
 +  /** used to lock access to destreamer data */
 +  private final Object destreamerLock = new Object();
 +  /** caches a msg destreamer that is currently not being used */
 +  MsgDestreamer idleMsgDestreamer;
 +  /** used to map a msgId to a MsgDestreamer which are
 +    * used for destreaming chunked messages using nio */
 +  HashMap destreamerMap;
 +
 +  boolean directAck;
 +
 +  short nioMsgId;
 +
 +  /** whether the length of the next message has been established */
 +  boolean nioLengthSet = false;
 +
 +  /** is this connection used for serial message delivery? */
 +  boolean preserveOrder = false;
 +
 +  /** number of messages sent on this connection */
 +  private long messagesSent;
 +
 +  /** number of messages received on this connection */
 +  private long messagesReceived;
 +
 +  /** unique ID of this connection (remote if isReceiver==true) */
 +  private volatile long uniqueId;
 +
 +  private int sendBufferSize = -1;
 +  private int recvBufferSize = -1;
 +  
 +  private ReplySender replySender;
 +
 +  private void setSendBufferSize(Socket sock) {
 +    setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
 +  }
 +  private void setReceiveBufferSize(Socket sock) {
 +    setReceiveBufferSize(sock, this.owner.getConduit().tcpBufferSize);
 +  }
 +  private void setSendBufferSize(Socket sock, int requestedSize) {
 +    setSocketBufferSize(sock, true, requestedSize);
 +  }
 +  private void setReceiveBufferSize(Socket sock, int requestedSize) {
 +    setSocketBufferSize(sock, false, requestedSize);
 +  }
 +  /**
 +   * Returns the receive buffer size remembered for this connection.
 +   *
 +   * @return the remembered size, or -1 if none has been recorded yet
 +   */
 +  public int getReceiveBufferSize() {
 +    return this.recvBufferSize;
 +  }
 +  private void setSocketBufferSize(Socket sock, boolean send, int requestedSize) {
 +    setSocketBufferSize(sock, send, requestedSize, false);
 +  }
 +  /**
 +   * Requests a send or receive buffer size on the given socket and remembers,
 +   * in sendBufferSize / recvBufferSize, the size this connection should assume
 +   * for that direction.  Does nothing when requestedSize is not positive.
 +   * Socket errors are tolerated: the request is best-effort.
 +   *
 +   * @param sock the socket to configure
 +   * @param send true to work on the send buffer, false for the receive buffer
 +   * @param requestedSize the desired buffer size in bytes
 +   * @param alreadySetInSocket true if the size has already been applied to the
 +   *        socket, in which case only the bookkeeping is done
 +   */
 +  private void setSocketBufferSize(Socket sock, boolean send, int requestedSize, boolean alreadySetInSocket) {
 +    if (requestedSize <= 0) {
 +      return;
 +    }
 +    try {
 +      int currentSize = send
 +        ? sock.getSendBufferSize()
 +        : sock.getReceiveBufferSize();
 +      if (currentSize == requestedSize) {
 +        // The socket already has the requested size; remember it for whichever
 +        // direction was asked for (previously the receive size was left at -1).
 +        if (send) {
 +          this.sendBufferSize = currentSize;
 +        } else {
 +          this.recvBufferSize = currentSize;
 +        }
 +        return;
 +      }
 +      if (!alreadySetInSocket) {
 +        if (send) {
 +          sock.setSendBufferSize(requestedSize);
 +        } else {
 +          sock.setReceiveBufferSize(requestedSize);
 +        }
 +      }
 +    } catch (SocketException ignore) {
 +      // best effort: fall through and record whatever size the socket reports
 +    }
 +    try {
 +      int actualSize = send
 +        ? sock.getSendBufferSize()
 +        : sock.getReceiveBufferSize();
 +      if (send) {
 +        this.sendBufferSize = actualSize;
 +      } else {
 +        this.recvBufferSize = actualSize;
 +      }
 +      if (actualSize < requestedSize) {
 +        logger.info(LocalizedMessage.create(LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
 +            new Object[] {(send ? "send buffer size" : "receive buffer size"), Integer.valueOf(actualSize), Integer.valueOf(requestedSize)}));
 +      } else if (actualSize > requestedSize) {
 +        if (logger.isTraceEnabled()) {
 +          logger.trace("Socket {} buffer size is {} instead of the requested {}",
 +              (send ? "send" : "receive"), actualSize, requestedSize);
 +        }
 +        // Remember the request size which is smaller.
 +        // This remembered value is used for allocating direct mem buffers.
 +        if (send) {
 +          this.sendBufferSize = requestedSize;
 +        } else {
 +          this.recvBufferSize = requestedSize;
 +        }
 +      }
 +    } catch (SocketException ignore) {
 +      // could not read the size back; assume the request was honored
 +      if (send) {
 +        this.sendBufferSize = requestedSize;
 +      } else {
 +        this.recvBufferSize = requestedSize;
 +      }
 +    }
 +  }
 +  /**
 +   * Returns the send buffer size remembered for this connection.  If none has
 +   * been recorded yet (-1) the size is read from the socket, falling back to
 +   * the conduit's tcpBufferSize when the socket cannot be queried, and the
 +   * result is cached.
 +   */
 +  public int getSendBufferSize() {
 +    int cached = this.sendBufferSize;
 +    if (cached == -1) {
 +      try {
 +        cached = getSocket().getSendBufferSize();
 +      } catch (SocketException ignore) {
 +        // socket could not be queried; use the configured default
 +        cached = this.owner.getConduit().tcpBufferSize;
 +      }
 +      this.sendBufferSize = cached;
 +    }
 +    return cached;
 +  }
 +
 +  /** creates a connection that we accepted (it was initiated by
 +   * an explicit connect being done on the other side).
 +   * We will only receive data on this socket; never send.
 +   */
 +  protected static Connection createReceiver(ConnectionTable t, Socket s)
 +    throws IOException, ConnectionException
 +  {
 +    Connection c = new Connection(t, s);
 +    boolean readerStarted = false;
 +    try {
 +      c.startReader(t);
 +      readerStarted = true;
 +    } finally {
 +      if (!readerStarted) {
 +        c.closeForReconnect(LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString());
 +      }
 +    }
 +    c.waitForHandshake();
 +    //sendHandshakeReplyOK();
 +    c.finishedConnecting = true;
 +    return c;
 +  }
 +  
 +  /** creates a connection that we accepted (it was initiated by
 +   * an explicit connect being done on the other side).
 +   */
 +  protected Connection(ConnectionTable t, Socket s)
 +    throws IOException, ConnectionException
 +  {
 +    if (t == null) {
 +      throw new IllegalArgumentException(LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
 +    }
 +    this.isReceiver = true;
 +    this.owner = t;
 +    this.socket = s;
 +    this.conduitIdStr = owner.getConduit().getId().toString();
 +    this.handshakeRead = false;
 +    this.handshakeCancelled = false;
 +    this.connected = true;
 +
 +    try {
 +      s.setTcpNoDelay(true);
 +      s.setKeepAlive(true);
 +//      s.setSoLinger(true, (Integer.valueOf(System.getProperty("p2p.lingerTime", "5000"))).intValue());
 +      setSendBufferSize(s, SMALL_BUFFER_SIZE);
 +      setReceiveBufferSize(s);
 +    }
 +    catch (SocketException e) {
 +      // unable to get the settings we want.  Don't log an error because it will
 +      // likely happen a lot
 +    }
 +    if (!useNIO()) {
 +      try {
 +        //this.output = new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE);
 +        this.output = s.getOutputStream();
 +      }
 +      catch (IOException io) {
 +        logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
 +        t.getSocketCloser().asyncClose(s, this.remoteAddr.toString(), null);
 +        throw io;
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Remembers the idle timeout timer task for this connection.
 +   *
 +   * @param task the task to store in idleTask
 +   */
 +  void setIdleTimeoutTask(SystemTimerTask task) {
 +    idleTask = task;
 +  }
 +
 +
 +  /**
 +   * Checks whether this connection has gone idle and, if so, closes it.
 +   * A closed socket counts as idle; a socket that is in use never does.
 +   * Otherwise the connection is idle when it has not been accessed since the
 +   * previous check; the accessed flag is cleared on every such check.
 +   *
 +   * @return true if an idle connection was detected
 +   */
 +  public boolean checkForIdleTimeout() {
 +    if (isSocketClosed()) {
 +      return true;
 +    }
 +    if (isSocketInUse()) {
 +      return false;
 +    }
 +    final boolean idle = !this.accessed;
 +    this.accessed = false;
 +    if (!idle) {
 +      return false;
 +    }
 +    this.timedOut = true;
 +    this.owner.getConduit().stats.incLostLease();
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource, this.preserveOrder);
 +    }
 +    try {
 +      // closeForReconnect is used instead of requestClose because this
 +      // timeout close must not close any other connections.  requestClose
 +      // has removeEndpoint set to true, which would close any receivers
 +      // we have if this connection is a shared one.
 +      closeForReconnect(LocalizedStrings.Connection_IDLE_CONNECTION_TIMED_OUT.toLocalizedString());
 +    } catch (Exception ignore) {
 +      // best-effort close of an idle connection
 +    }
 +    return true;
 +  }
 +
 +  /** Pre-built "handshake OK" reply (message header plus one reply-code byte) as a byte array. */
 +  static private byte[] okHandshakeBytes;
 +  /** The same pre-built reply held in a ByteBuffer; direct when TCPConduit.useDirectBuffers is set. */
 +  static private ByteBuffer okHandshakeBuf;
 +  // Builds the shared OK reply once: a 4-byte size/version word, a 1-byte
 +  // message type, a 2-byte message id and a single REPLY_CODE_OK payload byte.
 +  static {
 +    int msglen = 1; // one byte for reply code
 +    byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
 +    // fold the handshake version into the top byte of the length word
 +    msglen = calcHdrSize(msglen);
 +    // write the length word big-endian, one byte at a time
 +    bytes[MSG_HEADER_SIZE_OFFSET] = (byte)((msglen/0x1000000) & 0xff);
 +    bytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((msglen/0x10000) & 0xff);
 +    bytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((msglen/0x100) & 0xff);
 +    bytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(msglen & 0xff);
 +    bytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE; // message type
 +    // two-byte message id, high byte first
 +    bytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff);
 +    bytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff);
 +    // the payload starts right after the header
 +    bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
 +    int allocSize = bytes.length;
 +    ByteBuffer bb;
 +    if (TCPConduit.useDirectBuffers) {
 +      bb = ByteBuffer.allocateDirect(allocSize);
 +    } else {
 +      bb = ByteBuffer.allocate(allocSize);
 +    }
 +    // the buffer is left positioned at its end; sendOKHandshakeReply resets
 +    // the position to 0 before each write
 +    bb.put(bytes);
 +    okHandshakeBuf = bb;
 +    okHandshakeBytes = bytes;
 +  }
 +
 +  /**
 +   * maximum message buffer size
 +   */
 +  public static final int MAX_MSG_SIZE = 0x00ffffff;
 +  /**
 +   * Builds the header size word for a message of the given size by placing
 +   * HANDSHAKE_VERSION in the top byte above the size.
 +   *
 +   * @param byteSize the message size in bytes
 +   * @return the size combined with the handshake version
 +   * @throws IllegalStateException if byteSize exceeds MAX_MSG_SIZE
 +   */
 +  public static int calcHdrSize(int byteSize) {
 +    if (byteSize > MAX_MSG_SIZE) {
 +      throw new IllegalStateException(LocalizedStrings.Connection_TCP_MESSAGE_EXCEEDED_MAX_SIZE_OF_0.toLocalizedString(Integer.valueOf(MAX_MSG_SIZE)));
 +    }
 +    final int versionBits = HANDSHAKE_VERSION << 24;
 +    return byteSize | versionBits;
 +  }
 +  /**
 +   * Extracts the message size from a header size word by masking off
 +   * everything above MAX_MSG_SIZE.
 +   *
 +   * @param hdrSize the header size word
 +   * @return the low bits of hdrSize selected by MAX_MSG_SIZE
 +   */
 +  public static int calcMsgByteSize(int hdrSize) {
 +    final int sizeBits = hdrSize & MAX_MSG_SIZE;
 +    return sizeBits;
 +  }
 +  /**
 +   * Extracts the handshake version from the top byte of a header size word
 +   * and verifies that it matches HANDSHAKE_VERSION.
 +   *
 +   * @param hdrSize the header size word
 +   * @return the version found in the header
 +   * @throws IOException if the version differs from HANDSHAKE_VERSION
 +   */
 +  public static byte calcHdrVersion(int hdrSize) throws IOException {
 +    byte ver = (byte)(hdrSize >> 24);
 +    if (ver != HANDSHAKE_VERSION) {
 +      // Byte.valueOf replaces the deprecated boxing constructor and matches
 +      // the Integer.valueOf usage elsewhere in this class
 +      throw new IOException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {Byte.valueOf(HANDSHAKE_VERSION), Byte.valueOf(ver)}));
 +    }
 +    return ver;
 +  }
 +  /**
 +   * Writes the "handshake OK" reply to the peer.
 +   * A receiver builds a fresh reply carrying REPLY_CODE_OK_WITH_ASYNC_INFO,
 +   * the three async settings from the conduit's configuration and this
 +   * member's product version ordinal.  Any other connection sends the shared,
 +   * pre-built REPLY_CODE_OK message.  The reply is written through the socket
 +   * channel when NIO is in use, otherwise through the output stream.
 +   *
 +   * @throws IOException if writing to the stream fails
 +   */
 +  private void sendOKHandshakeReply() throws IOException, ConnectionException {
 +    byte[] my_okHandshakeBytes = null;
 +    ByteBuffer my_okHandshakeBuf = null;
 +    if (this.isReceiver) {
 +      DistributionConfig cfg = owner.getConduit().config;
 +      ByteBuffer bb;
 +      if (useNIO() && TCPConduit.useDirectBuffers) {
 +        bb = ByteBuffer.allocateDirect(128);
 +      } else {
 +        bb = ByteBuffer.allocate(128);
 +      }
 +      bb.putInt(0); // reserve first 4 bytes for packet length
 +      bb.put((byte)NORMAL_MSG_TYPE);
 +      bb.putShort(MsgIdGenerator.NO_MSG_ID);
 +      bb.put(REPLY_CODE_OK_WITH_ASYNC_INFO);
 +      bb.putInt(cfg.getAsyncDistributionTimeout());
 +      bb.putInt(cfg.getAsyncQueueTimeout());
 +      bb.putInt(cfg.getAsyncMaxQueueSize());
 +      // write own product version
 +      Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
 +      // now set the msg length into position 0
 +      bb.putInt(0, calcHdrSize(bb.position()-MSG_HEADER_BYTES));
 +      if (useNIO()) {
 +        // hand the buffer itself to the channel write below
 +        my_okHandshakeBuf = bb;
 +        bb.flip();
 +      } else {
 +        // copy exactly the bytes written so far into an array for the stream write
 +        my_okHandshakeBytes = new byte[bb.position()];
 +        bb.flip();
 +        bb.get(my_okHandshakeBytes);
 +      }
 +    } else {
 +      // reuse the reply built once in the static initializer
 +      my_okHandshakeBuf = okHandshakeBuf;
 +      my_okHandshakeBytes = okHandshakeBytes;
 +    }
 +    if (useNIO()) {
 +      // the buffer may be the shared static one, so its position is reset and
 +      // used under the buffer's own monitor
 +      synchronized (my_okHandshakeBuf) {
 +        my_okHandshakeBuf.position(0);
 +        nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
 +      }
 +    } else {
 +      synchronized(outLock) {
 +        try {
 +//          this.writerThread = Thread.currentThread();
 +          this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
 +          this.output.flush();
 +        }
 +        finally {
 +//          this.writerThread = null;
 +        }
 +      }
 +    }
 +  }
 +
 +  private static final int HANDSHAKE_TIMEOUT_MS = Integer.getInteger("p2p.handshakeTimeoutMs", 59000).intValue();
 +  //private static final byte HANDSHAKE_VERSION = 1; // 501
 +  //public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
 +  //public static final byte HANDSHAKE_VERSION = 3; // durable_client
 +  //public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
 +//  public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
 +  //public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
 +  // NOTICE: handshake_version should not be changed anymore.  Use the gemfire
 +  //         version transmitted with the handshake bits and handle old handshakes
 +  //         based on that
 +  public static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
 +
 +  /**
 +   * Blocks until the handshake has been read or cancelled, waiting at most
 +   * HANDSHAKE_TIMEOUT_MS on handshakeSync.
 +   * On timeout, suspect processing is initiated for the peer when its address
 +   * is already known, and a ConnectionException is thrown.  After a successful
 +   * handshake a receiver registers the peer as a surprise member.  The
 +   * connection is closed (outside the handshakeSync monitor) when the wait did
 +   * not succeed or when that registration is refused.
 +   * An interrupt that was pending on entry, or that arrives during the wait,
 +   * is restored on the thread before returning.
 +   *
 +   * @throws ConnectionException if the handshake is neither read nor cancelled
 +   *         before the timeout expires
 +   */
 +  private void waitForHandshake() throws ConnectionException {
 +    boolean needToClose = false;
 +    String reason = null;
 +    try {
 +    synchronized (this.handshakeSync) {
 +      if (!this.handshakeRead && !this.handshakeCancelled) {
 +        boolean success = false;
 +        reason = LocalizedStrings.Connection_UNKNOWN.toLocalizedString();
 +        // clear any pending interrupt so wait() can block; it is restored in the finally below
 +        boolean interrupted = Thread.interrupted();
 +        try {
 +          final long endTime = System.currentTimeMillis() + HANDSHAKE_TIMEOUT_MS;
 +          long msToWait = HANDSHAKE_TIMEOUT_MS;
 +          while (!this.handshakeRead && !this.handshakeCancelled && msToWait > 0) {
 +            this.handshakeSync.wait(msToWait); // spurious wakeup ok
 +            if (!this.handshakeRead && !this.handshakeCancelled) {
 +              // still pending: recompute the time left until the deadline
 +              msToWait = endTime - System.currentTimeMillis();
 +            }
 +          }
 +          if (!this.handshakeRead && !this.handshakeCancelled) {
 +            // the deadline passed without the handshake being read or cancelled
 +            reason = LocalizedStrings.Connection_HANDSHAKE_TIMED_OUT.toLocalizedString();
 +            String peerName;
 +            if (this.remoteAddr != null) {
 +              peerName = this.remoteAddr.toString();
 +              // late in the life of jdk 1.7 we started seeing connections accepted
 +              // when accept() was not even being called.  This started causing timeouts
 +              // to occur in the handshake threads instead of causing failures in
 +              // connection-formation.  So, we need to initiate suspect processing here
 +              owner.getDM().getMembershipManager().suspectMember(this.remoteAddr,
 +                  LocalizedStrings.Connection_CONNECTION_HANDSHAKE_WITH_0_TIMED_OUT_AFTER_WAITING_1_MILLISECONDS.toLocalizedString(
 +                      new Object[] {peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)}));
 +            }
 +            else {
 +              // peer identity not known yet; describe it by its socket address
 +              peerName = "socket " + this.socket.getRemoteSocketAddress().toString()
 +                  + ":" + this.socket.getPort();
 +            }
 +            throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_HANDSHAKE_WITH_0_TIMED_OUT_AFTER_WAITING_1_MILLISECONDS.toLocalizedString(
 +                  new Object[] {peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)}));
 +          } else {
 +            // a cancelled handshake leaves success false, which closes the connection below
 +            success = this.handshakeRead;
 +          }
 +        } catch (InterruptedException ex) {
 +          interrupted = true;
 +          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
 +          reason = LocalizedStrings.Connection_INTERRUPTED.toLocalizedString();
 +        } finally {
 +          if (interrupted) {
 +            Thread.currentThread().interrupt();
 +          }
 +          if (success) {
 +            if (this.isReceiver) {
 +              // close the connection if the membership manager refuses the peer
 +              needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr);
 +              if (needToClose) {
 +                reason = "this member is shunned";
 +              }
 +            }
 +          }
 +          else {
 +            needToClose = true; // for bug 42159
 +          }
 +        }
 +      } // !handshakeRead
 +    } // synchronized
 +
 +    } finally {
 +      if (needToClose) {
 +        // moved this call outside of the sync for bug 42159
 +        try {
 +          requestClose(reason); // fix for bug 31546
 +        } 
 +        catch (Exception ignore) {
 +          // best-effort close; a failure here must not mask the outcome of the wait
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Records the outcome of the handshake and wakes the threads blocked on
 +   * <code>handshakeSync</code>.
 +   *
 +   * @param success true marks the handshake as read; false marks it as cancelled
 +   */
 +  private void notifyHandshakeWaiter(boolean success) {
 +    synchronized (this.handshakeSync) {
 +      if (success) {
 +        this.handshakeRead = true;
 +      } else {
 +        this.handshakeCancelled = true;
 +      }
 +      // notifyAll instead of notify: the handshake waiter re-checks both flags
 +      // in a loop, so waking every waiter is harmless and guarantees that none
 +      // of them is left sleeping until its timeout expires
 +      this.handshakeSync.notifyAll();
 +    }
 +  }
 +  
 +  /** flipped by the first non-sick call to asyncClose; later non-sick calls do nothing */
 +  private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
 +
 +  /**
 +   * Asynchronously closes this connection's socket.
 +   * <p>
 +   * The close is handed to the owner's socket closer because closing may hang
 +   * if there is a problem with the network (see bug #46659).  Note that
 +   * remoteAddr may be null if this is a receiver that hasn't finished its
 +   * handshake.
 +   *
 +   * @param beingSick true when sickness is being simulated: only
 +   *        {@link #prepareForAsyncClose()} is invoked here and the socket is
 +   *        not handed to the background closer, so that tests know the vm is
 +   *        sick when the beSick operation completes
 +   */
 +  private void asyncClose(boolean beingSick) {
 +    if (beingSick) {
 +      prepareForAsyncClose();
 +      return;
 +    }
 +    if (!this.asyncCloseCalled.compareAndSet(false, true)) {
 +      // some other caller already initiated the close
 +      return;
 +    }
 +    final Socket sock = this.socket;
 +    if (sock == null || sock.isClosed()) {
 +      return;
 +    }
 +    prepareForAsyncClose();
 +    this.owner.getSocketCloser().asyncClose(sock, String.valueOf(this.remoteAddr), null);
 +  }
 +  
 +  /**
 +   * Interrupts the reader thread, provided there is one, it is running, it is
 +   * not already shutting down, and the connection is in one of the two
 +   * reading states.  The check and the interrupt are done under stateLock.
 +   */
 +  private void prepareForAsyncClose() {
 +    synchronized(stateLock) {
 +      if (readerThread == null || !isRunning || readerShuttingDown) {
 +        return;
 +      }
 +      if (connectionState == STATE_READING || connectionState == STATE_READING_ACK) {
 +        readerThread.interrupt();
 +      }
 +    }
 +  }
 +
 +  /** size given to the streams that the connect handshake is serialized into */
 +  private static final int CONNECT_HANDSHAKE_SIZE = 4096;
 +
 +  /**
 +   * Waits until we've joined the distributed system before returning.
 +   * <p>
 +   * Concretely: polls the conduit's local address every 100ms until it has an
 +   * inet address or a non-negative view id, or until the conduit's cancel
 +   * criterion reports a cancel in progress.  Afterwards asserts that the
 +   * address carries this conduit's port.
 +   * <p>
 +   * If the thread is interrupted while waiting, the interrupt status is
 +   * restored when this method exits.
 +   */
 +  private void waitForAddressCompletion() {
 +    InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress();
 +    boolean interrupted = false;
 +    try {
 +      synchronized (myAddr) {
 +        while ((owner.getConduit().getCancelCriterion().cancelInProgress() == null)
 +            && myAddr.getInetAddress() == null && myAddr.getVmViewId() < 0) {
 +          try {
 +            myAddr.wait(100); // spurious wakeup ok
 +          }
 +          catch (InterruptedException ie) {
 +            // Do not re-set the interrupt flag here: with the flag set the next
 +            // wait() would throw immediately and this loop would spin without
 +            // ever sleeping.  The flag is restored in the finally block instead.
 +            interrupted = true;
 +            this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
 +          }
 +        }
 +        Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort());
 +      }
 +    }
 +    finally {
 +      if (interrupted) {
 +        Thread.currentThread().interrupt();
 +      }
 +    }
 +  }
 +
 +  private void handshakeNio() throws IOException {
 +    waitForAddressCompletion();
 +    
 +    InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress();
 +    final MsgOutputStream connectHandshake
 +      = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
 +    //connectHandshake.reset();
 +    /**
 +     * Note a byte of zero is always written because old products
 +     * serialized a member id with always sends an ip address.
 +     * My reading of the ip-address specs indicated that the first byte
 +     * of a valid address would never be 0.
 +     */
 +    connectHandshake.writeByte(0);
 +    connectHandshake.writeByte(HANDSHAKE_VERSION);
 +    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
 +    InternalDataSerializer.invokeToData(myAddr, connectHandshake);
 +    connectHandshake.writeBoolean(this.sharedResource);
 +    connectHandshake.writeBoolean(this.preserveOrder);
 +    connectHandshake.writeLong(this.uniqueId);
 +    // write the product version ordinal
 +    Version.CURRENT.writeOrdinal(connectHandshake, true);
 +    connectHandshake.writeInt(dominoCount.get()+1);
 +// this writes the sending member + thread name that is stored in senderName
 +// on the receiver to show the cause of reader thread creation
 +//    if (dominoCount.get() > 0) {
 +//      connectHandshake.writeUTF(Thread.currentThread().getName());
 +//    } else {
 +//      String name = owner.getDM().getConfig().getName();
 +//      if (name == null) {
 +//        name = "pid="+OSProcess.getId();
 +//      }
 +//      connectHandshake.writeUTF("["+name+"] "+Thread.currentThread().getName());
 +//    }
 +    connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, DistributionManager.STANDARD_EXECUTOR, MsgIdGenerator.NO_MSG_ID);
 +    nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
 +  }
 +
 +  private void handshakeStream() throws IOException {
 +    waitForAddressCompletion();
 +
 +    //this.output = new BufferedOutputStream(getSocket().getOutputStream(), owner.getConduit().bufferSize);
 +    this.output = getSocket().getOutputStream();
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
 +    DataOutputStream os = new DataOutputStream(baos);
 +    InternalDistributedMember myAddr = owner.getConduit().getLocalAddress();
 +    os.writeByte(0);
 +    os.writeByte(HANDSHAKE_VERSION);
 +    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
 +    InternalDataSerializer.invokeToData(myAddr, os);
 +    os.writeBoolean(this.sharedResource);
 +    os.writeBoolean(this.preserveOrder);
 +    os.writeLong(this.uniqueId);
 +    Version.CURRENT.writeOrdinal(os, true);
 +    os.writeInt(dominoCount.get()+1);
 + // this writes the sending member + thread name that is stored in senderName
 + // on the receiver to show the cause of reader thread creation
 +//    if (dominoCount.get() > 0) {
 +//      os.writeUTF(Thread.currentThread().getName());
 +//    } else {
 +//      String name = owner.getDM().getConfig().getName();
 +//      if (name == null) {
 +//        name = "pid="+OSProcess.getId();
 +//      }
 +//      os.writeUTF("["+name+"] "+Thread.currentThread().getName());
 +//    }
 +    os.flush();
 +
 +    byte[] msg = baos.toByteArray();
 +    int len = calcHdrSize(msg.length);
 +    byte[] lenbytes = new byte[MSG_HEADER_BYTES];
 +    lenbytes[MSG_HEADER_SIZE_OFFSET] = (byte)((len/0x1000000) & 0xff);
 +    lenbytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((len/0x10000) & 0xff);
 +    lenbytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((len/0x100) & 0xff);
 +    lenbytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(len & 0xff);
 +    lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE;
 +    lenbytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff);
 +    lenbytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff);
 +    synchronized(outLock) {
 +      try {
 +//        this.writerThread = Thread.currentThread();
 +        this.output.write(lenbytes, 0, lenbytes.length);
 +        this.output.write(msg, 0, msg.length);
 +        this.output.flush();
 +      }
 +      finally {
 +//        this.writerThread = null;
 +      }
 +    }
 +  }
 +
 +  /**
 +   *
 +   * @throws IOException if handshake fails
 +   */
 +  private void attemptHandshake(ConnectionTable connTable) throws IOException {
 +    // send HANDSHAKE
 +    // send this server's port.  It's expected on the other side
 +    if (useNIO()) {
 +      handshakeNio();
 +    }
 +    else {
 +      handshakeStream();
 +    }
 +
 +    startReader(connTable); // this reader only reads the handshake and then exits
 +    waitForHandshake(); // waiting for reply
 +  }
 +
 +  /**
 +   * time between connection attempts; taken from the
 +   * "gemfire.RECONNECT_WAIT_TIME" system property, defaulting to 2000
 +   */
 +  private static final int RECONNECT_WAIT_TIME
 +    = Integer.getInteger("gemfire.RECONNECT_WAIT_TIME", 2000).intValue();
 +
 +  /** creates a new connection to a remote server.
 +   *  We are initiating this connection; the other side must accept us
 +   *  We will almost always send messages; small acks are received.
 +   *  <p>
 +   *  Keeps retrying (sleeping between attempts) until a connection has been
 +   *  constructed and its handshake has succeeded, or until one of the
 +   *  give-up conditions below throws.  The calling thread's interrupt status
 +   *  is cleared on entry and restored on exit if it was set or if the thread
 +   *  was interrupted while retrying.
 +   *
 +   *  @param mgr membership manager used to check whether the peer still
 +   *         exists, is shunned, or whether shutdown is in progress, and to
 +   *         initiate suspect processing
 +   *  @param t the connection table that will own the new connection
 +   *  @param preserveOrder passed to the connection; when true and batch
 +   *         sends are enabled a batch send buffer is created
 +   *  @param remoteAddr the member to connect to
 +   *  @param sharedResource passed to the connection
 +   *  @param startTime compared against System.currentTimeMillis() together
 +   *         with the two timeouts below
 +   *  @param ackTimeout milliseconds after startTime at which the peer is
 +   *         suspected and a warning may be logged
 +   *  @param ackSATimeout additional milliseconds after which a fatal
 +   *         message is logged; values &lt;= 0 disable that processing
 +   *  @return the connected sender, marked as finishedConnecting
 +   *  @throws IOException if the member has left, is shunned or shutdown is
 +   *          in progress; if the calling thread is an alerting thread and
 +   *          the first attempt failed; or if SSL certificates were rejected
 +   *  @throws DistributedSystemDisconnectedException rethrown when raised
 +   *          during the handshake
 +   */
 +  protected static Connection createSender(final MembershipManager mgr,
 +                                           final ConnectionTable t,
 +                                           final boolean preserveOrder,
 +                                           final DistributedMember remoteAddr,
 +                                           final boolean sharedResource,
 +                                           final long startTime,
 +                                           final long ackTimeout,
 +                                           final long ackSATimeout)
 +    throws IOException, DistributedSystemDisconnectedException
 +  {
 +    boolean warningPrinted = false;
 +    boolean success = false;
 +    boolean firstTime = true;
 +    Connection conn = null;
 +    // keep trying.  Note that this may be executing during the shutdown window
 +    // where a cancel criterion has not been established, but threads are being
 +    // interrupted.  In this case we must allow the connection to succeed even
 +    // though subsequent messaging using the socket may fail
 +    boolean interrupted = Thread.interrupted();
 +    boolean severeAlertIssued = false;
 +    boolean suspected = false;
 +    long reconnectWaitTime = RECONNECT_WAIT_TIME;
 +    boolean connectionErrorLogged = false;
 +    try {
 +      while (!success) { // keep trying
 +        // Quit if DM has stopped distribution
 +        t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +        long now = System.currentTimeMillis();
 +        // severe-alert processing is enabled and the ack timeout has passed
 +        if (!severeAlertIssued && ackSATimeout > 0  &&  startTime + ackTimeout < now) {
 +          if (startTime + ackTimeout + ackSATimeout < now) {
 +            if (remoteAddr != null) {
 +              logger.fatal(LocalizedMessage.create(
 +                  LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS, new Object[] { remoteAddr,
 +                  (ackSATimeout + ackTimeout) / 1000 }));
 +            }
 +            severeAlertIssued = true;
 +          }
 +          else if (!suspected) {
 +            if (remoteAddr != null) {
 +              logger.warn(LocalizedMessage.create(
 +                  LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
 +                  new Object[] { remoteAddr, (ackTimeout)/1000 }));
 +            }
 +            mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString());
 +            suspected = true;
 +          }
 +          // do not sleep past the point where the severe alert becomes due;
 +          // fall back to the default wait once that point has passed
 +          reconnectWaitTime = Math.min(RECONNECT_WAIT_TIME,
 +              ackSATimeout - (now - startTime - ackTimeout));
 +          if (reconnectWaitTime <= 0) {
 +            reconnectWaitTime = RECONNECT_WAIT_TIME;
 +          }
 +        } else if (!suspected && (startTime > 0) && (ackTimeout > 0)
 +            && (startTime + ackTimeout < now)) {
 +          mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString());
 +          suspected = true;
 +        }
 +        if (firstTime) {
 +          firstTime = false;
 +          // same condition as giveUpOnMember(mgr, remoteAddr)
 +          if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) {
 +            throw new IOException("Member " + remoteAddr + " left the system");
 +          }
 +        }
 +        else {
 +          // if we're sending an alert and can't connect, bail out.  A sick
 +          // alert listener should not prevent cache operations from continuing
 +          if (AlertAppender.isThreadAlerting()) {
 +            // do not change the text of this exception - it is looked for in exception handlers
 +            throw new IOException("Cannot form connection to alert listener " + remoteAddr);
 +          }
 +            
 +          // Wait briefly...
 +          // (the interrupt flag is cleared first so that the sleep is not cut short)
 +          interrupted = Thread.interrupted() || interrupted;
 +          try {
 +            Thread.sleep(reconnectWaitTime);
 +          }
 +          catch (InterruptedException ie) {
 +            interrupted = true;
 +            t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
 +          }
 +          t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +          if (giveUpOnMember(mgr, remoteAddr)) {
 +            throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
 +          }
 +          if (!warningPrinted) {
 +            warningPrinted = true;
 +            logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr));
 +          }          
 +          t.getConduit().stats.incReconnectAttempts();
 +        }
 +        //create connection
 +        try {
 +          conn = null;
 +          conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
 +        }
 +        catch (javax.net.ssl.SSLHandshakeException se) {
 +          // no need to retry if certificates were rejected
 +          throw se;
 +        }
 +        catch (IOException ioe) {
 +          // Only give up if the member leaves the view.
 +          if (giveUpOnMember(mgr, remoteAddr)) {
 +            throw ioe;
 +          }
 +          t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +          if ("Too many open files".equals(ioe.getMessage())) {
 +            t.fileDescriptorsExhausted();
 +          }
 +          else if (!connectionErrorLogged) {
 +            connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of these
 +            logger.info(LocalizedMessage.create(
 +                LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
 +                new Object[] {sharedResource, preserveOrder, remoteAddr, ioe}));
 +          }
 +        } // IOException
 +        finally {
 +          // conn is still null whenever the constructor threw
 +          if (conn == null) {
 +            t.getConduit().stats.incFailedConnect();
 +          }
 +        }
 +        if (conn != null) {
 +          // handshake
 +          try {
 +            conn.attemptHandshake(t);
 +            if (conn.isSocketClosed()) {
 +              // something went wrong while reading the handshake
 +              // and the socket was closed or this guy sent us a
 +              // ShutdownMessage
 +              if (giveUpOnMember(mgr, remoteAddr)) {
 +                throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
 +              }
 +              t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +              // no success but no need to log; just retry
 +            }
 +            else {
 +              success = true;
 +            }
 +          }
 +          catch (DistributedSystemDisconnectedException e) {
 +            throw e;
 +          }
 +          catch (ConnectionException e) {
 +            if (giveUpOnMember(mgr, remoteAddr)) {
 +              IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString());
 +              ioe.initCause(e);
 +              throw ioe;
 +            }
 +            t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +            logger.info(LocalizedMessage.create(
 +                LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
 +                new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
 +          }
 +          catch (IOException e) {
 +            if (giveUpOnMember(mgr, remoteAddr)) {
 +              throw e;
 +            }
 +            t.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +            logger.info(LocalizedMessage.create(
 +                LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
 +                new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
 +            if (!sharedResource && "Too many open files".equals(e.getMessage())) {
 +              t.fileDescriptorsExhausted();
 +            }
 +          }
 +          finally {
 +            // a connection whose handshake did not succeed is closed and
 +            // dropped before the next attempt (or before the exception propagates)
 +            if (!success) {
 +              try {
 +                conn.requestClose(LocalizedStrings.Connection_FAILED_HANDSHAKE.toLocalizedString()); 
 +              } catch (Exception ignore) {}
 +              conn = null;
 +            }
 +          }
 +        }
 +      } // while
 +      if (warningPrinted) {
 +        logger.info(LocalizedMessage.create(
 +            LocalizedStrings.Connection_0_SUCCESSFULLY_REESTABLISHED_CONNECTION_TO_PEER_1,
 +            new Object[] {mgr.getLocalMember(), remoteAddr}));
 +      }
 +    }
 +    finally {
 +      try {
 +        if (!success) {
 +          if (conn != null) {
 +            conn.requestClose(LocalizedStrings.Connection_FAILED_CONSTRUCTION.toLocalizedString());
 +            conn = null;
 +          }
 +        }
 +      }
 +      finally {
 +        // restore the interrupt status that was cleared on entry or while retrying
 +        if (interrupted) {
 +          Thread.currentThread().interrupt();
 +        }
 +      }
 +    }
 +    //Assert.assertTrue(conn != null);
 +    if (conn == null) {
 +      throw new ConnectionException(
 +        LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0
 +          .toLocalizedString(remoteAddr));
 +    }
 +    if (preserveOrder && BATCH_SENDS) {
 +      conn.createBatchSendBuffer();
 +    }
 +    conn.finishedConnecting = true;
 +    return conn;
 +  }
 +  
 +  /**
 +   * Decides whether connection attempts to a member should be abandoned.
 +   *
 +   * @param mgr the membership manager to consult
 +   * @param remoteAddr the member being connected to
 +   * @return true if the member no longer exists, is shunned, or a shutdown
 +   *         is in progress
 +   */
 +  private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
 +    if (!mgr.memberExists(remoteAddr)) {
 +      return true;
 +    }
 +    if (mgr.isShunned(remoteAddr)) {
 +      return true;
 +    }
 +    return mgr.shutdownInProgress();
 +  }
 +
 +  private void setRemoteAddr(DistributedMember m) {
 +    this.remoteAddr = this.owner.getDM().getCanonicalId(m);
 +    MembershipManager mgr = this.owner.owner.getMembershipManager();
 +    mgr.addSurpriseMember(m);
 +  }
 +  
 +  /** creates a new connection to a remote server.
 +   *  We are initiating this connection; the other side must accept us
 +   *  We will almost always send messages; small acks are received.
 +   *  <p>
 +   *  The constructor connects the socket but does not perform the handshake.
 +   *  If connecting fails, the socket that was opened for the attempt is
 +   *  closed before the exception propagates, so that repeated attempts do not
 +   *  accumulate open descriptors.
 +   *
 +   *  @param mgr the membership manager (not used directly by this constructor)
 +   *  @param t the connection table that owns this connection; must not be null
 +   *  @param preserveOrder stored on the connection
 +   *  @param remoteID the member to connect to; must be an InternalDistributedMember
 +   *  @param sharedResource stored on the connection; also selects the receive
 +   *         buffer size in the NIO and SSL cases (small when shared)
 +   *  @throws IOException if the socket cannot be created or connected
 +   *  @throws IllegalArgumentException if t is null
 +   */
 +  private Connection(MembershipManager mgr,
 +                     ConnectionTable t,
 +                     boolean preserveOrder,
 +                     DistributedMember remoteID,
 +                     boolean sharedResource)
 +    throws IOException, DistributedSystemDisconnectedException
 +  {    
 +    InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID;
 +    if (t == null) {
 +      throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
 +    }
 +    this.isReceiver = false;
 +    this.owner = t;
 +    this.sharedResource = sharedResource;
 +    this.preserveOrder = preserveOrder;
 +    setRemoteAddr(remoteAddr);
 +    this.conduitIdStr = this.owner.getConduit().getId().toString();
 +    this.handshakeRead = false;
 +    this.handshakeCancelled = false;
 +    this.connected = true;
 +
 +    this.uniqueId = idCounter.getAndIncrement();
 +
 +    // connect to listening socket
 +
 +    InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
 +    if (useNIO()) {
 +      SocketChannel channel = SocketChannel.open();
 +      boolean connectSucceeded = false;
 +      this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
 +      try {
 +        channel.socket().setTcpNoDelay(true);
 +
 +        channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 +
 +        /** If conserve-sockets is false, the socket can be used for receiving responses,
 +         * so set the receive buffer accordingly.
 +         */
 +        if(!sharedResource) {
 +          setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
 +        }
 +        else {
 +          setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only receive ack messages
 +        }
 +        setSendBufferSize(channel.socket());
 +        channel.configureBlocking(true);
 +
 +        int connectTime = getP2PConnectTimeout();
 +
 +        try {
 +          SocketUtils.connect(channel.socket(), addr, connectTime);
 +          connectSucceeded = true;
 +        } catch (NullPointerException e) {
 +          // bug #45044 - jdk 1.7 sometimes throws an NPE here
 +          ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
 +          c.initCause(e);
 +          // prevent a hot loop by sleeping a little bit
 +          try {
 +            Thread.sleep(1000);
 +          } catch (InterruptedException ie) {
 +            Thread.currentThread().interrupt();
 +          }
 +          throw c;
 +        } catch (CancelledKeyException e) {
 +          // bug #44469: for some reason NIO throws this runtime exception
 +          //             instead of an IOException on timeouts
 +          ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
 +              .toLocalizedString(new Object[]{connectTime}));
 +          c.initCause(e);
 +          throw c;
 +        } catch (ClosedSelectorException e) {
 +          // bug #44808: for some reason JRockit NIO throws this runtime exception
 +          //             instead of an IOException on timeouts
 +          ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
 +              .toLocalizedString(new Object[]{connectTime}));
 +          c.initCause(e);
 +          throw c;
 +        }
 +      }
 +      finally {
 +        this.owner.removeConnectingSocket(channel.socket());
 +        if (!connectSucceeded) {
 +          // The constructor is about to fail, so nobody else will ever see this
 +          // channel.  Close it now instead of leaking its descriptor until GC.
 +          try {
 +            channel.close();
 +          } catch (IOException ignored) {
 +            // best effort: the connect failure is what the caller needs to see
 +          }
 +        }
 +      }
 +      this.socket = channel.socket();
 +    }
 +    else {
 +      if (TCPConduit.useSSL) {
 +        // socket = javax.net.ssl.SSLSocketFactory.getDefault()
 +        //  .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort());
 +        int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
 +        this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize );
 +        // Set the receive buffer size local fields. It has already been set in the socket.
 +        setSocketBufferSize(this.socket, false, socketBufferSize, true);
 +        setSendBufferSize(this.socket);
 +      }
 +      else {
 +        //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
 +        Socket s = new Socket();
 +        this.socket = s;
 +        boolean connectSucceeded = false;
 +        try {
 +          s.setTcpNoDelay(true);
 +          s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 +          setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
 +          setSendBufferSize(s);
 +          SocketUtils.connect(s, addr, 0);
 +          connectSucceeded = true;
 +        }
 +        finally {
 +          if (!connectSucceeded) {
 +            // same reasoning as for the NIO channel above: do not leak the
 +            // socket when the constructor fails
 +            try {
 +              s.close();
 +            } catch (IOException ignored) {
 +              // best effort: the connect failure is what the caller needs to see
 +            }
 +          }
 +        }
 +      }
 +    }
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Connection: connected to {} with stub {}", remoteAddr, addr);
 +    }
 +    // best effort: a failure to set TCP_NODELAY does not fail the connection
 +    try { getSocket().setTcpNoDelay(true); } catch (SocketException e) {  }
 +  }
 +
 +  /**
 +   * Enables batch sends; taken from the "p2p.batchSends" system property.
 +   * <p>
 +   * Batch sends currently should not be turned on because:
 +   *  1. They will be used for all sends (instead of just no-ack)
 +   *     and thus will break messages that wait for a response (or kill perf).
 +   *  2. The buffer is not properly flushed and closed on shutdown.
 +   *     The code attempts to do this but must not be doing it correctly.
 +   */
 +  private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
 +  /** capacity of each batch buffer; "p2p.batchBufferSize" system property, default 1024*1024 */
 +  protected static final int BATCH_BUFFER_SIZE = Integer.getInteger("p2p.batchBufferSize", 1024*1024).intValue();
 +  /** longest time the flusher waits before re-checking the buffer; "p2p.batchFlushTime" system property, default 50 */
 +  protected static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
 +  /** monitor held while filling fillBatchBuffer and while the two buffers are swapped */
 +  protected Object batchLock;
 +  /** buffer that message writers currently copy into */
 +  protected ByteBuffer fillBatchBuffer;
 +  /** buffer that the flusher thread writes to the socket */
 +  protected ByteBuffer sendBatchBuffer;
 +  /** background flusher; stays null unless createBatchSendBuffer set it up */
 +  private BatchBufferFlusher batchFlusher;
 +
 +  private void createBatchSendBuffer() {
 +    // batch send buffer isn't needed if old-io is being used
 +    if (!this.useNIO) {
 +      return;
 +    }
 +    this.batchLock = new Object();
 +    if (TCPConduit.useDirectBuffers) {
 +      this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
 +      this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
 +    } else {
 +      this.fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
 +      this.sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
 +    }
 +    this.batchFlusher = new BatchBufferFlusher();
 +    this.batchFlusher.start();
 +  }
 +
 +  /**
 +   * Daemon thread that writes batched messages to the socket.
 +   * <p>
 +   * Writers copy messages into fillBatchBuffer.  This thread swaps
 +   * fillBatchBuffer and sendBatchBuffer and writes the latter to the socket
 +   * when a flush has been requested, when the fill buffer is more than half
 +   * full, or after waiting at most BATCH_FLUSH_MS.
 +   * <p>
 +   * Two monitors are involved: this thread object (flush requests and
 +   * shutdown) and batchLock (buffer contents and the swap).  Where both are
 +   * held, this object's monitor is taken first.
 +   */
 +  private class BatchBufferFlusher extends Thread  {
 +    /** set by writers (and by close) to request a flush; cleared when the buffers are swapped */
 +    private volatile boolean flushNeeded = false;
 +    /** set by close() to make the run loop exit */
 +    private volatile boolean timeToStop = false;
 +    /** the owning conduit's statistics */
 +    private DMStats stats;
 +
 +
 +    public BatchBufferFlusher() {
 +      setDaemon(true);
 +      this.stats = owner.getConduit().stats;
 +    }
 +    /**
 +     * Called when a message writer needs the current fillBatchBuffer flushed.
 +     * Returns once the given buffer is no longer the fill buffer, i.e. after
 +     * the flusher thread has swapped the buffers.  The time spent here is
 +     * recorded as batch wait time.
 +     *
 +     * @param bb the buffer the caller found to be too full
 +     */
 +    public void flushBuffer(ByteBuffer bb) {
 +      final long start = DistributionStats.getStatTime();
 +      try {
 +        synchronized (this) {
 +          synchronized (batchLock) {
 +            if (bb != fillBatchBuffer) {
 +              // it must have already been flushed. So just return
 +              // and use the new fillBatchBuffer
 +              return;
 +            }
 +          }
 +          // ask the flusher thread to swap and send
 +          this.flushNeeded = true;
 +          this.notify();
 +        }
 +        synchronized (batchLock) {
 +          // Wait for the flusher thread
 +          while (bb == fillBatchBuffer) {
 +            Connection.this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +            // clear the interrupt flag so the wait is not cut short; it is
 +            // restored in the finally block below
 +            boolean interrupted = Thread.interrupted();
 +            try {
 +              batchLock.wait();  // spurious wakeup ok
 +            } 
 +            catch (InterruptedException ex) {
 +              interrupted = true;
 +            }
 +            finally {
 +              if (interrupted) {
 +                Thread.currentThread().interrupt();
 +              }
 +            }
 +          } // while
 +        }
 +      } finally {
 +        owner.getConduit().stats.incBatchWaitTime(start);
 +      }
 +    }
 +
 +    /**
 +     * Asks the flusher thread to stop: sets the stop and flush flags and
 +     * wakes the thread.  Does not wait for the thread to exit.
 +     */
 +    public void close() {
 +      synchronized (this) {
 +        this.timeToStop = true;
 +        this.flushNeeded = true;
 +        this.notify();
 +      }
 +    }
 +
 +    /**
 +     * Flush loop; runs until close() is called or the thread is interrupted
 +     * while waiting.  The loop body executes while holding this object's
 +     * monitor (released only inside wait), including the socket write.
 +     */
 +    @Override
 +    public void run() {
 +      try {
 +        synchronized (this) {
 +          while (!timeToStop) {
 +            // nothing urgent: sleep until notified or the flush interval elapses
 +            if (!this.flushNeeded && fillBatchBuffer.position() <= (BATCH_BUFFER_SIZE/2)) {
 +              wait(BATCH_FLUSH_MS); // spurious wakeup ok
 +            }
 +            if (this.flushNeeded || fillBatchBuffer.position() > (BATCH_BUFFER_SIZE/2)) {
 +              final long start = DistributionStats.getStatTime();
 +              synchronized (batchLock) {
 +                // This is the only block of code that will swap
 +                // the buffer references
 +                this.flushNeeded = false;
 +                ByteBuffer tmp = fillBatchBuffer;
 +                fillBatchBuffer = sendBatchBuffer;
 +                sendBatchBuffer = tmp;
 +                // wake writers blocked in flushBuffer
 +                batchLock.notifyAll();
 +              }
 +              // We now own the sendBatchBuffer
 +              if (sendBatchBuffer.position() > 0) {
 +                final boolean origSocketInUse = socketInUse;
 +                socketInUse = true;
 +                try {
 +                  sendBatchBuffer.flip();
 +                  SocketChannel channel = getSocket().getChannel();
 +                  nioWriteFully(channel, sendBatchBuffer, false, null);
 +                  sendBatchBuffer.clear();
 +                } catch (IOException ex) {
 +                  logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex));
 +                  readerShuttingDown = true;
 +                  requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex));
 +                } catch (ConnectionException ex) {
 +                  logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex));
 +                  readerShuttingDown = true;
 +                  requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex));
 +                } finally {
 +                  accessed();
 +                  socketInUse = origSocketInUse;
 +                }
 +              }
 +              this.stats.incBatchFlushTime(start);
 +            }
 +          }
 +        }
 +      } catch (InterruptedException ex) {
 +        // time for this thread to shutdown
 +//        Thread.currentThread().interrupt();
 +      }
 +    }
 +  }
 +
 +  private void closeBatchBuffer() {
 +    if (this.batchFlusher != null) {
 +      this.batchFlusher.close();
 +    }
 +  }
 +
 +  /** use to test message prep overhead (no socket write).
 +   * Taken from the "p2p.disableSocketWrite" system property.
 +   * WARNING: turning this on completely disables distribution of batched sends
 +   */
 +  private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
 +
 +  /**
 +   * Copies a message into the current fill buffer so that the flusher thread
 +   * will send it.  If the fill buffer lacks room, asks the flusher to flush
 +   * it and tries again with the buffer that replaces it.  Does nothing when
 +   * socket writes are disabled.  The total time spent is recorded as batch
 +   * send time.
 +   *
 +   * @param src the message bytes; must not have more than BATCH_BUFFER_SIZE
 +   *        bytes remaining
 +   * @throws IOException declared for callers; not thrown by the code in this method
 +   */
 +  private void batchSend(ByteBuffer src) throws IOException {
 +    if (SOCKET_WRITE_DISABLED) {
 +      return;
 +    }
 +    final long start = DistributionStats.getStatTime();
 +    try {
 +      Assert.assertTrue(src.remaining() <= BATCH_BUFFER_SIZE , "Message size(" + src.remaining() + ") exceeded BATCH_BUFFER_SIZE(" + BATCH_BUFFER_SIZE + ")");
 +      while (true) {
 +        final ByteBuffer fullBuffer;
 +        synchronized (this.batchLock) {
 +          final ByteBuffer current = this.fillBatchBuffer;
 +          if (src.remaining() <= current.remaining()) {
 +            final long copyStart = DistributionStats.getStatTime();
 +            current.put(src);
 +            this.owner.getConduit().stats.incBatchCopyTime(copyStart);
 +            return;
 +          }
 +          fullBuffer = current;
 +        }
 +        // No room in the current buffer: the flusher thread has to flush it
 +        // before the message can be copied
 +        this.batchFlusher.flushBuffer(fullBuffer);
 +      }
 +    } finally {
 +      this.owner.getConduit().stats.incBatchSendTime(start);
 +    }
 +  }
 +
 +  /**
 +   * Request that the manager close this connection, or close it
 +   * forcibly if there is no manager.  Invoking this method ensures
 +   * that the proper synchronization is done.
 +   *
 +   * @param reason text describing why the connection is being closed
 +   */
 +  void requestClose(String reason) {
 +    close(reason,
 +        true,    // cleanupEndpoint
 +        true,    // p_removeEndpoint
 +        false,   // beingSick
 +        false);  // forceRemoval
 +  }
 +
 +  boolean isClosing() {
 +    return this.closing.get();
 +  }
 +
 +  /**
 +   * Used to close a connection that has not yet been registered
 +   * with the distribution manager.
 +   *
 +   * @param reason text describing why the connection is being closed
 +   */
 +  void closePartialConnect(String reason) {
 +    close(reason,
 +        false,   // cleanupEndpoint
 +        false,   // p_removeEndpoint
 +        false,   // beingSick
 +        false);  // forceRemoval
 +  }
 +
 +  /**
 +   * Same as {@link #closePartialConnect(String)}, but lets the caller say
 +   * whether sickness is being simulated.
 +   *
 +   * @param reason text describing why the connection is being closed
 +   * @param beingSick passed through to close
 +   */
 +  void closePartialConnect(String reason, boolean beingSick) {
 +    close(reason,
 +        false,      // cleanupEndpoint
 +        false,      // p_removeEndpoint
 +        beingSick,
 +        false);     // forceRemoval
 +  }
 +
 +  /**
 +   * Closes this connection with endpoint cleanup but without endpoint removal.
 +   *
 +   * @param reason text describing why the connection is being closed
 +   */
 +  void closeForReconnect(String reason) {
 +    close(reason,
 +        true,    // cleanupEndpoint
 +        false,   // p_removeEndpoint
 +        false,   // beingSick
 +        false);  // forceRemoval
 +  }
 +
 +  /**
 +   * Closes this connection like {@link #requestClose(String)} but with
 +   * forced removal, so the call is not ignored when a close is already
 +   * under way.
 +   *
 +   * @param reason text describing why the connection is being closed
 +   */
 +  void closeOldConnection(String reason) {
 +    close(reason,
 +        true,    // cleanupEndpoint
 +        true,    // p_removeEndpoint
 +        false,   // beingSick
 +        true);   // forceRemoval
 +  }
 +
 +  /**
 +   * Closes the connection.
 +   * <p>
 +   * The first caller to flip the <code>closing</code> flag performs the full
 +   * teardown: it marks the connection stopped, waits for any in-progress
 +   * async queuing to finish, closes the sender semaphore, adjusts the
 +   * receiver/sender statistics, calls asyncClose, wakes handshake waiters,
 +   * waits briefly for the reader thread, and releases the batch buffer and
 +   * message destreamers.  A caller that finds <code>closing</code> already
 +   * set returns immediately unless <code>forceRemoval</code> is true, in
 +   * which case only the endpoint cleanup and timer-task cancellation at the
 +   * end of the method are executed.
 +   *
 +   * @param reason passed to the owner's remove methods during endpoint cleanup
 +   * @param cleanupEndpoint if true, remove this connection from the owner's
 +   *        receiver/shared/thread-owned bookkeeping
 +   * @param p_removeEndpoint requested endpoint removal; downgraded to false
 +   *        when the connection was not connected and forceRemoval is false
 +   * @param beingSick if true, do not wait for the reader thread to exit
 +   * @param forceRemoval if true, run the endpoint cleanup even though another
 +   *        close has already started
 +   *
 +   * @see #requestClose
 +   */
 +  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="TLW_TWO_LOCK_WAIT")
 +  private void close(String reason, boolean cleanupEndpoint, 
 +      boolean p_removeEndpoint, boolean beingSick, boolean forceRemoval) {
 +    boolean removeEndpoint = p_removeEndpoint;
 +    // use getAndSet outside sync on this to fix 42330
 +    // onlyCleanup == true means some earlier call already started closing
 +    boolean onlyCleanup = this.closing.getAndSet(true);
 +    if (onlyCleanup && !forceRemoval) {
 +      return;
 +    }
 +    if (!onlyCleanup) {
 +    synchronized (this) {
 +      this.stopped = true;
 +      if (this.connected) {
 +        if (this.asyncQueuingInProgress
 +            && this.pusherThread != Thread.currentThread()) {
 +          // We don't need to do this if we are the pusher thread
 +          // and we have determined that we need to close the connection.
 +          // See bug 37601.
 +          synchronized (this.outgoingQueue) {
 +            // wait for the flusher to complete (it may timeout)
 +            while (this.asyncQueuingInProgress) {
 +              // Don't do this: causes closes to not get done in the event
 +              // of an orderly shutdown:
 +  //            this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
 +              // clear the interrupt flag so wait() can block; it is restored in finally
 +              boolean interrupted = Thread.interrupted();
 +              try {
 +                this.outgoingQueue.wait(); // spurious wakeup ok
 +              } catch (InterruptedException ie) {
 +                interrupted = true;
 +  //                this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
 +              }
 +              finally {
 +                if (interrupted) Thread.currentThread().interrupt();
 +              }
 +            } // while
 +          } // synchronized
 +        }
 +        this.connected = false;
 +        closeSenderSem();
 +        {
 +          // stats are only decremented for connections that finished connecting
 +          final DMStats stats = this.owner.getConduit().stats;
 +          if (this.finishedConnecting) {
 +            if (this.isReceiver) {
 +              stats.decReceivers();
 +            } else {
 +              stats.decSenders(this.sharedResource, this.preserveOrder);
 +            }
 +          }
 +        }
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("Closing socket for {}", this);
 +        }
 +      }
 +        else if (!forceRemoval) {
 +        // never connected: do not remove the endpoint unless removal is forced
 +        removeEndpoint = false;
 +      }
 +      // make sure our socket is closed
 +      asyncClose(false);
 +      nioLengthSet = false;
 +    } // synchronized
 +    
 +      // moved the call to notifyHandshakeWaiter out of the above
 +      // synchronized block to fix bug #42159
 +      // Make sure anyone waiting for a handshake stops waiting
 +      notifyHandshakeWaiter(false);
 +    // wait a bit for the our reader thread to exit
 +    // don't wait if we are the reader thread
 +    boolean isIBM = false;
 +    // if network partition detection is enabled or this is an admin vm
 +    // we can't wait for the reader thread when running in an IBM JRE.  See
 +    // bug 41889
 +    if (this.owner.owner.config.getEnableNetworkPartitionDetection() ||
 +        this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
 +        this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
 +      isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
 +    }
 +    {
 +      // Now that readerThread is returned to a pool after we close
 +      // we need to be more careful not to join on a thread that belongs
 +      // to someone else.
 +      Thread readerThreadSnapshot = this.readerThread;
 +      if (!beingSick && readerThreadSnapshot != null && !isIBM
 +          && this.isRunning && !this.readerShuttingDown
 +          && readerThreadSnapshot != Thread.currentThread()) {
 +        try {
 +          // first wait: up to 500ms
 +          readerThreadSnapshot.join(500);
 +          // re-read the field; run() nulls it when the reader exits
 +          readerThreadSnapshot = this.readerThread;
 +          if (this.isRunning && !this.readerShuttingDown
 +              && readerThreadSnapshot != null
 +              && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
 +            // second wait: up to 1500ms more, then log if still running
 +            readerThreadSnapshot.join(1500);
 +            if (this.isRunning) {
 +              logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
 +            }
 +          }
 +        }
 +        catch (IllegalThreadStateException ignore) {
 +          // ignored - thread already stopped
 +        }
 +        catch (InterruptedException ignore) {
 +          Thread.currentThread().interrupt();
 +          // but keep going, we're trying to close.
 +        }
 +      }
 +    }
 +
 +    closeBatchBuffer();
 +    closeAllMsgDestreamers();
 +    }
 +    // Everything below also runs for a forced removal on an already-closing connection.
 +    if (cleanupEndpoint) {
 +      if (this.isReceiver) {
 +        this.owner.removeReceiver(this);
 +      }
 +      if (removeEndpoint) {
 +        if (this.sharedResource) {
 +          if (!this.preserveOrder) {
 +            // only remove endpoint when shared unordered connection
 +            // is closed. This is part of the fix for bug 32980.
 +            if (!this.isReceiver) {
 +              // Only remove endpoint if sender.
 +              if (this.finishedConnecting) {
 +                // only remove endpoint if our constructor finished
 +                this.owner.removeEndpoint(this.remoteAddr, reason);
 +              }
 +            }
 +          }
 +          else {
 +            this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
 +          }
 +        }
 +        else if (!this.isReceiver) {
 +          this.owner.removeThreadConnection(this.remoteAddr, this);
 +        }
 +      }
 +      else {
 +        // This code is ok to do even if the ConnectionTable
 +        // has never added this Connection to its maps since
 +        // the calls in this block use our identity to do the removes.
 +        if (this.sharedResource) {
 +          this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
 +        }
 +        else if (!this.isReceiver) {
 +          this.owner.removeThreadConnection(this.remoteAddr, this);
 +        }
 +      }
 +    }
 +    
 +    // This cancels the idle timer task, but it also removes the task's
 +    // reference to this connection, freeing up the connection (and its
 +    // buffers) for GC sooner.
 +    if(idleTask != null) {
 +      idleTask.cancel();
 +    }
 +    
 +    if(ackTimeoutTask!=null){
 +        ackTimeoutTask.cancel();
 +    }
 +
 +  }
 +
 +  /** starts a reader thread */
 +  private void startReader(ConnectionTable connTable) { 
 +    Assert.assertTrue(!this.isRunning); 
 +    stopped = false; 
 +    this.isRunning = true; 
 +    connTable.executeCommand(this);  
 +  } 
 +
 +
 +  /**
 +   * Body of the reader thread.  In order to read socket-based messages we
 +   * need to have a thread actively trying to grab bytes out of the socket's
 +   * input queue.  This is that thread: it records itself as
 +   * <code>readerThread</code>, then runs either the NIO or the old-I/O read
 +   * loop depending on useNIO().  When the loop returns, the finally block
 +   * releases the NIO input buffer, wakes any handshake waiter and clears
 +   * <code>isRunning</code>/<code>readerThread</code> under
 +   * <code>stateLock</code>.
 +   * (Presumably invoked via the executeCommand call in startReader.)
 +   */
 +  public void run() {
 +    this.readerThread = Thread.currentThread();
 +    this.readerThread.setName(p2pReaderName());
 +    ConnectionTable.threadWantsSharedResources();
 +    makeReaderThread(this.isReceiver);
 +    try {
 +      if (useNIO()) {
 +        runNioReader();
 +      } else {
 +        runOioReader();
 +      }
 +    } finally {
 +      // bug36060: do the socket close within a finally block
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
 +      }
 +      initiateSuspicionIfSharedUnordered();
 +      if (this.isReceiver) {
 +        // thread-owned receivers are counted in stats; undo that count here
 +        if (!this.sharedResource) {
 +          this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
 +        }
 +        asyncClose(false);
 +        this.owner.removeAndCloseThreadOwnedSockets();
 +      }
 +      // take a local copy and null the field before releasing the buffer
 +      ByteBuffer tmp = this.nioInputBuffer;
 +      if(tmp != null) {
 +        this.nioInputBuffer = null;
 +        final DMStats stats = this.owner.getConduit().stats;
 +        Buffers.releaseReceiveBuffer(tmp, stats);
 +      }
 +      // make sure that if the reader thread exits we notify a thread waiting
 +      // for the handshake.
 +      // see bug 37524 for an example of listeners hung in waitForHandshake
 +      notifyHandshakeWaiter(false);
 +      this.readerThread.setName("unused p2p reader");
 +      // close() reads isRunning/readerThread to decide whether to join this thread
 +      synchronized (this.stateLock) {
 +        this.isRunning = false;
 +        this.readerThread = null;
 +      }
 +    } // finally
 +  }
 +
 +  /**
 +   * Builds the name used for this connection's reader thread.
 +   * Receivers are named <code>P2P message reader@&lt;hash&gt;</code>;
 +   * non-receivers are named
 +   * <code>P2P handshake reader@&lt;hash&gt;-&lt;uniqueId&gt;</code>, where
 +   * <code>hash</code> is the hex identity hash code of this connection.
 +   *
 +   * @return the reader thread name for this connection
 +   */
 +  private String p2pReaderName() {
 +    // StringBuilder: the buffer never leaves this method, so the
 +    // synchronization done by StringBuffer is pure overhead.
 +    StringBuilder sb = new StringBuilder(64);
 +    if (this.isReceiver) {
 +      sb.append("P2P message reader@");
 +    } else {
 +      sb.append("P2P handshake reader@");
 +    }
 +    sb.append(Integer.toHexString(System.identityHashCode(this)));
 +    if (!this.isReceiver) {
 +      sb.append('-')
 +        .append(getUniqueId());
 +    }
 +    return sb.toString();
 +  }
 +
 +  private void runNioReader() {
 +    // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
 +    SocketChannel channel = null;
 +    try {
 +      channel = getSocket().getChannel();
 +      channel.configureBlocking(true);
 +    } catch (ClosedChannelException e) {
 +      // bug 37693: the channel was asynchronously closed.  Our work
 +      // is done.
 +      try { 
 +        requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_CLOSED_CHANNEL.toLocalizedString()); 
 +      } catch (Exception ignore) {}      
 +      return; // exit loop and thread
 +    } catch (IOException ex) {
 +      if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
 +        try { 
 +          requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_SHUTDOWN.toLocalizedString());
 +        } catch (Exception ignore) {}
 +        return; // bug37520: exit loop (and thread)
 +      }
 +      logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0, ex));
 +      this.readerShuttingDown = true;
 +      try { requestClose(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0.toLocalizedString(ex)); } catch (Exception ignore) {}
 +      return;
 +    }
 +
 +    if (!stopped) {
 +//      Assert.assertTrue(owner != null, "How did owner become null");
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Starting {}", p2pReaderName());
 +      }
 +    }
 +    // we should not change the state of the connection if we are a handshake reader thread
 +    // as there is a race between this thread and the application thread doing direct ack
 +    // fix for #40869
 +    boolean isHandShakeReader = false;
 +    try {
 +      for (;;) {
 +        if (stopped) {
 +          break;
 +        }
 +        if (SystemFailure.getFailure() != null) {
 +          // Allocate no objects here!
 +          Socket s = this.socket;
 +          if (s != null) {
 +            try {
 +              s.close();
 +            }
 +            catch (IOException e) {
 +              // don't care
 +            }
 +          }
 +          SystemFailure.checkFailure(); // throws
 +        }
 +        if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
 +          break;
 +        }
 +
 +        try {
 +          ByteBuffer buff = getNIOBuffer();
 +          synchronized(stateLock) {
 +            connectionState = STATE_READING;
 +          }
 +          int amt = channel.read(buff);
 +          synchronized(stateLock) {
 +            connectionState = STATE_IDLE;
 +          }
 +          if (amt == 0) {
 +            continue;
 +          }
 +          if (amt < 0) {
 +            this.readerShuttingDown = true;
 +            try {
 +              requestClose("SocketChannel.read returned EOF");
 +              requestClose(LocalizedStrings.Connection_SOCKETCHANNEL_READ_RETURNED_EOF.toLocalizedString());
 +            } catch (Exception e) {
 +              // ignore - shutting down
 +            }
 +            return;
 +          }
 +
 +          processNIOBuffer();
 +          if (!this.isReceiver
 +              && (this.handshakeRead || this.handshakeCancelled)) {
 +            if (logger.isDebugEnabled()) {
 +              if (this.handshakeRead) {
 +                logger.debug("{} handshake has been read {}", p2pReaderName(), this);
 +              } else {
 +                logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this);
 +              }
 +            }
 +            isHandShakeReader = true;
 +            // Once we have read the handshake the reader can go away
 +            break;
 +          }
 +        }
 +        catch (CancelException e) {
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
 +          }
 +          this.readerShuttingDown = true;
 +          try { 
 +            requestClose(LocalizedStrings.Connection_CACHECLOSED_IN_CHANNEL_READ_0.toLocalizedString(e));
 +          } catch (Exception ex) {}
 +          return;
 +        }
 +        catch (ClosedChannelException e) {
 +          this.readerShuttingDown = true;
 +          try { 
 +            requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
 +          } catch (Exception ex) {}
 +          return;
 +        }
 +        catch (IOException e) {
 +          if (! isSocketClosed()
 +                && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08
 +                ) {
 +            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
 +              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
 +            }
 +            if(e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
 +              if (logger.isDebugEnabled()) {
 +                logger.debug("{} received unexpected WSACancelBlockingCall exception, which may result in a hang", p2pReaderName()); 
 +              }
 +            }
 +          }
 +          this.readerShuttingDown = true;
 +          try { 
 +            requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
 +          } catch (Exception ex) {}
 +          return;
 +
 +        } catch (Exception e) {
 +          this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
 +          if (!stopped && ! isSocketClosed() ) {
 +            logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e);
 +          }
 +          this.readerShuttingDown = true;
 +          try { 
 +            requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); 
 +          } catch (Exception ex) {}
 +          return;
 +        }
 +      } // for
 +    }
 +    finally {
 +      if (!isHandShakeReader) {
 +        synchronized(stateLock) {
 +          connectionState = STATE_IDLE;
 +        }
 +      }
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("{} runNioReader terminated id={} from {}", p2pReaderName(), conduitIdStr, remoteAddr);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Initiates suspect processing for the remote member when a shared,
 +   * unordered receiver connection that completed its handshake is lost
 +   * and no cancellation (shutdown) is in progress.
 +   */
 +  private void initiateSuspicionIfSharedUnordered() {
 +    // only a shared, unordered receiver whose handshake was read qualifies
 +    if (!this.isReceiver || !this.handshakeRead || this.preserveOrder || !this.sharedResource) {
 +      return;
 +    }
 +    // shutting down: losing the connection is expected
 +    if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
 +      return;
 +    }
 +    this.owner.getDM().getMembershipManager().suspectMember(this.getRemoteAddress(),
 +        INITIATING_SUSPECT_PROCESSING);
 +  }
 +
 +  /**
 +   * Checks whether an exception should not be logged, i.e. it is a
 +   * {@link ClosedChannelException} or its text contains "forcibly closed",
 +   * "reset by peer" or "connection reset" (case-insensitively).  When the
 +   * exception has no message, its toString() text is examined instead.
 +   *
 +   * @param e the exception to classify; must not be null
 +   * @return true if the exception is one of the ignorable kinds
 +   */
 +  public static final boolean isIgnorableIOException(Exception e) {
 +    if (e instanceof ClosedChannelException) {
 +      return true;
 +    }
 +
 +    String msg = e.getMessage();
 +    if (msg == null) {
 +      msg = e.toString();
 +    }
 +
 +    // Lower-case with a fixed locale: with the default locale the result
 +    // depends on the JVM's settings (e.g. in a Turkish locale 'I' maps to
 +    // a dotless 'i' and "CONNECTION RESET" would no longer match).
 +    msg = msg.toLowerCase(java.util.Locale.ROOT);
 +    return msg.contains("forcibly closed")
 +      ||   msg.contains("reset by peer")
 +      ||   msg.contains("connection reset");
 +  }
 +
 +  /**
 +   * Tells whether the given value is one of the known P2P message types
 +   * (normal, chunked, or end-of-chunked).
 +   *
 +   * @param msgType the message type taken from a message header
 +   * @return true if the type is recognized
 +   */
 +  private static boolean validMsgType(int msgType) {
 +    if (msgType == NORMAL_MSG_TYPE) {
 +      return true;
 +    }
 +    if (msgType == CHUNKED_MSG_TYPE) {
 +      return true;
 +    }
 +    return msgType == END_CHUNKED_MSG_TYPE;
 +  }
 +
 +  private void closeAllMsgDestreamers() {
 +    synchronized (this.destreamerLock) {
 +      if (this.idleMsgDestreamer != null) {
 +        this.idleMsgDestreamer.close();
 +        this.idleMsgDestreamer = null;
 +      }
 +      if (this.destreamerMap != null) {
 +        Iterator it = this.destreamerMap.values().iterator();
 +        while (it.hasNext()) {
 +          MsgDestreamer md = (MsgDestreamer)it.next();
 +          md.close();
 +        }
 +        this.destreamerMap = null;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Returns the destreamer registered for the given message id, creating
 +   * and registering one if none exists yet.  A new registration reuses the
 +   * idle destreamer when one is available; otherwise a fresh
 +   * MsgDestreamer is constructed.  Runs under <code>destreamerLock</code>.
 +   *
 +   * @param msgId id of the chunked message being received
 +   * @param v version passed to a newly constructed destreamer
 +   * @return the destreamer associated with <code>msgId</code>
 +   */
 +  MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
 +    synchronized (this.destreamerLock) {
 +      if (this.destreamerMap == null) {
 +        this.destreamerMap = new HashMap();
 +      }
 +      // Short.valueOf instead of the deprecated constructor: avoids an
 +      // allocation for cached values and is equal as a map key.
 +      Short key = Short.valueOf(msgId);
 +      MsgDestreamer result = (MsgDestreamer)this.destreamerMap.get(key);
 +      if (result == null) {
 +        result = this.idleMsgDestreamer;
 +        if (result != null) {
 +          // take ownership of the idle instance
 +          this.idleMsgDestreamer = null;
 +        } else {
 +          result = new MsgDestreamer(this.owner.getConduit().stats,
 +              this.owner.owner.getCancelCriterion(), v);
 +        }
 +        result.setName(p2pReaderName() + " msgId=" + msgId);
 +        this.destreamerMap.put(key, result);
 +      }
 +      return result;
 +    }
 +  }
 +  /**
 +   * Unregisters the destreamer for the given message id.  The destreamer
 +   * is reset and kept as the idle instance when there is none; otherwise
 +   * it is closed.  Runs under <code>destreamerLock</code>.
 +   *
 +   * @param msgId id the destreamer was registered under
 +   * @param md the destreamer being released
 +   */
 +  void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
 +    // Short.valueOf instead of the deprecated constructor: avoids an
 +    // allocation for cached values and is equal as a map key.
 +    Short key = Short.valueOf(msgId);
 +    synchronized (this.destreamerLock) {
 +      // NOTE(review): closeAllMsgDestreamers sets destreamerMap to null;
 +      // confirm this method cannot be reached after that has happened.
 +      this.destreamerMap.remove(key);
 +      if (this.idleMsgDestreamer == null) {
 +        md.reset();
 +        this.idleMsgDestreamer = md;
 +      } else {
 +        md.close();
 +      }
 +    }
 +  }
 +
 +  private void sendFailureReply(int rpId, String exMsg, Throwable ex, boolean directAck) {
 +    ReplySender dm = null;
 +    if(directAck) {
 +      dm = new DirectReplySender(this);
 +    } else if(rpId != 0) {
 +      dm = this.owner.getDM();
 +    }
 +    if (dm != null) {
 +      ReplyMessage.send(getRemoteAddress(), rpId, new ReplyException(exMsg, ex), dm);
 +    }
 +  }
 +  private void runOioReader() {
 +    InputStream input = null;
 +    try {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Socket is of type: {}", getSocket().getClass() );
 +      }
 +      input = new BufferedInputStream(getSocket().getInputStream(), INITIAL_CAPACITY);
 +    }
 +    catch (IOException io) {
 +      if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
 +        return; // bug 37520: exit run loop (and thread)
 +      }
 +      logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_INPUT_STREAM), io);
 +      stopped = true;
 +    }
 +
 +    if (!stopped) {
 +      Assert.assertTrue(owner != null, LocalizedStrings.Connection_OWNER_SHOULD_NOT_BE_NULL.toLocalizedString());
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Starting {}", p2pReaderName());
 +      }
 +    }
 +
 +    byte[] lenbytes = new byte[MSG_HEADER_BYTES];
 +
 +    final ByteArrayDataInput dis = new ByteArrayDataInput();
 +    while (!stopped) {
 +      try {
 +        if (SystemFailure.getFailure() != null) {
 +          // Allocate no objects here!
 +          Socket s = this.socket;
 +          if (s != null) {
 +            try {
 +              s.close();
 +            }
 +            catch (IOException e) {
 +              // don't care
 +            }
 +          }
 +          SystemFailure.checkFailure(); // throws
 +        }
 +        if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
 +          break;
 +        }
 +        int len = 0;
 +        if (readFully(input, lenbytes, lenbytes.length) < 0) {
 +          stopped = true;
 +          continue;
 +        }
 +//        long recvNanos = DistributionStats.getStatTime();
 +        len = ((lenbytes[MSG_HEADER_SIZE_OFFSET]&0xff) * 0x1000000) +
 +          ((lenbytes[MSG_HEADER_SIZE_OFFSET+1]&0xff) * 0x10000) +
 +          ((lenbytes[MSG_HEADER_SIZE_OFFSET+2]&0xff) * 0x100) +
 +          (lenbytes[MSG_HEADER_SIZE_OFFSET+3]&0xff);
 +        /*byte msgHdrVersion =*/ calcHdrVersion(len);
 +        len = calcMsgByteSize(len);
 +        int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
 +        short msgId = (short)((lenbytes[MSG_HEADER_ID_OFFSET]&0xff * 0x100)
 +                              + (lenbytes[MSG_HEADER_ID_OFFSET+1]&0xff));
 +        boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
 +        if (myDirectAck) {
 +          msgType &= ~DIRECT_ACK_BIT; // clear the bit
 +        }
 +        // Following validation fixes bug 31145
 +        if (!validMsgType(msgType)) {
 +          logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0, Integer.valueOf(msgType)));
 +          this.readerShuttingDown = true;
 +          requestClose(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0.toLocalizedString(Integer.valueOf(msgType)));
 +          break;
 +        }
 +        if (logger.isTraceEnabled())
 +          logger.trace("{} reading {} bytes", conduitIdStr, len);
 +        byte[] bytes = new byte[len];
 +        if (readFully(input, bytes, len) < 0) {
 +          stopped = true;
 +          continue;
 +        }
 +        boolean interrupted = Thread.interrupted();
 +        try {
 +          if (this.handshakeRead) {
 +            if (msgType == NORMAL_MSG_TYPE) {
 +              //DMStats stats = this.owner.getConduit().stats;
 +              //long start = DistributionStats.getStatTime();
 +              this.owner.getConduit().stats.incMessagesBeingReceived(true, len);
 +              dis.initialize(bytes, this.remoteVersion);
 +              DistributionMessage msg = null;
 +              try {
 +                ReplyProcessor21.initMessageRPId();
 +                long startSer = this.owner.getConduit().stats.startMsgDeserialization();
 +                msg = (DistributionMessage)InternalDataSerializer.readDSFID(dis);
 +                this.owner.getConduit().stats.endMsgDeserialization(startSer);
 +                if (dis.available() != 0) {
 +                  logger.warn(LocalizedMessage.create(
 +                      LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
 +                      new Object[] { msg, Integer.valueOf(dis.available())}));
 +                }
 +                //stats.incBatchCopyTime(start);
 +                try {
 +                  //start = DistributionStats.getStatTime();
 +                  if (!dispatchMessage(msg, len, myDirectAck)) {
 +                    continue;
 +                  }
 +                  //stats.incBatchSendTime(start);
 +                }
 +                catch (MemberShunnedException e) {
 +                  continue;
 +                }
 +                catch (Exception de) {
 +                  this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); // bug 37101
 +                  logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de);
 +                }
 +              }
 +              catch (VirtualMachineError err) {
 +                SystemFailure.initiateFailure(err);
 +                // If this ever returns, rethrow the error.  We're poisoned
 +                // now, so don't let this thread continue.
 +                throw err;
 +              }
 +              catch (Throwable e) {
 +                // Whenever you catch Error or Throwable, you must also
 +                // catch VirtualMachineError (see above).  However, there is
 +                // _still_ a possibility that you are dealing with a cascading
 +                // error condition, so you also need to check to see if the JVM
 +                // is still usable:
 +                SystemFailure.checkFailure();
 +                // In particular I want OutOfMem to be caught here
 +                if (!myDirectAck) {
 +                  String reason = LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE.toLocalizedString();
 +                  sendFailureReply(ReplyProcessor21.getMessageRPId(), reason, e, myDirectAck);
 +                }
 +                if(e instanceof CancelException) {
 +                  if (!(e instanceof CacheClosedException)) {
 +                    // Just log a message if we had trouble deserializing due to CacheClosedException; see bug 43543
 +                    throw (CancelException) e;
 +                  }
 +                }
 +                logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE), e);
 +                //requestClose();
 +                //return;
 +              } finally {
 +                ReplyProcessor21.clearMessageRPId();
 +              }
 +            } else if (msgType == CHUNKED_MSG_TYPE) {
 +              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
 +              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
 +              try {
 +                md.addChunk(bytes);
 +              } catch (IOException ex) {
 +                logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex);
 +              }
 +            } else /* (msgType == END_CHUNKED_MSG_TYPE) */ {
 +              MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
 +              this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
 +              try {
 +                md.addChunk(bytes);
 +     

<TRUNCATED>