Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:39 UTC
[21/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 74660da,0000000..988ca33
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@@ -1,4153 -1,0 +1,4154 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.tcp;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.internal.ConflationKey;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.ReplySender;
+import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+import com.gemstone.gemfire.i18n.StringId;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.SocketUtils;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.tcp.MsgReader.Header;
+import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore;
+
+/**
+ * <p>Connection is a socket holder that sends and receives serialized
+ * message objects. A Connection may be closed to preserve system
+ * resources and will automatically be reopened when it is needed.</p>
+ *
+ * @author Bruce Schuchardt
+ * @since 2.0
+ */
+public class Connection implements Runnable {
+ private static final Logger logger = LogService.getLogger();
+
+ private static final int INITIAL_CAPACITY = Integer.getInteger("p2p.readerBufferSize", 32768).intValue();
+ private static int P2P_CONNECT_TIMEOUT;
+ private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false;
+
+ public final static int NORMAL_MSG_TYPE = 0x4c;
+ public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
+ public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
+ public final static int DIRECT_ACK_BIT = 0x20;
+ //We no longer support early ack
+ //public final static int EARLY_ACK_BIT = 0x10;
+
+ public static final int MSG_HEADER_SIZE_OFFSET = 0;
+ public static final int MSG_HEADER_TYPE_OFFSET = 4;
+ public static final int MSG_HEADER_ID_OFFSET = 5;
+ public static final int MSG_HEADER_BYTES = 7;
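+ // Wire layout of the message header, derived from the offsets above and
+ // the header-building code below:
+ //   bytes 0-3  payload length, with HANDSHAKE_VERSION packed into the
+ //              high-order byte (see calcHdrSize/calcHdrVersion)
+ //   byte  4    message type (NORMAL/CHUNKED/END_CHUNKED, possibly with
+ //              flag bits such as DIRECT_ACK_BIT or'd in)
+ //   bytes 5-6  message id (MsgIdGenerator.NO_MSG_ID when unused)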
+
+ /**
+ * Small buffer used for send socket buffer on receiver connections
+ * and receive buffer on sender connections.
+ */
+ public final static int SMALL_BUFFER_SIZE = Integer.getInteger("gemfire.SMALL_BUFFER_SIZE",4096).intValue();
+
+ /** counter to give connections a unique id */
+ private static AtomicLong idCounter = new AtomicLong(1);
+
+ /** string used as the reason for initiating suspect processing */
+ public static final String INITIATING_SUSPECT_PROCESSING = "member unexpectedly shut down shared, unordered connection";
+
+ /** the table holding this connection */
+ final ConnectionTable owner;
+
+ /** Set to false once run() is terminating. We use this instead of Thread.isAlive
+ * because the reader thread may be a pooled thread.
+ */
+ private volatile boolean isRunning = false;
+
+ /** true if connection is a shared resource that can be used by more than one thread */
+ private boolean sharedResource;
+
+ public final boolean isSharedResource() {
+ return this.sharedResource;
+ }
+
+ /** The idle timeout timer task for this connection */
+ private SystemTimerTask idleTask;
+
+ /**
+ * Returns the depth of unshared reader threads from this thread to
+ * the original non-reader-thread.
+ * E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) -> reader(domino=3)
+ */
+ public static int getDominoCount() {
+ return dominoCount.get().intValue();
+ }
+
+ private final static ThreadLocal isReaderThread = new ThreadLocal();
+ public final static void makeReaderThread() {
+ // mark this thread as a reader thread
+ makeReaderThread(true);
+ }
+ private final static void makeReaderThread(boolean v) {
+ isReaderThread.set(v);
+ }
+ // return true if this thread is a reader thread
+ public final static boolean isReaderThread() {
+ Object o = isReaderThread.get();
+ if (o == null) {
+ return false;
+ } else {
+ return ((Boolean)o).booleanValue();
+ }
+ }
+
+ private int getP2PConnectTimeout() {
+ if(IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
+ return P2P_CONNECT_TIMEOUT;
+ String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
+ if(connectTimeoutStr != null) {
+ P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
+ }
+ else {
+ P2P_CONNECT_TIMEOUT = 6*this.owner.owner.getDM().getConfig().getMemberTimeout();
+ }
+ IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
+ return P2P_CONNECT_TIMEOUT;
+ }
+ /**
+ * If true then readers for thread-owned sockets will send all messages on thread-owned senders.
+ * Even normally unordered messages get sent on thread-owned sockets.
+ */
+ private static final boolean DOMINO_THREAD_OWNED_SOCKETS = Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
+ private final static ThreadLocal isDominoThread = new ThreadLocal();
+ // if domino mode is enabled, mark the calling thread as one that sends all messages on thread-owned sockets
+ public final static boolean tipDomino() {
+ if (DOMINO_THREAD_OWNED_SOCKETS) {
+ // mark this thread as one who wants to send ALL on TO sockets
+ ConnectionTable.threadWantsOwnResources();
+ isDominoThread.set(Boolean.TRUE);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ public final static boolean isDominoThread() {
+ Object o = isDominoThread.get();
+ if (o == null) {
+ return false;
+ } else {
+ return ((Boolean)o).booleanValue();
+ }
+ }
+
+ /** the socket entrusted to this connection */
+ private final Socket socket;
+
+ /** the non-NIO output stream */
+ OutputStream output;
+
+ /** output stream/channel lock */
+ private final Object outLock = new Object();
+
+ /** the ID string of the conduit (for logging) */
+ String conduitIdStr;
+
+ /** Identifies the java group member on the other side of the connection. */
+ InternalDistributedMember remoteAddr;
+
+ /**
+ * Identifies the version of the member on the other side of the connection.
+ */
+ Version remoteVersion;
+
+ /**
+ * True if this connection was accepted by a listening socket.
+ * This makes it a receiver.
+ * False if this connection was explicitly created by a connect call.
+ * This makes it a sender.
+ */
+ private final boolean isReceiver;
+
+ /**
+ * count of how many unshared p2p readers this thread is removed from the
+ * original action. For instance, server-connection -> owned p2p reader (count 0)
+ * -> owned p2p reader (count 1) -> owned p2p reader (count 2). This shows
+ * up in thread names as "DOM #x" (domino #x)
+ */
+ private static ThreadLocal<Integer> dominoCount = new ThreadLocal<Integer>() {
+ @Override
+ protected Integer initialValue() {
+ return 0;
+ }};
+
+// /**
+// * name of sender thread thread. Useful in finding out why a reader
+// * thread was created. Add sending of the name in handshakes and
+// * add it to the name of the reader thread (the code is there but commented out)
+// */
+// private String senderName = null;
+
+ // If we are a sender then we want to know if the receiver on the
+ // other end is willing to have its messages queued. The following
+ // "async" instance variables come from its handshake response.
+ /**
+ * How long to wait if receiver will not accept a message before we
+ * go into queue mode.
+ * @since 4.2.2
+ */
+ private int asyncDistributionTimeout = 0;
+ /**
+ * How long to wait,
+ * with the receiver not accepting any messages,
+ * before kicking the receiver out of the distributed system.
+ * Ignored if asyncDistributionTimeout is zero.
+ * @since 4.2.2
+ */
+ private int asyncQueueTimeout = 0;
+ /**
+ * How much queued data we can have,
+ * with the receiver not accepting any messages,
+ * before kicking the receiver out of the distributed system.
+ * Ignored if asyncDistributionTimeout is zero.
+ * Canonicalized to bytes (the property file has it in megabytes).
+ * @since 4.2.2
+ */
+ private long asyncMaxQueueSize = 0;
+ /**
+ * True if an async queue is already being filled.
+ */
+ private volatile boolean asyncQueuingInProgress = false;
+
+ /**
+ * Maps ConflationKey instances to ConflationKey instances.
+ * Note that even though the key and value for an entry in the map
+ * will always be "equal", they will not always be "==".
+ */
+ private final Map conflatedKeys = new HashMap();
+
+ //private final Queue outgoingQueue = new LinkedBlockingQueue();
+
+
+ // NOTE: LinkedBlockingQueue has a bug in which removes from the queue
+ // cause future offers to increase the size without adding anything to the queue.
+ // So I've changed from this backport class to a java.util.LinkedList
+ private final LinkedList outgoingQueue = new LinkedList();
+
+ /**
+ * Number of bytes in the outgoingQueue.
+ * Used to control capacity.
+ */
+ private long queuedBytes = 0;
+
+ /** used for async writes */
+ Thread pusherThread;
+
+ /**
+ * The maximum number of concurrent senders sending a message to a single recipient.
+ */
+ private final static int MAX_SENDERS = Integer.getInteger("p2p.maxConnectionSenders", DirectChannel.DEFAULT_CONCURRENCY_LEVEL).intValue();
+ /**
+ * This semaphore is used to throttle how many threads will try to do sends on
+ * this connection concurrently. A thread must acquire this semaphore before it
+ * is allowed to start serializing its message.
+ */
+ private final Semaphore senderSem = new ReentrantSemaphore(MAX_SENDERS);
+
+ /** Set to true once the handshake has been read */
+ volatile boolean handshakeRead = false;
+ volatile boolean handshakeCancelled = false;
+
+ private volatile int replyCode = 0;
+
+ private static final byte REPLY_CODE_OK = (byte)69;
+ private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte)70;
+
+ private final Object handshakeSync = new Object();
+
+ /** message reader thread */
+ private volatile Thread readerThread;
+
+// /**
+// * When a thread owns the outLock and is writing to the socket, it must
+// * be placed in this variable so that it can be interrupted should the
+// * socket need to be closed.
+// */
+// private volatile Thread writerThread;
+
+ /** whether the reader thread is, or should be, running */
+ volatile boolean stopped = true;
+
+ /** set to true once a close begins */
+ private final AtomicBoolean closing = new AtomicBoolean(false);
+
+ volatile boolean readerShuttingDown = false;
+
+ /** whether the socket is connected */
+ volatile boolean connected = false;
+
+ /**
+ * Set to true once a connection finishes its constructor
+ */
+ volatile boolean finishedConnecting = false;
+
+ volatile boolean accessed = true;
+ volatile boolean socketInUse = false;
+ volatile boolean timedOut = false;
+
+ /**
+ * task for detecting ack timeouts and issuing alerts
+ */
+ private SystemTimer.SystemTimerTask ackTimeoutTask;
+
+ // State for ackTimeoutTask: transmissionStartTime, ackWaitTimeout, ackSATimeout, ackConnectionGroup, ackThreadName
+
+ /** millisecond clock at the time message transmission started, if doing
+ * forced-disconnect processing */
+ long transmissionStartTime;
+
+ /** ack wait timeout - if socketInUse, use this to trigger SUSPECT processing */
+ private long ackWaitTimeout;
+
+ /** ack severe alert timeout - if socketInUse, use this to send alert */
+ private long ackSATimeout;
+
+ /**
+ * other connections participating in the current transmission. we notify
+ * them if ackSATimeout expires to keep all members from generating alerts
+ * when only one is slow
+ */
+ List ackConnectionGroup;
+
+ /** name of thread that we're currently performing an operation in (may be null) */
+ String ackThreadName;
+
+
+ /** the buffer used for NIO message receipt */
+ ByteBuffer nioInputBuffer;
+
+ /** the position of the next message's content */
+// int nioMessageStart;
+
+ /** the length of the next message to be dispatched */
+ int nioMessageLength;
+// byte nioMessageVersion;
+
+ /** the type of message being received */
+ byte nioMessageType;
+
+ /** used to lock access to destreamer data */
+ private final Object destreamerLock = new Object();
+ /** caches a msg destreamer that is currently not being used */
+ MsgDestreamer idleMsgDestreamer;
+ /** used to map a msgId to the MsgDestreamer that is
+ * used for destreaming chunked messages using nio */
+ HashMap destreamerMap;
+
+ boolean directAck;
+
+ short nioMsgId;
+
+ /** whether the length of the next message has been established */
+ boolean nioLengthSet = false;
+
+ /** is this connection used for serial message delivery? */
+ boolean preserveOrder = false;
+
+ /** number of messages sent on this connection */
+ private long messagesSent;
+
+ /** number of messages received on this connection */
+ private long messagesReceived;
+
+ /** unique ID of this connection (remote if isReceiver==true) */
+ private volatile long uniqueId;
+
+ private int sendBufferSize = -1;
+ private int recvBufferSize = -1;
+
+ private ReplySender replySender;
+
+ private void setSendBufferSize(Socket sock) {
+ setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+ }
+ private void setReceiveBufferSize(Socket sock) {
+ setReceiveBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+ }
+ private void setSendBufferSize(Socket sock, int requestedSize) {
+ setSocketBufferSize(sock, true, requestedSize);
+ }
+ private void setReceiveBufferSize(Socket sock, int requestedSize) {
+ setSocketBufferSize(sock, false, requestedSize);
+ }
+ public int getReceiveBufferSize() {
+ return recvBufferSize;
+ }
+ private void setSocketBufferSize(Socket sock, boolean send, int requestedSize) {
+ setSocketBufferSize(sock, send, requestedSize, false);
+ }
+ private void setSocketBufferSize(Socket sock, boolean send, int requestedSize, boolean alreadySetInSocket) {
+ if (requestedSize > 0) {
+ try {
+ int currentSize = send
+ ? sock.getSendBufferSize()
+ : sock.getReceiveBufferSize();
+ if (currentSize == requestedSize) {
+ if (send) {
+ this.sendBufferSize = currentSize;
+ }
+ return;
+ }
+ if (!alreadySetInSocket) {
+ if (send) {
+ sock.setSendBufferSize(requestedSize);
+ } else {
+ sock.setReceiveBufferSize(requestedSize);
+ }
+ }
+ } catch (SocketException ignore) {
+ }
+ try {
+ int actualSize = send
+ ? sock.getSendBufferSize()
+ : sock.getReceiveBufferSize();
+ if (send) {
+ this.sendBufferSize = actualSize;
+ } else {
+ this.recvBufferSize = actualSize;
+ }
+ if (actualSize < requestedSize) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
+ new Object[] {(send ? "send buffer size" : "receive buffer size"), Integer.valueOf(actualSize), Integer.valueOf(requestedSize)}));
+ } else if (actualSize > requestedSize) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Socket {} buffer size is {} instead of the requested {}",
+ (send ? "send" : "receive"), actualSize, requestedSize);
+ }
+ // Remember the request size which is smaller.
+ // This remembered value is used for allocating direct mem buffers.
+ if (send) {
+ this.sendBufferSize = requestedSize;
+ } else {
+ this.recvBufferSize = requestedSize;
+ }
+ }
+ } catch (SocketException ignore) {
+ if (send) {
+ this.sendBufferSize = requestedSize;
+ } else {
+ this.recvBufferSize = requestedSize;
+ }
+ }
+ }
+ }
+ /**
+ * Returns the size of the send buffer on this connection's socket.
+ */
+ public int getSendBufferSize() {
+ int result = this.sendBufferSize;
+ if (result != -1) {
+ return result;
+ }
+ try {
+ result = getSocket().getSendBufferSize();
+ } catch (SocketException ignore) {
+ // just return a default
+ result = this.owner.getConduit().tcpBufferSize;
+ }
+ this.sendBufferSize = result;
+ return result;
+ }
+
+ /** creates a connection that we accepted (it was initiated by
+ * an explicit connect being done on the other side).
+ * We will only receive data on this socket; never send.
+ */
+ protected static Connection createReceiver(ConnectionTable t, Socket s)
+ throws IOException, ConnectionException
+ {
+ Connection c = new Connection(t, s);
+ boolean readerStarted = false;
+ try {
+ c.startReader(t);
+ readerStarted = true;
+ } finally {
+ if (!readerStarted) {
+ c.closeForReconnect(LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString());
+ }
+ }
+ c.waitForHandshake();
+ //sendHandshakeReplyOK();
+ c.finishedConnecting = true;
+ return c;
+ }
+
+ /** creates a connection that we accepted (it was initiated by
+ * an explicit connect being done on the other side).
+ */
+ protected Connection(ConnectionTable t, Socket s)
+ throws IOException, ConnectionException
+ {
+ if (t == null) {
+ throw new IllegalArgumentException(LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
+ }
+ this.isReceiver = true;
+ this.owner = t;
+ this.socket = s;
+ this.conduitIdStr = owner.getConduit().getId().toString();
+ this.handshakeRead = false;
+ this.handshakeCancelled = false;
+ this.connected = true;
+
+ try {
+ s.setTcpNoDelay(true);
+ s.setKeepAlive(true);
+// s.setSoLinger(true, (Integer.valueOf(System.getProperty("p2p.lingerTime", "5000"))).intValue());
+ setSendBufferSize(s, SMALL_BUFFER_SIZE);
+ setReceiveBufferSize(s);
+ }
+ catch (SocketException e) {
+ // unable to set the socket options we want; don't log an error because it
+ // will likely happen a lot
+ }
+ if (!useNIO()) {
+ try {
+ //this.output = new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE);
+ this.output = s.getOutputStream();
+ }
+ catch (IOException io) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
+ t.getSocketCloser().asyncClose(s, this.remoteAddr.toString(), null);
+ throw io;
+ }
+ }
+ }
+
+ void setIdleTimeoutTask(SystemTimerTask task) {
+ this.idleTask = task;
+ }
+
+
+ /**
+ * Returns true if an idle connection was detected.
+ */
+ public boolean checkForIdleTimeout() {
+ if (isSocketClosed()) {
+ return true;
+ }
+ if (isSocketInUse()) {
+ return false;
+ }
+ boolean isIdle = !this.accessed;
+ this.accessed = false;
+ if (isIdle) {
+ this.timedOut = true;
+ this.owner.getConduit().stats.incLostLease();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource, this.preserveOrder);
+ }
+ try {
+ // Instead of calling requestClose
+ // we call closeForReconnect.
+ // We don't want this timeout close to close
+ // any other connections. The problem with
+ // requestClose has removeEndpoint set to true
+ // which will close an receivers we have if this
+ // connection is a shared one.
+ closeForReconnect(LocalizedStrings.Connection_IDLE_CONNECTION_TIMED_OUT.toLocalizedString());
+ } catch (Exception ignore) {}
+ }
+ return isIdle;
+ }
+
+ static private byte[] okHandshakeBytes;
+ static private ByteBuffer okHandshakeBuf;
+ static {
+ int msglen = 1; // one byte for reply code
+ byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
+ msglen = calcHdrSize(msglen);
+ bytes[MSG_HEADER_SIZE_OFFSET] = (byte)((msglen/0x1000000) & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((msglen/0x10000) & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((msglen/0x100) & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(msglen & 0xff);
+ bytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE; // message type
+ bytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff);
+ bytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff);
+ bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
+ int allocSize = bytes.length;
+ ByteBuffer bb;
+ if (TCPConduit.useDirectBuffers) {
+ bb = ByteBuffer.allocateDirect(allocSize);
+ } else {
+ bb = ByteBuffer.allocate(allocSize);
+ }
+ bb.put(bytes);
+ okHandshakeBuf = bb;
+ okHandshakeBytes = bytes;
+ }
+
+ /**
+ * maximum message buffer size
+ */
+ public static final int MAX_MSG_SIZE = 0x00ffffff;
+ public static int calcHdrSize(int byteSize) {
+ if (byteSize > MAX_MSG_SIZE) {
+ throw new IllegalStateException(LocalizedStrings.Connection_TCP_MESSAGE_EXCEEDED_MAX_SIZE_OF_0.toLocalizedString(Integer.valueOf(MAX_MSG_SIZE)));
+ }
+ int hdrSize = byteSize;
+ hdrSize |= (HANDSHAKE_VERSION << 24);
+ return hdrSize;
+ }
+ public static int calcMsgByteSize(int hdrSize) {
+ return hdrSize & MAX_MSG_SIZE;
+ }
+ public static byte calcHdrVersion(int hdrSize) throws IOException {
+ byte ver = (byte)(hdrSize >> 24);
+ if (ver != HANDSHAKE_VERSION) {
+ throw new IOException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(ver)}));
+ }
+ return ver;
+ }
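+ // Worked example: with HANDSHAKE_VERSION == 7, calcHdrSize(1) returns
+ // 0x07000001 (version in the top byte, 24-bit byte count below it);
+ // calcMsgByteSize(0x07000001) recovers 1 and calcHdrVersion(0x07000001)
+ // recovers 7. This is why MAX_MSG_SIZE is capped at 0x00ffffff.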
+ private void sendOKHandshakeReply() throws IOException, ConnectionException {
+ byte[] my_okHandshakeBytes = null;
+ ByteBuffer my_okHandshakeBuf = null;
+ if (this.isReceiver) {
+ DistributionConfig cfg = owner.getConduit().config;
+ ByteBuffer bb;
+ if (useNIO() && TCPConduit.useDirectBuffers) {
+ bb = ByteBuffer.allocateDirect(128);
+ } else {
+ bb = ByteBuffer.allocate(128);
+ }
+ bb.putInt(0); // reserve first 4 bytes for packet length
+ bb.put((byte)NORMAL_MSG_TYPE);
+ bb.putShort(MsgIdGenerator.NO_MSG_ID);
+ bb.put(REPLY_CODE_OK_WITH_ASYNC_INFO);
+ bb.putInt(cfg.getAsyncDistributionTimeout());
+ bb.putInt(cfg.getAsyncQueueTimeout());
+ bb.putInt(cfg.getAsyncMaxQueueSize());
+ // write own product version
+ Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
+ // now set the msg length into position 0
+ bb.putInt(0, calcHdrSize(bb.position()-MSG_HEADER_BYTES));
+ if (useNIO()) {
+ my_okHandshakeBuf = bb;
+ bb.flip();
+ } else {
+ my_okHandshakeBytes = new byte[bb.position()];
+ bb.flip();
+ bb.get(my_okHandshakeBytes);
+ }
+ } else {
+ my_okHandshakeBuf = okHandshakeBuf;
+ my_okHandshakeBytes = okHandshakeBytes;
+ }
+ if (useNIO()) {
+ synchronized (my_okHandshakeBuf) {
+ my_okHandshakeBuf.position(0);
+ nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
+ }
+ } else {
+ synchronized(outLock) {
+ try {
+// this.writerThread = Thread.currentThread();
+ this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
+ this.output.flush();
+ }
+ finally {
+// this.writerThread = null;
+ }
+ }
+ }
+ }
+
+ private static final int HANDSHAKE_TIMEOUT_MS = Integer.getInteger("p2p.handshakeTimeoutMs", 59000).intValue();
+ //private static final byte HANDSHAKE_VERSION = 1; // 501
+ //public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
+ //public static final byte HANDSHAKE_VERSION = 3; // durable_client
+ //public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
+// public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
+ //public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
+ // NOTICE: handshake_version should not be changed anymore. Use the gemfire
+ // version transmitted with the handshake bits and handle old handshakes
+ // based on that
+ public static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
+
+ /**
+ * @throws ConnectionException if the conduit has stopped
+ */
+ private void waitForHandshake() throws ConnectionException {
+ boolean needToClose = false;
+ String reason = null;
+ try {
+ synchronized (this.handshakeSync) {
+ if (!this.handshakeRead && !this.handshakeCancelled) {
+ boolean success = false;
+ reason = LocalizedStrings.Connection_UNKNOWN.toLocalizedString();
+ boolean interrupted = Thread.interrupted();
+ try {
+ final long endTime = System.currentTimeMillis() + HANDSHAKE_TIMEOUT_MS;
+ long msToWait = HANDSHAKE_TIMEOUT_MS;
+ while (!this.handshakeRead && !this.handshakeCancelled && msToWait > 0) {
+ this.handshakeSync.wait(msToWait); // spurious wakeup ok
+ if (!this.handshakeRead && !this.handshakeCancelled) {
+ msToWait = endTime - System.currentTimeMillis();
+ }
+ }
+ if (!this.handshakeRead && !this.handshakeCancelled) {
+ reason = LocalizedStrings.Connection_HANDSHAKE_TIMED_OUT.toLocalizedString();
+ String peerName;
+ if (this.remoteAddr != null) {
+ peerName = this.remoteAddr.toString();
+ // late in the life of jdk 1.7 we started seeing connections accepted
+ // when accept() was not even being called. This started causing timeouts
+ // to occur in the handshake threads instead of causing failures in
+ // connection-formation. So, we need to initiate suspect processing here
+ owner.getDM().getMembershipManager().suspectMember(this.remoteAddr,
+ LocalizedStrings.Connection_CONNECTION_HANDSHAKE_WITH_0_TIMED_OUT_AFTER_WAITING_1_MILLISECONDS.toLocalizedString(
+ new Object[] {peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)}));
+ }
+ else {
+ peerName = "socket " + this.socket.getRemoteSocketAddress().toString()
+ + ":" + this.socket.getPort();
+ }
+ throw new ConnectionException(LocalizedStrings.Connection_CONNECTION_HANDSHAKE_WITH_0_TIMED_OUT_AFTER_WAITING_1_MILLISECONDS.toLocalizedString(
+ new Object[] {peerName, Integer.valueOf(HANDSHAKE_TIMEOUT_MS)}));
+ } else {
+ success = this.handshakeRead;
+ }
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ reason = LocalizedStrings.Connection_INTERRUPTED.toLocalizedString();
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (success) {
+ if (this.isReceiver) {
+ needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr);
+ if (needToClose) {
+ reason = "this member is shunned";
+ }
+ }
+ }
+ else {
+ needToClose = true; // for bug 42159
+ }
+ }
+ } // !handshakeRead
+ } // synchronized
+
+ } finally {
+ if (needToClose) {
+ // moved this call outside of the sync for bug 42159
+ try {
+ requestClose(reason); // fix for bug 31546
+ }
+ catch (Exception ignore) {
+ }
+ }
+ }
+ }
+
+ private void notifyHandshakeWaiter(boolean success) {
+ synchronized (this.handshakeSync) {
+ if (success) {
+ this.handshakeRead = true;
+ } else {
+ this.handshakeCancelled = true;
+ }
+ this.handshakeSync.notify();
+ }
+ }
+
+ private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+
+ /**
+ * asynchronously close this connection
+ *
+ * @param beingSick if true, simulate sickness by closing the socket in-line rather than in a background thread
+ */
+ private void asyncClose(boolean beingSick) {
+ // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
+
+ // we do the close in a background thread because the operation may hang if
+ // there is a problem with the network. See bug #46659
+
+ // if simulating sickness, sockets must be closed in-line so that tests know
+ // that the vm is sick when the beSick operation completes
+ if (beingSick) {
+ prepareForAsyncClose();
+ }
+ else {
+ if (this.asyncCloseCalled.compareAndSet(false, true)) {
+ Socket s = this.socket;
+ if (s != null && !s.isClosed()) {
+ prepareForAsyncClose();
+ this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
+ }
+ }
+ }
+ }
+
+ private void prepareForAsyncClose() {
+ synchronized(stateLock) {
+ if (readerThread != null && isRunning && !readerShuttingDown
+ && (connectionState == STATE_READING || connectionState == STATE_READING_ACK)) {
+ readerThread.interrupt();
+ }
+ }
+ }
+
+ private static final int CONNECT_HANDSHAKE_SIZE = 4096;
+
+ /**
+ * waits until we've joined the distributed system
+ * before returning
+ */
+ private void waitForAddressCompletion() {
+ InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress();
+ synchronized (myAddr) {
+ while ((owner.getConduit().getCancelCriterion().cancelInProgress() == null)
+ && myAddr.getInetAddress() == null && myAddr.getVmViewId() < 0) {
+ try {
+ myAddr.wait(100); // spurious wakeup ok
+ }
+ catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+ }
+ }
+ Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort());
+ }
+ }
+
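+ // The connect handshake written by handshakeNio and handshakeStream is,
+ // in order: a zero byte, HANDSHAKE_VERSION, the serialized local member id,
+ // the sharedResource and preserveOrder booleans, the uniqueId long, the
+ // product version ordinal, and dominoCount+1, framed as a normal message.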
+ private void handshakeNio() throws IOException {
+ waitForAddressCompletion();
+
+ InternalDistributedMember myAddr = this.owner.getConduit().getLocalAddress();
+ final MsgOutputStream connectHandshake
+ = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+ //connectHandshake.reset();
+ /*
+ * Note that a byte of zero is always written because old products
+ * serialized a member id that always began with an ip address.
+ * My reading of the ip-address specs indicated that the first byte
+ * of a valid address would never be 0.
+ */
+ connectHandshake.writeByte(0);
+ connectHandshake.writeByte(HANDSHAKE_VERSION);
+ // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+ InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+ connectHandshake.writeBoolean(this.sharedResource);
+ connectHandshake.writeBoolean(this.preserveOrder);
+ connectHandshake.writeLong(this.uniqueId);
+ // write the product version ordinal
+ Version.CURRENT.writeOrdinal(connectHandshake, true);
+ connectHandshake.writeInt(dominoCount.get()+1);
+// this writes the sending member + thread name that is stored in senderName
+// on the receiver to show the cause of reader thread creation
+// if (dominoCount.get() > 0) {
+// connectHandshake.writeUTF(Thread.currentThread().getName());
+// } else {
+// String name = owner.getDM().getConfig().getName();
+// if (name == null) {
+// name = "pid="+OSProcess.getId();
+// }
+// connectHandshake.writeUTF("["+name+"] "+Thread.currentThread().getName());
+// }
+ connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, DistributionManager.STANDARD_EXECUTOR, MsgIdGenerator.NO_MSG_ID);
+ nioWriteFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
+ }
+
+ private void handshakeStream() throws IOException {
+ waitForAddressCompletion();
+
+ //this.output = new BufferedOutputStream(getSocket().getOutputStream(), owner.getConduit().bufferSize);
+ this.output = getSocket().getOutputStream();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
+ DataOutputStream os = new DataOutputStream(baos);
+ InternalDistributedMember myAddr = owner.getConduit().getLocalAddress();
+ os.writeByte(0);
+ os.writeByte(HANDSHAKE_VERSION);
+ // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+ InternalDataSerializer.invokeToData(myAddr, os);
+ os.writeBoolean(this.sharedResource);
+ os.writeBoolean(this.preserveOrder);
+ os.writeLong(this.uniqueId);
+ Version.CURRENT.writeOrdinal(os, true);
+ os.writeInt(dominoCount.get()+1);
+ // this writes the sending member + thread name that is stored in senderName
+ // on the receiver to show the cause of reader thread creation
+// if (dominoCount.get() > 0) {
+// os.writeUTF(Thread.currentThread().getName());
+// } else {
+// String name = owner.getDM().getConfig().getName();
+// if (name == null) {
+// name = "pid="+OSProcess.getId();
+// }
+// os.writeUTF("["+name+"] "+Thread.currentThread().getName());
+// }
+ os.flush();
+
+ byte[] msg = baos.toByteArray();
+ int len = calcHdrSize(msg.length);
+ byte[] lenbytes = new byte[MSG_HEADER_BYTES];
+ lenbytes[MSG_HEADER_SIZE_OFFSET] = (byte)((len/0x1000000) & 0xff);
+ lenbytes[MSG_HEADER_SIZE_OFFSET+1] = (byte)((len/0x10000) & 0xff);
+ lenbytes[MSG_HEADER_SIZE_OFFSET+2] = (byte)((len/0x100) & 0xff);
+ lenbytes[MSG_HEADER_SIZE_OFFSET+3] = (byte)(len & 0xff);
+ lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte)NORMAL_MSG_TYPE;
+ lenbytes[MSG_HEADER_ID_OFFSET] = (byte)((MsgIdGenerator.NO_MSG_ID/0x100) & 0xff);
+ lenbytes[MSG_HEADER_ID_OFFSET+1] = (byte)(MsgIdGenerator.NO_MSG_ID & 0xff);
+ synchronized(outLock) {
+ try {
+// this.writerThread = Thread.currentThread();
+ this.output.write(lenbytes, 0, lenbytes.length);
+ this.output.write(msg, 0, msg.length);
+ this.output.flush();
+ }
+ finally {
+// this.writerThread = null;
+ }
+ }
+ }
+
+ /**
+ *
+ * @throws IOException if handshake fails
+ */
+ private void attemptHandshake(ConnectionTable connTable) throws IOException {
+ // send HANDSHAKE
+ // send this server's port. It's expected on the other side
+ if (useNIO()) {
+ handshakeNio();
+ }
+ else {
+ handshakeStream();
+ }
+
+ startReader(connTable); // this reader only reads the handshake and then exits
+ waitForHandshake(); // waiting for reply
+ }
+
+ /** time between connection attempts */
+ private static final int RECONNECT_WAIT_TIME
+ = Integer.getInteger("gemfire.RECONNECT_WAIT_TIME", 2000).intValue();
+
+ /** creates a new connection to a remote server.
+ * We are initiating this connection; the other side must accept us.
+ * We will almost always send messages; small acks are received.
+ */
+ protected static Connection createSender(final MembershipManager mgr,
+ final ConnectionTable t,
+ final boolean preserveOrder,
+ final DistributedMember remoteAddr,
+ final boolean sharedResource,
+ final long startTime,
+ final long ackTimeout,
+ final long ackSATimeout)
+ throws IOException, DistributedSystemDisconnectedException
+ {
+ boolean warningPrinted = false;
+ boolean success = false;
+ boolean firstTime = true;
+ Connection conn = null;
+ // keep trying. Note that this may be executing during the shutdown window
+ // where a cancel criterion has not been established, but threads are being
+ // interrupted. In this case we must allow the connection to succeed even
+ // though subsequent messaging using the socket may fail
+ boolean interrupted = Thread.interrupted();
+ boolean severeAlertIssued = false;
+ boolean suspected = false;
+ long reconnectWaitTime = RECONNECT_WAIT_TIME;
+ boolean connectionErrorLogged = false;
+ try {
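+ // Escalation while retrying: once ackTimeout ms elapse the remote member
+ // is reported suspect; once ackTimeout + ackSATimeout ms elapse a severe
+ // alert is logged. Retries continue until the handshake succeeds or an
+ // unrecoverable condition (cancellation, member departure, rejected SSL
+ // certificates) throws.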
+ while (!success) { // keep trying
+ // Quit if DM has stopped distribution
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ long now = System.currentTimeMillis();
+ if (!severeAlertIssued && ackSATimeout > 0 && startTime + ackTimeout < now) {
+ if (startTime + ackTimeout + ackSATimeout < now) {
+ if (remoteAddr != null) {
+ logger.fatal(LocalizedMessage.create(
+ LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS, new Object[] { remoteAddr,
+ (ackSATimeout + ackTimeout) / 1000 }));
+ }
+ severeAlertIssued = true;
+ }
+ else if (!suspected) {
+ if (remoteAddr != null) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
+ new Object[] { remoteAddr, (ackTimeout)/1000 }));
+ }
+ mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString());
+ suspected = true;
+ }
+ reconnectWaitTime = Math.min(RECONNECT_WAIT_TIME,
+ ackSATimeout - (now - startTime - ackTimeout));
+ if (reconnectWaitTime <= 0) {
+ reconnectWaitTime = RECONNECT_WAIT_TIME;
+ }
+ } else if (!suspected && (startTime > 0) && (ackTimeout > 0)
+ && (startTime + ackTimeout < now)) {
+ mgr.suspectMember(remoteAddr, LocalizedStrings.Connection_UNABLE_TO_FORM_A_TCPIP_CONNECTION_IN_A_REASONABLE_AMOUNT_OF_TIME.toLocalizedString());
+ suspected = true;
+ }
+ if (firstTime) {
+ firstTime = false;
+ if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) {
+ throw new IOException("Member " + remoteAddr + " left the system");
+ }
+ }
+ else {
+ // if we're sending an alert and can't connect, bail out. A sick
+ // alert listener should not prevent cache operations from continuing
+ if (AlertAppender.isThreadAlerting()) {
+ // do not change the text of this exception - it is looked for in exception handlers
+ throw new IOException("Cannot form connection to alert listener " + remoteAddr);
+ }
+
+ // Wait briefly...
+ interrupted = Thread.interrupted() || interrupted;
+ try {
+ Thread.sleep(reconnectWaitTime);
+ }
+ catch (InterruptedException ie) {
+ interrupted = true;
+ t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+ }
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
+ }
+ if (!warningPrinted) {
+ warningPrinted = true;
+ logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr));
+ }
+ t.getConduit().stats.incReconnectAttempts();
+ }
+ //create connection
+ try {
+ conn = null;
+ conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
+ }
+ catch (javax.net.ssl.SSLHandshakeException se) {
+ // no need to retry if certificates were rejected
+ throw se;
+ }
+ catch (IOException ioe) {
+ // Only give up if the member leaves the view.
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw ioe;
+ }
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ if ("Too many open files".equals(ioe.getMessage())) {
+ t.fileDescriptorsExhausted();
+ }
+ else if (!connectionErrorLogged) {
+ connectionErrorLogged = true; // otherwise the change to use 100ms retry intervals causes a lot of these
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
+ new Object[] {sharedResource, preserveOrder, remoteAddr, ioe}));
+ }
+ } // IOException
+ finally {
+ if (conn == null) {
+ t.getConduit().stats.incFailedConnect();
+ }
+ }
+ if (conn != null) {
+ // handshake
+ try {
+ conn.attemptHandshake(t);
+ if (conn.isSocketClosed()) {
+ // something went wrong while reading the handshake
+ // and the socket was closed or this guy sent us a
+ // ShutdownMessage
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
+ }
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ // no success but no need to log; just retry
+ }
+ else {
+ success = true;
+ }
+ }
+ catch (DistributedSystemDisconnectedException e) {
+ throw e;
+ }
+ catch (ConnectionException e) {
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString());
+ ioe.initCause(e);
+ throw ioe;
+ }
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
+ new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
+ }
+ catch (IOException e) {
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw e;
+ }
+ t.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
+ new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
+ if (!sharedResource && "Too many open files".equals(e.getMessage())) {
+ t.fileDescriptorsExhausted();
+ }
+ }
+ finally {
+ if (!success) {
+ try {
+ conn.requestClose(LocalizedStrings.Connection_FAILED_HANDSHAKE.toLocalizedString());
+ } catch (Exception ignore) {}
+ conn = null;
+ }
+ }
+ }
+ } // while
+ if (warningPrinted) {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.Connection_0_SUCCESSFULLY_REESTABLISHED_CONNECTION_TO_PEER_1,
+ new Object[] {mgr.getLocalMember(), remoteAddr}));
+ }
+ }
+ finally {
+ try {
+ if (!success) {
+ if (conn != null) {
+ conn.requestClose(LocalizedStrings.Connection_FAILED_CONSTRUCTION.toLocalizedString());
+ conn = null;
+ }
+ }
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ //Assert.assertTrue(conn != null);
+ if (conn == null) {
+ throw new ConnectionException(
+ LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0
+ .toLocalizedString(remoteAddr));
+ }
+ if (preserveOrder && BATCH_SENDS) {
+ conn.createBatchSendBuffer();
+ }
+ conn.finishedConnecting = true;
+ return conn;
+ }
+
+ private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
+ return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
+ }
+
+ private void setRemoteAddr(DistributedMember m) {
+ this.remoteAddr = this.owner.getDM().getCanonicalId(m);
+ MembershipManager mgr = this.owner.owner.getMembershipManager();
+ mgr.addSurpriseMember(m);
+ }
+
+ /** creates a new connection to a remote server.
+ * We are initiating this connection; the other side must accept us.
+ * We will almost always send messages; small acks are received.
+ */
+ private Connection(MembershipManager mgr,
+ ConnectionTable t,
+ boolean preserveOrder,
+ DistributedMember remoteID,
+ boolean sharedResource)
+ throws IOException, DistributedSystemDisconnectedException
+ {
+ InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID;
+ if (t == null) {
+ throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
+ }
+ this.isReceiver = false;
+ this.owner = t;
+ this.sharedResource = sharedResource;
+ this.preserveOrder = preserveOrder;
+ setRemoteAddr(remoteAddr);
+ this.conduitIdStr = this.owner.getConduit().getId().toString();
+ this.handshakeRead = false;
+ this.handshakeCancelled = false;
+ this.connected = true;
+
+ this.uniqueId = idCounter.getAndIncrement();
+
+ // connect to listening socket
+
+ InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
+ if (useNIO()) {
+ SocketChannel channel = SocketChannel.open();
+ this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
+ try {
+ channel.socket().setTcpNoDelay(true);
+
+ channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+
+ /** If conserve-sockets is false, the socket can be used for receiving responses,
+ * so set the receive buffer accordingly.
+ */
+ if(!sharedResource) {
+ setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+ }
+ else {
+ setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only receive ack messages
+ }
+ setSendBufferSize(channel.socket());
+ channel.configureBlocking(true);
+
+ int connectTime = getP2PConnectTimeout();
+
+ try {
+ SocketUtils.connect(channel.socket(), addr, connectTime);
+ } catch (NullPointerException e) {
+ // bug #45044 - jdk 1.7 sometimes throws an NPE here
+ ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
+ c.initCause(e);
+ // prevent a hot loop by sleeping a little bit
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ throw c;
+ } catch (CancelledKeyException e) {
+ // bug #44469: for some reason NIO throws this runtime exception
+ // instead of an IOException on timeouts
+ ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
+ .toLocalizedString(new Object[]{connectTime}));
+ c.initCause(e);
+ throw c;
+ } catch (ClosedSelectorException e) {
+ // bug #44808: for some reason JRockit NIO throws this runtime exception
+ // instead of an IOException on timeouts
+ ConnectException c = new ConnectException(LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
+ .toLocalizedString(new Object[]{connectTime}));
+ c.initCause(e);
+ throw c;
+ }
+ }
+ finally {
+ this.owner.removeConnectingSocket(channel.socket());
+ }
+ this.socket = channel.socket();
+ }
+ else {
+ if (TCPConduit.useSSL) {
+ // socket = javax.net.ssl.SSLSocketFactory.getDefault()
+ // .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort());
+ int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+ this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize );
+ // Set the receive buffer size local fields. It has already been set in the socket.
+ setSocketBufferSize(this.socket, false, socketBufferSize, true);
+ setSendBufferSize(this.socket);
+ }
+ else {
+ //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
+ Socket s = new Socket();
+ this.socket = s;
+ s.setTcpNoDelay(true);
+ s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+ setReceiveBufferSize(s, SMALL_BUFFER_SIZE);
+ setSendBufferSize(s);
+ SocketUtils.connect(s, addr, 0);
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection: connected to {} with stub {}", remoteAddr, addr);
+ }
+ try { getSocket().setTcpNoDelay(true); } catch (SocketException e) { }
+ }
+
+ /**
+ * Batch sends currently should not be turned on because:
+ * 1. They will be used for all sends (instead of just no-ack)
+ * and thus will break messages that wait for a response (or kill perf).
+ * 2. The buffer is not properly flushed and closed on shutdown.
+ * The code attempts to do this but must not be doing it correctly.
+ */
+ private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
+ protected static final int BATCH_BUFFER_SIZE = Integer.getInteger("p2p.batchBufferSize", 1024*1024).intValue();
+ protected static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
+ protected Object batchLock;
+ protected ByteBuffer fillBatchBuffer;
+ protected ByteBuffer sendBatchBuffer;
+ private BatchBufferFlusher batchFlusher;
+
+ private void createBatchSendBuffer() {
+ // batch send buffer isn't needed if old-io is being used
+ if (!this.useNIO) {
+ return;
+ }
+ this.batchLock = new Object();
+ if (TCPConduit.useDirectBuffers) {
+ this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+ this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+ } else {
+ this.fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
+ this.sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
+ }
+ this.batchFlusher = new BatchBufferFlusher();
+ this.batchFlusher.start();
+ }
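+ // Design note: batch sends double-buffer. Message writers append to
+ // fillBatchBuffer while holding batchLock; the flusher thread swaps
+ // fillBatchBuffer with sendBatchBuffer and writes the full buffer to the
+ // channel, so writers normally never block on the socket itself.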
+
+ private class BatchBufferFlusher extends Thread {
+ private volatile boolean flushNeeded = false;
+ private volatile boolean timeToStop = false;
+ private DMStats stats;
+
+
+ public BatchBufferFlusher() {
+ setDaemon(true);
+ this.stats = owner.getConduit().stats;
+ }
+ /**
+ * Called when a message writer needs the current fillBatchBuffer flushed
+ */
+ public void flushBuffer(ByteBuffer bb) {
+ final long start = DistributionStats.getStatTime();
+ try {
+ synchronized (this) {
+ synchronized (batchLock) {
+ if (bb != fillBatchBuffer) {
+ // it must have already been flushed. So just return
+ // and use the new fillBatchBuffer
+ return;
+ }
+ }
+ this.flushNeeded = true;
+ this.notify();
+ }
+ synchronized (batchLock) {
+ // Wait for the flusher thread
+ while (bb == fillBatchBuffer) {
+ Connection.this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ batchLock.wait(); // spurious wakeup ok
+ }
+ catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // while
+ }
+ } finally {
+ owner.getConduit().stats.incBatchWaitTime(start);
+ }
+ }
+
+ public void close() {
+ synchronized (this) {
+ this.timeToStop = true;
+ this.flushNeeded = true;
+ this.notify();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (!timeToStop) {
+ if (!this.flushNeeded && fillBatchBuffer.position() <= (BATCH_BUFFER_SIZE/2)) {
+ wait(BATCH_FLUSH_MS); // spurious wakeup ok
+ }
+ if (this.flushNeeded || fillBatchBuffer.position() > (BATCH_BUFFER_SIZE/2)) {
+ final long start = DistributionStats.getStatTime();
+ synchronized (batchLock) {
+ // This is the only block of code that will swap
+ // the buffer references
+ this.flushNeeded = false;
+ ByteBuffer tmp = fillBatchBuffer;
+ fillBatchBuffer = sendBatchBuffer;
+ sendBatchBuffer = tmp;
+ batchLock.notifyAll();
+ }
+ // We now own the sendBatchBuffer
+ if (sendBatchBuffer.position() > 0) {
+ final boolean origSocketInUse = socketInUse;
+ socketInUse = true;
+ try {
+ sendBatchBuffer.flip();
+ SocketChannel channel = getSocket().getChannel();
+ nioWriteFully(channel, sendBatchBuffer, false, null);
+ sendBatchBuffer.clear();
+ } catch (IOException ex) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex));
+ readerShuttingDown = true;
+ requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex));
+ } catch (ConnectionException ex) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0,ex));
+ readerShuttingDown = true;
+ requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0.toLocalizedString(ex));
+ } finally {
+ accessed();
+ socketInUse = origSocketInUse;
+ }
+ }
+ this.stats.incBatchFlushTime(start);
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ // time for this thread to shutdown
+// Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void closeBatchBuffer() {
+ if (this.batchFlusher != null) {
+ this.batchFlusher.close();
+ }
+ }
+
+ /** used to test message prep overhead (no socket write).
+ * WARNING: turning this on completely disables distribution of batched sends
+ */
+ private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
+
+ private void batchSend(ByteBuffer src) throws IOException {
+ if (SOCKET_WRITE_DISABLED) {
+ return;
+ }
+ final long start = DistributionStats.getStatTime();
+ try {
+ ByteBuffer dst = null;
+ Assert.assertTrue(src.remaining() <= BATCH_BUFFER_SIZE , "Message size(" + src.remaining() + ") exceeded BATCH_BUFFER_SIZE(" + BATCH_BUFFER_SIZE + ")");
+ do {
+ synchronized (this.batchLock) {
+ dst = this.fillBatchBuffer;
+ if (src.remaining() <= dst.remaining()) {
+ final long copyStart = DistributionStats.getStatTime();
+ dst.put(src);
+ this.owner.getConduit().stats.incBatchCopyTime(copyStart);
+ return;
+ }
+ }
+ // If we got this far then we do not have room in the current
+ // buffer and need the flusher thread to flush before we can fill it
+ this.batchFlusher.flushBuffer(dst);
+ } while (true);
+ } finally {
+ this.owner.getConduit().stats.incBatchSendTime(start);
+ }
+ }
+
+ /**
+ * Request that the manager close this connection, or close it
+ * forcibly if there is no manager. Invoking this method ensures
+ * that the proper synchronization is done.
+ */
+ void requestClose(String reason) {
+ close(reason, true, true, false, false);
+ }
+
+ boolean isClosing() {
+ return this.closing.get();
+ }
+
+ /**
+ * Used to close a connection that has not yet been registered
+ * with the distribution manager.
+ */
+ void closePartialConnect(String reason) {
+ close(reason, false, false, false, false);
+ }
+
+ void closePartialConnect(String reason, boolean beingSick) {
+ close(reason, false, false, beingSick, false);
+ }
+
+ void closeForReconnect(String reason) {
+ close(reason, true, false, false, false);
+ }
+
+ void closeOldConnection(String reason) {
+ close(reason, true, true, false, true);
+ }
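+ // Summary of the close(...) variants above:
+ //   requestClose: cleanupEndpoint=true, removeEndpoint=true
+ //   closePartialConnect: cleanupEndpoint=false, removeEndpoint=false
+ //   closeForReconnect: cleanupEndpoint=true, removeEndpoint=false
+ //   closeOldConnection: cleanupEndpoint=true, removeEndpoint=true, forceRemoval=true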
+
+ /**
+ * Closes the connection.
+ *
+ * @see #requestClose
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="TLW_TWO_LOCK_WAIT")
+ private void close(String reason, boolean cleanupEndpoint,
+ boolean p_removeEndpoint, boolean beingSick, boolean forceRemoval) {
+ boolean removeEndpoint = p_removeEndpoint;
+ // use getAndSet outside sync on this to fix 42330
+ boolean onlyCleanup = this.closing.getAndSet(true);
+ if (onlyCleanup && !forceRemoval) {
+ return;
+ }
+ if (!onlyCleanup) {
+ synchronized (this) {
+ this.stopped = true;
+ if (this.connected) {
+ if (this.asyncQueuingInProgress
+ && this.pusherThread != Thread.currentThread()) {
+ // We don't need to do this if we are the pusher thread
+ // and we have determined that we need to close the connection.
+ // See bug 37601.
+ synchronized (this.outgoingQueue) {
+ // wait for the flusher to complete (it may timeout)
+ while (this.asyncQueuingInProgress) {
+ // Don't do this: causes closes to not get done in the event
+ // of an orderly shutdown:
+ // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ this.outgoingQueue.wait(); // spurious wakeup ok
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+ }
+ finally {
+ if (interrupted) Thread.currentThread().interrupt();
+ }
+ } // while
+ } // synchronized
+ }
+ this.connected = false;
+ closeSenderSem();
+ {
+ final DMStats stats = this.owner.getConduit().stats;
+ if (this.finishedConnecting) {
+ if (this.isReceiver) {
+ stats.decReceivers();
+ } else {
+ stats.decSenders(this.sharedResource, this.preserveOrder);
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing socket for {}", this);
+ }
+ }
+ else if (!forceRemoval) {
+ removeEndpoint = false;
+ }
+ // make sure our socket is closed
+ asyncClose(false);
+ nioLengthSet = false;
+ } // synchronized
+
+ // moved the call to notifyHandshakeWaiter out of the above
+ // synchronized block to fix bug #42159
+ // Make sure anyone waiting for a handshake stops waiting
+ notifyHandshakeWaiter(false);
+ // wait a bit for our reader thread to exit
+ // don't wait if we are the reader thread
+ boolean isIBM = false;
+ // if network partition detection is enabled or this is an admin vm
+ // we can't wait for the reader thread when running in an IBM JRE. See
+ // bug 41889
+ if (this.owner.owner.config.getEnableNetworkPartitionDetection() ||
+ this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
+ this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+ isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
+ }
+ {
+ // Now that readerThread is returned to a pool after we close
+ // we need to be more careful not to join on a thread that belongs
+ // to someone else.
+ Thread readerThreadSnapshot = this.readerThread;
+ if (!beingSick && readerThreadSnapshot != null && !isIBM
+ && this.isRunning && !this.readerShuttingDown
+ && readerThreadSnapshot != Thread.currentThread()) {
+ try {
+ readerThreadSnapshot.join(500);
+ readerThreadSnapshot = this.readerThread;
+ if (this.isRunning && !this.readerShuttingDown
+ && readerThreadSnapshot != null
+ && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
+ readerThreadSnapshot.join(1500);
+ if (this.isRunning) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
+ }
+ }
+ }
+ catch (IllegalThreadStateException ignore) {
+ // ignored - thread already stopped
+ }
+ catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ // but keep going, we're trying to close.
+ }
+ }
+ }
+
+ closeBatchBuffer();
+ closeAllMsgDestreamers();
+ }
+ if (cleanupEndpoint) {
+ if (this.isReceiver) {
+ this.owner.removeReceiver(this);
+ }
+ if (removeEndpoint) {
+ if (this.sharedResource) {
+ if (!this.preserveOrder) {
+ // only remove endpoint when shared unordered connection
+ // is closed. This is part of the fix for bug 32980.
+ if (!this.isReceiver) {
+ // Only remove endpoint if sender.
+ if (this.finishedConnecting) {
+ // only remove endpoint if our constructor finished
+ this.owner.removeEndpoint(this.remoteAddr, reason);
+ }
+ }
+ }
+ else {
+ this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+ }
+ }
+ else if (!this.isReceiver) {
+ this.owner.removeThreadConnection(this.remoteAddr, this);
+ }
+ }
+ else {
+ // This code is ok to do even if the ConnectionTable
+ // has never added this Connection to its maps since
+ // the calls in this block use our identity to do the removes.
+ if (this.sharedResource) {
+ this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+ }
+ else if (!this.isReceiver) {
+ this.owner.removeThreadConnection(this.remoteAddr, this);
+ }
+ }
+ }
+
+ // This cancels the idle timer task, but it also removes the task's
+ // reference to this connection, freeing up the connection (and its
+ // buffers) for GC sooner.
+ if (idleTask != null) {
+ idleTask.cancel();
+ }
+
+ if (ackTimeoutTask != null) {
+ ackTimeoutTask.cancel();
+ }
+
+ }
+
+ /** starts a reader thread */
+ private void startReader(ConnectionTable connTable) {
+ Assert.assertTrue(!this.isRunning);
+ stopped = false;
+ this.isRunning = true;
+ connTable.executeCommand(this);
+ }
+
+
+ /** In order to read non-NIO socket-based messages we need a thread
+ actively pulling bytes out of the socket's input stream; this is that
+ thread's body. For NIO sockets the same thread runs the blocking
+ channel reader instead. */
+ public void run() {
+ this.readerThread = Thread.currentThread();
+ this.readerThread.setName(p2pReaderName());
+ ConnectionTable.threadWantsSharedResources();
+ makeReaderThread(this.isReceiver);
+ try {
+ if (useNIO()) {
+ runNioReader();
+ } else {
+ runOioReader();
+ }
+ } finally {
+ // bug36060: do the socket close within a finally block
+ if (logger.isDebugEnabled()) {
+ logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
+ }
+ initiateSuspicionIfSharedUnordered();
+ if (this.isReceiver) {
+ if (!this.sharedResource) {
+ this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+ }
+ asyncClose(false);
+ this.owner.removeAndCloseThreadOwnedSockets();
+ }
+ ByteBuffer tmp = this.nioInputBuffer;
+ if (tmp != null) {
+ this.nioInputBuffer = null;
+ final DMStats stats = this.owner.getConduit().stats;
+ Buffers.releaseReceiveBuffer(tmp, stats);
+ }
+ // make sure that if the reader thread exits we notify a thread waiting
+ // for the handshake.
+ // see bug 37524 for an example of listeners hung in waitForHandshake
+ notifyHandshakeWaiter(false);
+ this.readerThread.setName("unused p2p reader");
+ synchronized (this.stateLock) {
+ this.isRunning = false;
+ this.readerThread = null;
+ }
+ } // finally
+ }
+
+ private String p2pReaderName() {
+ StringBuilder sb = new StringBuilder(64);
+ if (this.isReceiver) {
+ sb.append("P2P message reader@");
+ } else {
+ sb.append("P2P handshake reader@");
+ }
+ sb.append(Integer.toHexString(System.identityHashCode(this)));
+ if (!this.isReceiver) {
+ sb.append('-')
+ .append(getUniqueId());
+ }
+ return sb.toString();
+ }
+
+ private void runNioReader() {
+ // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
+ SocketChannel channel = null;
+ try {
+ channel = getSocket().getChannel();
+ channel.configureBlocking(true);
+ } catch (ClosedChannelException e) {
+ // bug 37693: the channel was asynchronously closed. Our work
+ // is done.
+ try {
+ requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_CLOSED_CHANNEL.toLocalizedString());
+ } catch (Exception ignore) {}
+ return; // exit loop and thread
+ } catch (IOException ex) {
+ if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ try {
+ requestClose(LocalizedStrings.Connection_RUNNIOREADER_CAUGHT_SHUTDOWN.toLocalizedString());
+ } catch (Exception ignore) {}
+ return; // bug37520: exit loop (and thread)
+ }
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0, ex));
+ this.readerShuttingDown = true;
+ try { requestClose(LocalizedStrings.Connection_FAILED_SETTING_CHANNEL_TO_BLOCKING_MODE_0.toLocalizedString(ex)); } catch (Exception ignore) {}
+ return;
+ }
+
+ if (!stopped) {
+// Assert.assertTrue(owner != null, "How did owner become null");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting {}", p2pReaderName());
+ }
+ }
+ // we should not change the state of the connection if we are a handshake reader thread
+ // as there is a race between this thread and the application thread doing direct ack
+ // fix for #40869
+ boolean isHandShakeReader = false;
+ try {
+ for (;;) {
+ if (stopped) {
+ break;
+ }
+ if (SystemFailure.getFailure() != null) {
+ // Allocate no objects here!
+ Socket s = this.socket;
+ if (s != null) {
+ try {
+ s.close();
+ }
+ catch (IOException e) {
+ // don't care
+ }
+ }
+ SystemFailure.checkFailure(); // throws
+ }
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ break;
+ }
+
+ try {
+ ByteBuffer buff = getNIOBuffer();
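+ // publish our state under stateLock so that other threads can observe
+ // whether this reader is currently blocked in a channel read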
+ synchronized(stateLock) {
+ connectionState = STATE_READING;
+ }
+ int amt = channel.read(buff);
+ synchronized(stateLock) {
+ connectionState = STATE_IDLE;
+ }
+ if (amt == 0) {
+ continue;
+ }
+ if (amt < 0) {
+ this.readerShuttingDown = true;
+ try {
+ requestClose("SocketChannel.read returned EOF");
+ requestClose(LocalizedStrings.Connection_SOCKETCHANNEL_READ_RETURNED_EOF.toLocalizedString());
+ } catch (Exception e) {
+ // ignore - shutting down
+ }
+ return;
+ }
+
+ processNIOBuffer();
+ if (!this.isReceiver
+ && (this.handshakeRead || this.handshakeCancelled)) {
+ if (logger.isDebugEnabled()) {
+ if (this.handshakeRead) {
+ logger.debug("{} handshake has been read {}", p2pReaderName(), this);
+ } else {
+ logger.debug("{} handshake has been cancelled {}", p2pReaderName(), this);
+ }
+ }
+ isHandShakeReader = true;
+ // Once we have read the handshake the reader can go away
+ break;
+ }
+ }
+ catch (CancelException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
+ }
+ this.readerShuttingDown = true;
+ try {
+ requestClose(LocalizedStrings.Connection_CACHECLOSED_IN_CHANNEL_READ_0.toLocalizedString(e));
+ } catch (Exception ex) {}
+ return;
+ }
+ catch (ClosedChannelException e) {
+ this.readerShuttingDown = true;
+ try {
+ requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
+ } catch (Exception ex) {}
+ return;
+ }
+ catch (IOException e) {
+ if (! isSocketClosed()
+ && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08
+ ) {
+ if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+ logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+ }
+ if (e.getMessage() != null && e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} received unexpected WSACancelBlockingCall exception, which may result in a hang", p2pReaderName());
+ }
+ }
+ }
+ this.readerShuttingDown = true;
+ try {
+ requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
+ } catch (Exception ex) {}
+ return;
+
+ } catch (Exception e) {
+ this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
+ if (!stopped && ! isSocketClosed() ) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e);
+ }
+ this.readerShuttingDown = true;
+ try {
+ requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e));
+ } catch (Exception ex) {}
+ return;
+ }
+ } // for
+ }
+ finally {
+ if (!isHandShakeReader) {
+ synchronized(stateLock) {
+ connectionState = STATE_IDLE;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} runNioReader terminated id={} from {}", p2pReaderName(), conduitIdStr, remoteAddr);
+ }
+ }
+ }
+
+ /** initiate suspect processing if a shared unordered connection is lost and we're not shutting down */
+ private void initiateSuspicionIfSharedUnordered() {
+ if (this.isReceiver && this.handshakeRead && !this.preserveOrder && this.sharedResource) {
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() == null) {
+ this.owner.getDM().getMembershipManager().suspectMember(this.getRemoteAddress(),
+ INITIATING_SUSPECT_PROCESSING);
+ }
+ }
+ }
+
+ /**
+ * Checks whether an exception should not be logged, i.e., its message
+ * contains "forcibly closed", "reset by peer", or "connection reset",
+ * or it is a ClosedChannelException.
+ */
+ public static final boolean isIgnorableIOException(Exception e) {
+ if (e instanceof ClosedChannelException) {
+ return true;
+ }
+
+ String msg = e.getMessage();
+ if (msg == null) {
+ msg = e.toString();
+ }
+
+ msg = msg.toLowerCase();
+ return (msg.indexOf("forcibly closed") >= 0)
+ || (msg.indexOf("reset by peer") >= 0)
+ || (msg.indexOf("connection reset") >= 0);
+ }
+
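+ /**
+ * A P2P message arrives either whole (NORMAL_MSG_TYPE) or as a series of
+ * CHUNKED_MSG_TYPE chunks terminated by an END_CHUNKED_MSG_TYPE chunk;
+ * chunked messages are reassembled by a MsgDestreamer.
+ */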
+ private static boolean validMsgType(int msgType) {
+ return msgType == NORMAL_MSG_TYPE
+ || msgType == CHUNKED_MSG_TYPE
+ || msgType == END_CHUNKED_MSG_TYPE;
+ }
+
+ private void closeAllMsgDestreamers() {
+ synchronized (this.destreamerLock) {
+ if (this.idleMsgDestreamer != null) {
+ this.idleMsgDestreamer.close();
+ this.idleMsgDestreamer = null;
+ }
+ if (this.destreamerMap != null) {
+ Iterator it = this.destreamerMap.values().iterator();
+ while (it.hasNext()) {
+ MsgDestreamer md = (MsgDestreamer)it.next();
+ md.close();
+ }
+ this.destreamerMap = null;
+ }
+ }
+ }
+
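+ /**
+ * Returns the destreamer that is accumulating chunks for the given
+ * message id, creating one (or reusing the single cached idle
+ * destreamer) if none is currently mapped to that id.
+ */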
+ MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
+ synchronized (this.destreamerLock) {
+ if (this.destreamerMap == null) {
+ this.destreamerMap = new HashMap();
+ }
+ Short key = Short.valueOf(msgId);
+ MsgDestreamer result = (MsgDestreamer)this.destreamerMap.get(key);
+ if (result == null) {
+ result = this.idleMsgDestreamer;
+ if (result != null) {
+ this.idleMsgDestreamer = null;
+ } else {
+ result = new MsgDestreamer(this.owner.getConduit().stats,
+ this.owner.owner.getCancelCriterion(), v);
+ }
+ result.setName(p2pReaderName() + " msgId=" + msgId);
+ this.destreamerMap.put(key, result);
+ }
+ return result;
+ }
+ }
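+
+ /**
+ * Returns a destreamer obtained from obtainMsgDestreamer to the
+ * one-element idle cache, or closes it if the cache slot is occupied.
+ */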
+ void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
+ Short key = Short.valueOf(msgId);
+ synchronized (this.destreamerLock) {
+ this.destreamerMap.remove(key);
+ if (this.idleMsgDestreamer == null) {
+ md.reset();
+ this.idleMsgDestreamer = md;
+ } else {
+ md.close();
+ }
+ }
+ }
+
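+ /**
+ * Sends a ReplyException back to the message's sender when deserialization
+ * or dispatch fails, so that the sender's reply processor does not hang
+ * waiting for a response.
+ */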
+ private void sendFailureReply(int rpId, String exMsg, Throwable ex, boolean directAck) {
+ ReplySender dm = null;
+ if (directAck) {
+ dm = new DirectReplySender(this);
+ } else if (rpId != 0) {
+ dm = this.owner.getDM();
+ }
+ if (dm != null) {
+ ReplyMessage.send(getRemoteAddress(), rpId, new ReplyException(exMsg, ex), dm);
+ }
+ }
+
+ private void runOioReader() {
+ InputStream input = null;
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Socket is of type: {}", getSocket().getClass() );
+ }
+ input = new BufferedInputStream(getSocket().getInputStream(), INITIAL_CAPACITY);
+ }
+ catch (IOException io) {
+ if (stopped || owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ return; // bug 37520: exit run loop (and thread)
+ }
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_INPUT_STREAM), io);
+ stopped = true;
+ }
+
+ if (!stopped) {
+ Assert.assertTrue(owner != null, LocalizedStrings.Connection_OWNER_SHOULD_NOT_BE_NULL.toLocalizedString());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting {}", p2pReaderName());
+ }
+ }
+
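+ // fixed-size message header: a 4-byte length, a 1-byte message type and
+ // a 2-byte message id, decoded below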
+ byte[] lenbytes = new byte[MSG_HEADER_BYTES];
+
+ final ByteArrayDataInput dis = new ByteArrayDataInput();
+ while (!stopped) {
+ try {
+ if (SystemFailure.getFailure() != null) {
+ // Allocate no objects here!
+ Socket s = this.socket;
+ if (s != null) {
+ try {
+ s.close();
+ }
+ catch (IOException e) {
+ // don't care
+ }
+ }
+ SystemFailure.checkFailure(); // throws
+ }
+ if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+ break;
+ }
+ int len = 0;
+ if (readFully(input, lenbytes, lenbytes.length) < 0) {
+ stopped = true;
+ continue;
+ }
+// long recvNanos = DistributionStats.getStatTime();
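+ // reassemble the 4-byte message size from the header, high-order byte
+ // first (network byte order); the high bits of the size word appear to
+ // carry a header-version tag that calcHdrVersion validates and
+ // calcMsgByteSize strips off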
+ len = ((lenbytes[MSG_HEADER_SIZE_OFFSET]&0xff) * 0x1000000) +
+ ((lenbytes[MSG_HEADER_SIZE_OFFSET+1]&0xff) * 0x10000) +
+ ((lenbytes[MSG_HEADER_SIZE_OFFSET+2]&0xff) * 0x100) +
+ (lenbytes[MSG_HEADER_SIZE_OFFSET+3]&0xff);
+ /*byte msgHdrVersion =*/ calcHdrVersion(len);
+ len = calcMsgByteSize(len);
+ int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
+ short msgId = (short)((lenbytes[MSG_HEADER_ID_OFFSET]&0xff * 0x100)
+ + (lenbytes[MSG_HEADER_ID_OFFSET+1]&0xff));
+ boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
+ if (myDirectAck) {
+ msgType &= ~DIRECT_ACK_BIT; // clear the bit
+ }
+ // Following validation fixes bug 31145
+ if (!validMsgType(msgType)) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0, Integer.valueOf(msgType)));
+ this.readerShuttingDown = true;
+ requestClose(LocalizedStrings.Connection_UNKNOWN_P2P_MESSAGE_TYPE_0.toLocalizedString(Integer.valueOf(msgType)));
+ break;
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace("{} reading {} bytes", conduitIdStr, len);
+ }
+ byte[] bytes = new byte[len];
+ if (readFully(input, bytes, len) < 0) {
+ stopped = true;
+ continue;
+ }
+ boolean interrupted = Thread.interrupted();
+ try {
+ if (this.handshakeRead) {
+ if (msgType == NORMAL_MSG_TYPE) {
+ //DMStats stats = this.owner.getConduit().stats;
+ //long start = DistributionStats.getStatTime();
+ this.owner.getConduit().stats.incMessagesBeingReceived(true, len);
+ dis.initialize(bytes, this.remoteVersion);
+ DistributionMessage msg = null;
+ try {
+ ReplyProcessor21.initMessageRPId();
+ long startSer = this.owner.getConduit().stats.startMsgDeserialization();
+ msg = (DistributionMessage)InternalDataSerializer.readDSFID(dis);
+ this.owner.getConduit().stats.endMsgDeserialization(startSer);
+ if (dis.available() != 0) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
+ new Object[] { msg, Integer.valueOf(dis.available())}));
+ }
+ //stats.incBatchCopyTime(start);
+ try {
+ //start = DistributionStats.getStatTime();
+ if (!dispatchMessage(msg, len, myDirectAck)) {
+ continue;
+ }
+ //stats.incBatchSendTime(start);
+ }
+ catch (MemberShunnedException e) {
+ continue;
+ }
+ catch (Exception de) {
+ this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de); // bug 37101
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DISPATCHING_MESSAGE), de);
+ }
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable e) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ // In particular I want OutOfMem to be caught here
+ if (!myDirectAck) {
+ String reason = LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE.toLocalizedString();
+ sendFailureReply(ReplyProcessor21.getMessageRPId(), reason, e, myDirectAck);
+ }
+ if (e instanceof CancelException) {
+ // Just log a message if we had trouble deserializing due to a
+ // CacheClosedException (see bug 43543); rethrow any other
+ // CancelException.
+ if (!(e instanceof CacheClosedException)) {
+ throw (CancelException) e;
+ }
+ }
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_ERROR_DESERIALIZING_MESSAGE), e);
+ //requestClose();
+ //return;
+ } finally {
+ ReplyProcessor21.clearMessageRPId();
+ }
+ } else if (msgType == CHUNKED_MSG_TYPE) {
+ MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
+ this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
+ try {
+ md.addChunk(bytes);
+ } catch (IOException ex) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex);
+ }
+ } else /* (msgType == END_CHUNKED_MSG_TYPE) */ {
+ MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
+ this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
+ try {
+ md.addChunk(bytes);
+
<TRUNCATED>