You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/03/31 00:42:24 UTC
[geode] branch feature/GEODE-3926_3 updated: Adding Stacktrace dump
in ConnectionTable.java for connection creation
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3926_3
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3926_3 by this push:
new 376ef1f Adding Stacktrace dump in ConnectionTable.java for connection creation
376ef1f is described below
commit 376ef1fbeb2778c81d32fdb5e07d48c3c52accd0
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Fri Mar 30 17:42:13 2018 -0700
Adding Stacktrace dump in ConnectionTable.java for connection creation
---
.../apache/geode/internal/tcp/ConnectionTable.java | 139 ++++++++++++---------
1 file changed, 77 insertions(+), 62 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 693e4f1..7f8e200 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -61,13 +61,14 @@ import org.apache.geode.internal.net.SocketCloser;
* ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe
* between two endpoints represented by generic DistributedMembers.
* </p>
- *
* @since GemFire 2.1
*/
public class ConnectionTable {
private static final Logger logger = LogService.getLogger();
- /** warning when descriptor limit reached */
+ /**
+ * warning when descriptor limit reached
+ */
private static boolean ulimitWarningIssued;
/**
@@ -248,7 +249,9 @@ public class ConnectionTable {
// }*/
// }
- /** conduit calls acceptConnection after an accept */
+ /**
+ * conduit calls acceptConnection after an accept
+ */
protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
throws IOException, ConnectionException, InterruptedException {
InetAddress connAddress = sock.getInetAddress(); // for bug 44736
@@ -266,14 +269,14 @@ public class ConnectionTable {
this.owner.getCancelCriterion().checkCancelInProgress(ex);
logger.warn(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
- new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
+ new Object[]{(connAddress != null ? connAddress : "unavailable address"), ex}));
throw ex;
} catch (ConnectionException ex) {
// check for shutdown...
this.owner.getCancelCriterion().checkCancelInProgress(ex);
logger.warn(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
- new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
+ new Object[]{(connAddress != null ? connAddress : "unavailable address"), ex}));
throw ex;
} finally {
// note: no need to call incFailedAccept here because it will be done
@@ -310,10 +313,8 @@ public class ConnectionTable {
}
-
/**
* Process a newly created PendingConnection
- *
* @param id DistributedMember on which the connection is created
* @param sharedResource whether the connection is used by multiple threads
* @param preserveOrder whether to preserve order
@@ -324,11 +325,12 @@ public class ConnectionTable {
* @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
* @return the Connection, or null if someone else already created or closed it
* @throws IOException if unable to connect
- * @throws DistributedSystemDisconnectedException
*/
private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource,
- boolean preserveOrder, Map m, PendingConnection pc, long startTime, long ackThreshold,
- long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException {
+ boolean preserveOrder, Map m, PendingConnection pc,
+ long startTime, long ackThreshold,
+ long ackSAThreshold)
+ throws IOException, DistributedSystemDisconnectedException {
// handle new pending connection
Connection con = null;
try {
@@ -405,7 +407,6 @@ public class ConnectionTable {
/**
* unordered or conserve-sockets=true note that unordered connections are currently always shared
- *
* @param id the DistributedMember on which we are creating a connection
* @param scheduleTimeout whether unordered connection should time out
* @param preserveOrder whether to preserve order
@@ -414,10 +415,10 @@ public class ConnectionTable {
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the new Connection, or null if an error
* @throws IOException if unable to create the connection
- * @throws DistributedSystemDisconnectedException
*/
private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
- boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+ boolean preserveOrder, long startTime, long ackTimeout,
+ long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
@@ -475,17 +476,16 @@ public class ConnectionTable {
/**
* Must be looking for an ordered connection that this thread owns
- *
* @param id stub on which to create the connection
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the connection, or null if an error
* @throws IOException if the connection could not be created
- * @throws DistributedSystemDisconnectedException
*/
Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
- long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+ long ackSATimeout)
+ throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
// Look for result in the thread local
@@ -500,7 +500,7 @@ public class ConnectionTable {
LocalizedStrings.ConnectionTable_CONNECTION_TABLE_IS_CLOSED.toLocalizedString());
}
// check for stale references and remove them.
- for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
+ for (Iterator it = this.threadConnMaps.iterator(); it.hasNext(); ) {
Reference r = (Reference) it.next();
if (r.get() == null) {
it.remove();
@@ -518,10 +518,16 @@ public class ConnectionTable {
result = null;
}
}
- if (result != null)
+ if (result != null) {
return result;
+ }
// OK, we have to create a new connection.
+ StringBuilder builder = new StringBuilder();
+ for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
+ builder.append(ste+"\n");
+ }
+ logger.warn("Creating connection "+builder.toString());
result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */,
id, false /* shared */, startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
@@ -567,7 +573,9 @@ public class ConnectionTable {
return result;
}
- /** schedule an idle-connection timeout task */
+ /**
+ * schedule an idle-connection timeout task
+ */
private void scheduleIdleTimeout(Connection conn) {
if (conn == null) {
// fix for bug 43529
@@ -609,7 +617,6 @@ public class ConnectionTable {
/**
* Get a new connection
- *
* @param id the DistributedMember on which to create the connection
* @param preserveOrder whether order should be preserved
* @param startTime the ms clock start time
@@ -617,10 +624,9 @@ public class ConnectionTable {
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the new Connection, or null if a problem
* @throws java.io.IOException if the connection could not be created
- * @throws DistributedSystemDisconnectedException
*/
protected Connection get(DistributedMember id, boolean preserveOrder, long startTime,
- long ackTimeout, long ackSATimeout)
+ long ackTimeout, long ackSATimeout)
throws java.io.IOException, DistributedSystemDisconnectedException {
if (this.closed) {
this.owner.getCancelCriterion().checkCancelInProgress(null);
@@ -702,7 +708,7 @@ public class ConnectionTable {
}
}
synchronized (this.orderedConnectionMap) {
- for (Iterator it = this.orderedConnectionMap.values().iterator(); it.hasNext();) {
+ for (Iterator it = this.orderedConnectionMap.values().iterator(); it.hasNext(); ) {
closeCon(
LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(),
it.next());
@@ -710,7 +716,7 @@ public class ConnectionTable {
this.orderedConnectionMap.clear();
}
synchronized (this.unorderedConnectionMap) {
- for (Iterator it = this.unorderedConnectionMap.values().iterator(); it.hasNext();) {
+ for (Iterator it = this.unorderedConnectionMap.values().iterator(); it.hasNext(); ) {
closeCon(
LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED.toLocalizedString(),
it.next());
@@ -722,12 +728,12 @@ public class ConnectionTable {
}
if (this.threadConnMaps != null) {
synchronized (this.threadConnMaps) {
- for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
+ for (Iterator it = this.threadConnMaps.iterator(); it.hasNext(); ) {
Reference r = (Reference) it.next();
Map m = (Map) r.get();
if (m != null) {
synchronized (m) {
- for (Iterator mit = m.values().iterator(); mit.hasNext();) {
+ for (Iterator mit = m.values().iterator(); mit.hasNext(); ) {
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_BEING_DESTROYED
.toLocalizedString(), mit.next());
}
@@ -766,12 +772,11 @@ public class ConnectionTable {
/**
* Close all receiving threads. This is used during shutdown and is also used by a test hook that
* makes us deaf to incoming messages.
- *
* @param beingSick a test hook to simulate a sick process
*/
protected void closeReceivers(boolean beingSick) {
synchronized (this.receivers) {
- for (Iterator it = this.receivers.iterator(); it.hasNext();) {
+ for (Iterator it = this.receivers.iterator(); it.hasNext(); ) {
Connection con = (Connection) it.next();
if (!beingSick || con.preserveOrder) {
closeCon(
@@ -782,7 +787,7 @@ public class ConnectionTable {
}
// now close any sockets being formed
synchronized (connectingSockets) {
- for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
+ for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry) it.next();
// ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
try {
@@ -814,25 +819,29 @@ public class ConnectionTable {
return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
}
- /** remove an endpoint and notify the membership manager of the departure */
+ /**
+ * remove an endpoint and notify the membership manager of the departure
+ */
protected void removeEndpoint(DistributedMember stub, String reason) {
removeEndpoint(stub, reason, true);
}
protected void removeEndpoint(DistributedMember memberID, String reason,
- boolean notifyDisconnect) {
+ boolean notifyDisconnect) {
if (this.closed) {
return;
}
boolean needsRemoval = false;
synchronized (this.orderedConnectionMap) {
- if (this.orderedConnectionMap.get(memberID) != null)
+ if (this.orderedConnectionMap.get(memberID) != null) {
needsRemoval = true;
+ }
}
if (!needsRemoval) {
synchronized (this.unorderedConnectionMap) {
- if (this.unorderedConnectionMap.get(memberID) != null)
+ if (this.unorderedConnectionMap.get(memberID) != null) {
needsRemoval = true;
+ }
}
}
if (!needsRemoval) {
@@ -866,7 +875,7 @@ public class ConnectionTable {
ArrayList al = (ArrayList) cm.remove(memberID);
if (al != null) {
synchronized (al) {
- for (Iterator it = al.iterator(); it.hasNext();) {
+ for (Iterator it = al.iterator(); it.hasNext(); ) {
Object c = it.next();
if (remoteAddress == null && (c instanceof Connection)) {
remoteAddress = ((Connection) c).getRemoteAddress();
@@ -882,7 +891,7 @@ public class ConnectionTable {
// close any sockets that are in the process of being connected
Set toRemove = new HashSet();
synchronized (connectingSockets) {
- for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
+ for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry) it.next();
ConnectingSocketInfo info = (ConnectingSocketInfo) entry.getValue();
if (info.peerAddress.equals(((InternalDistributedMember) memberID).getInetAddress())) {
@@ -891,7 +900,7 @@ public class ConnectionTable {
}
}
}
- for (Iterator it = toRemove.iterator(); it.hasNext();) {
+ for (Iterator it = toRemove.iterator(); it.hasNext(); ) {
Socket sock = (Socket) it.next();
try {
sock.close();
@@ -908,7 +917,7 @@ public class ConnectionTable {
// of the receivers sync (bug 38731)
toRemove.clear();
synchronized (this.receivers) {
- for (Iterator it = receivers.iterator(); it.hasNext();) {
+ for (Iterator it = receivers.iterator(); it.hasNext(); ) {
Connection con = (Connection) it.next();
if (memberID.equals(con.getRemoteAddress())) {
it.remove();
@@ -916,7 +925,7 @@ public class ConnectionTable {
}
}
}
- for (Iterator it = toRemove.iterator(); it.hasNext();) {
+ for (Iterator it = toRemove.iterator(); it.hasNext(); ) {
Connection con = (Connection) it.next();
closeCon(reason, con);
}
@@ -940,10 +949,12 @@ public class ConnectionTable {
return this.socketCloser;
}
- /** check to see if there are still any receiver threads for the given end-point */
+ /**
+ * check to see if there are still any receiver threads for the given end-point
+ */
protected boolean hasReceiversFor(DistributedMember endPoint) {
synchronized (this.receivers) {
- for (Iterator it = receivers.iterator(); it.hasNext();) {
+ for (Iterator it = receivers.iterator(); it.hasNext(); ) {
Connection con = (Connection) it.next();
if (endPoint.equals(con.getRemoteAddress())) {
return true;
@@ -954,7 +965,7 @@ public class ConnectionTable {
}
private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
- Connection c) {
+ Connection c) {
if (cm != null) {
ArrayList al = (ArrayList) cm.get(stub);
if (al != null) {
@@ -982,7 +993,7 @@ public class ConnectionTable {
}
void removeSharedConnection(String reason, DistributedMember stub, boolean ordered,
- Connection c) {
+ Connection c) {
if (this.closed) {
return;
}
@@ -1003,7 +1014,6 @@ public class ConnectionTable {
/**
* Just ensure that this class gets loaded.
- *
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
@@ -1013,7 +1023,6 @@ public class ConnectionTable {
/**
* Clears lastInstance. Does not yet close underlying sockets, but probably not strictly
* necessary.
- *
* @see SystemFailure#emergencyClose()
*/
public static void emergencyClose() {
@@ -1054,7 +1063,6 @@ public class ConnectionTable {
/**
* records the current outgoing message count on all thread-owned ordered connections. This does
* not synchronize or stop new connections from being formed or new messages from being sent
- *
* @since GemFire 5.1
*/
protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
@@ -1067,7 +1075,7 @@ public class ConnectionTable {
al = new ArrayList(al);
}
- for (Iterator it = al.iterator(); it.hasNext();) {
+ for (Iterator it = al.iterator(); it.hasNext(); ) {
Connection conn = (Connection) it.next();
if (!conn.isSharedResource() && conn.getOriginatedHere() && conn.getPreserveOrder()) {
result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent()));
@@ -1081,14 +1089,16 @@ public class ConnectionTable {
* wait for the given incoming connections to receive at least the associated number of messages
*/
protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
- Map connectionStates) throws InterruptedException {
- if (Thread.interrupted())
+ Map connectionStates)
+ throws InterruptedException {
+ if (Thread.interrupted()) {
throw new InterruptedException(); // wisest to do this before the synchronize below
+ }
List r = null;
synchronized (receivers) {
r = new ArrayList(receivers);
}
- for (Iterator it = r.iterator(); it.hasNext();) {
+ for (Iterator it = r.iterator(); it.hasNext(); ) {
Connection con = (Connection) it.next();
if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
&& member.equals(con.getRemoteAddress())) {
@@ -1111,7 +1121,7 @@ public class ConnectionTable {
sb.append("These connections from ");
sb.append(member);
sb.append("could not be located during waitForThreadOwnedOrderedConnectionState: ");
- for (Iterator it = connectionStates.entrySet().iterator(); it.hasNext();) {
+ for (Iterator it = connectionStates.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry) it.next();
sb.append(entry.getKey()).append('(').append(entry.getValue()).append(')');
if (it.hasNext()) {
@@ -1189,12 +1199,12 @@ public class ConnectionTable {
/**
* Synchronously set the connection and notify waiters that we are ready.
- *
* @param c the new connection
*/
public synchronized void notifyWaiters(Connection c) {
- if (!this.pending)
+ if (!this.pending) {
return; // already done.
+ }
this.conn = c;
this.pending = false;
@@ -1207,16 +1217,15 @@ public class ConnectionTable {
/**
* Wait for a connection
- *
* @param mgr the membership manager that can instigate suspect processing if necessary
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the new connection
- * @throws IOException
*/
public synchronized Connection waitForConnect(MembershipManager mgr, long startTime,
- long ackTimeout, long ackSATimeout) throws IOException {
+ long ackTimeout, long ackSATimeout)
+ throws IOException {
if (connectingThread == Thread.currentThread()) {
throw new ReenteredConnectException("This thread is already trying to connect");
}
@@ -1230,9 +1239,10 @@ public class ConnectionTable {
targetMember = this.id;
}
- for (;;) {
- if (!this.pending)
+ for (; ; ) {
+ if (!this.pending) {
break;
+ }
getConduit().getCancelCriterion().checkCancelInProgress(null);
// wait a little bit...
@@ -1248,8 +1258,9 @@ public class ConnectionTable {
}
}
- if (!this.pending)
+ if (!this.pending) {
break;
+ }
// Still pending...
long now = System.currentTimeMillis();
@@ -1258,13 +1269,13 @@ public class ConnectionTable {
if (targetMember != null) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
- new Object[] {targetMember, (ackSATimeout + ackTimeout) / 1000}));
+ new Object[]{targetMember, (ackSATimeout + ackTimeout) / 1000}));
}
severeAlertIssued = true;
} else if (!suspected) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ConnectionTable_UNABLE_TO_FORM_A_TCPIP_CONNECTION_TO_0_IN_OVER_1_SECONDS,
- new Object[] {this.id, (ackTimeout) / 1000}));
+ new Object[]{this.id, (ackTimeout) / 1000}));
((GMSMembershipManager) mgr).suspectMember(targetMember,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
@@ -1342,14 +1353,18 @@ public class ConnectionTable {
return ct;
}
- /** keep track of a socket that is trying to connect() for shutdown purposes */
+ /**
+ * keep track of a socket that is trying to connect() for shutdown purposes
+ */
public void addConnectingSocket(Socket socket, InetAddress addr) {
synchronized (connectingSockets) {
connectingSockets.put(socket, new ConnectingSocketInfo(addr));
}
}
- /** remove a socket from the tracked set. It should be connected at this point */
+ /**
+ * remove a socket from the tracked set. It should be connected at this point
+ */
public void removeConnectingSocket(Socket socket) {
synchronized (connectingSockets) {
connectingSockets.remove(socket);
--
To stop receiving notification emails like this one, please contact
udo@apache.org.