You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/12/15 17:45:35 UTC
[2/3] incubator-geode git commit: Revert "Revert "Removing
TCPConduit's Stub ID class""
Revert "Revert "Removing TCPConduit's Stub ID class""
This reverts commit 507f2f3a905e70fcabed9b83d4dc966ef3e9e6ec.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4bf4557b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4bf4557b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4bf4557b
Branch: refs/heads/develop
Commit: 4bf4557b2cfe12b9396aeb43dd0b916cbcb98b89
Parents: ec9d16a
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Dec 15 08:21:27 2015 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Dec 15 08:21:27 2015 -0800
----------------------------------------------------------------------
.../internal/DistributionManager.java | 8 -
.../distributed/internal/StartupMessage.java | 1 -
.../internal/direct/DirectChannel.java | 93 +------
.../internal/direct/MissingStubException.java | 37 ---
.../internal/direct/ShunnedMemberException.java | 34 +++
.../internal/membership/MembershipManager.java | 29 +-
.../gms/mgr/GMSMembershipManager.java | 197 ++-----------
.../internal/i18n/ParentLocalizedStrings.java | 6 +-
.../gemfire/internal/tcp/Connection.java | 117 ++++----
.../gemfire/internal/tcp/ConnectionTable.java | 91 +++---
.../internal/tcp/MemberShunnedException.java | 7 +-
.../gemfire/internal/tcp/ServerDelegate.java | 5 +-
.../com/gemstone/gemfire/internal/tcp/Stub.java | 164 -----------
.../gemfire/internal/tcp/TCPConduit.java | 274 +++----------------
.../internal/DistributionManagerDUnitTest.java | 6 +-
.../gms/mgr/GMSMembershipManagerJUnitTest.java | 31 +--
.../internal/tcp/ConnectionJUnitTest.java | 3 +-
17 files changed, 233 insertions(+), 870 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
index 964845c..e3c342a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
@@ -91,7 +91,6 @@ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger;
import com.gemstone.gemfire.internal.tcp.Connection;
import com.gemstone.gemfire.internal.tcp.ConnectionTable;
import com.gemstone.gemfire.internal.tcp.ReenteredConnectException;
-import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
/**
@@ -2715,13 +2714,6 @@ public class DistributionManager
return false; // no peers, we are alone.
}
- // ensure we have stubs for everyone else
- Iterator it = allOthers.iterator();
- while (it.hasNext()) {
- InternalDistributedMember member = (InternalDistributedMember)it.next();
- membershipManager.getStubForMember(member);
- }
-
try {
ok = op.sendStartupMessage(allOthers, STARTUP_TIMEOUT, equivs,
redundancyZone, enforceUniqueZone());
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
index 96f8b60..01f8c62 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
@@ -37,7 +37,6 @@ import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributes
import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.tcp.Stub;
/**
* A message that is sent to all other distribution manager when
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
index 14ff923..d4df3bf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.ToDataException;
import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DMStats;
@@ -64,7 +65,6 @@ import com.gemstone.gemfire.internal.tcp.Connection;
import com.gemstone.gemfire.internal.tcp.ConnectionException;
import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
import com.gemstone.gemfire.internal.tcp.MsgStreamer;
-import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.gemfire.internal.tcp.TCPConduit;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore;
@@ -115,13 +115,6 @@ public class DirectChannel {
}
/**
- * Returns the endpoint ID for the direct channel
- */
- public Stub getLocalStub() {
- return conduit.getId();
- }
-
- /**
* when the initial number of members is known, this method is invoked
* to ensure that connections to those members can be established in a
* reasonable amount of time. See bug 39848
@@ -181,7 +174,7 @@ public class DirectChannel {
this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS);
logger.info(LocalizedMessage.create(
- LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getId()));
+ LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getLocalAddr()));
}
catch (ConnectionException ce) {
@@ -192,48 +185,6 @@ public class DirectChannel {
}
-// /**
-// *
-// * @param addr destination for the message
-// * @param stubMap map containing all the stubs
-// * @param msg the original message
-// * @param msgBuf the serialized message
-// * @param directAck true if we need an ack
-// * @param processorType the type (serialized, etc.)
-// * @return if directAck, the Connection that needs the acknowledgment
-// * @throws MissingStubException if we do not have a Stub for the recipient
-// * @throws IOException if the message could not be sent
-// */
-// private Connection attemptSingleSend(MembershipManager mgr,
-// InternalDistributedMember addr,
-// DistributionMessage msg, ByteBuffer msgBuf,
-// boolean directAck, int processorType)
-// throws MissingStubException, IOException
-// {
-// if (!msg.deliverToSender() && localAddr.equals(addr))
-// return null;
-
-// if (addr == null)
-// return null;
-// Stub dest = mgr.getStubForMember(addr);
-// if (dest == null) {
-// // This should only happen if the member is no longer in the view.
-// Assert.assertTrue(!mgr.memberExists(addr));
-// throw new MissingStubException("No stub");
-// }
-// try {
-// msgBuf.position(0); // fix for bug#30680
-// Connection con = conduit.sendSync(dest, msgBuf, processorType, msg);
-// if (directAck)
-// return con;
-// else
-// return null;
-// }
-// catch(IOException t) {
-// throw t;
-// }
-// }
-
/**
* Return how many concurrent operations should be allowed by default.
* since 6.6, this has been raised to Integer.MAX value from the number
@@ -639,22 +590,13 @@ public class DirectChannel {
continue;
}
- Stub stub = mgr.getStubForMember(destination);
- if (stub == null) {
+ if (!mgr.memberExists(destination) || mgr.shutdownInProgress() || mgr.isShunned(destination)) {
// This should only happen if the member is no longer in the view.
if (logger.isTraceEnabled(LogMarker.DM)) {
- logger.trace(LogMarker.DM, "No Stub for {}", destination);
+ logger.trace(LogMarker.DM, "Not a member: {}", destination);
}
- // The only time getStubForMember returns null is if we are
- // shunning that member or we are shutting down.
- // So the following assertion is wrong:
- //Assert.assertTrue(!mgr.memberExists(destination));
- // instead we should:
- // Assert.assertTrue(mgr.shutdownInProgress() || mgr.isShunned(destination));
- //but this is not worth doing and isShunned is not public.
- // SO the assert has been deadcoded.
if (ce == null) ce = new ConnectExceptions();
- ce.addFailure(destination, new MissingStubException(LocalizedStrings.DirectChannel_NO_STUB_0.toLocalizedString()));
+ ce.addFailure(destination, new ShunnedMemberException(LocalizedStrings.DirectChannel_SHUNNING_0.toLocalizedString()));
}
else {
try {
@@ -662,8 +604,8 @@ public class DirectChannel {
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
}
- Connection con = conduit.getConnection(destination, stub,
- preserveOrder, retry, startTime, ackTimeout, ackSDTimeout);
+ Connection con = conduit.getConnection(destination, preserveOrder,
+ retry, startTime, ackTimeout, ackSDTimeout);
con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
cons.add(con);
@@ -823,7 +765,7 @@ public class DirectChannel {
}
- public void receive(DistributionMessage msg, int bytesRead, Stub connId) {
+ public void receive(DistributionMessage msg, int bytesRead) {
if (disconnected) {
return;
}
@@ -844,10 +786,6 @@ public class DirectChannel {
}
}
-// public void newMemberConnected(InternalDistributedMember member, Stub id) {
-// receiver.newMemberConnected(member, id);
-// }
-
public InternalDistributedMember getLocalAddress() {
return this.localAddr;
}
@@ -930,13 +868,6 @@ public class DirectChannel {
}
}
- /** Create a TCPConduit stub from a JGroups InternalDistributedMember */
- public Stub createConduitStub(InternalDistributedMember addr) {
- int port = addr.getDirectChannelPort();
- Stub stub = new Stub(addr.getInetAddress(), port, addr.getVmViewId());
- return stub;
- }
-
public void closeEndpoint(InternalDistributedMember member, String reason) {
closeEndpoint(member, reason, true);
}
@@ -948,7 +879,7 @@ public class DirectChannel {
public void closeEndpoint(InternalDistributedMember member, String reason, boolean notifyDisconnect) {
TCPConduit tc = this.conduit;
if (tc != null) {
- tc.removeEndpoint(createConduitStub(member), reason, notifyDisconnect);
+ tc.removeEndpoint(member, reason, notifyDisconnect);
}
}
@@ -962,7 +893,7 @@ public class DirectChannel {
* the map to add the state to
* @since 5.1
*/
- public void getChannelStates(Stub member, Map result)
+ public void getChannelStates(DistributedMember member, Map result)
{
TCPConduit tc = this.conduit;
if (tc != null) {
@@ -974,7 +905,7 @@ public class DirectChannel {
* wait for the given connections to process the number of messages
* associated with the connection in the given map
*/
- public void waitForChannelState(Stub member, Map channelState)
+ public void waitForChannelState(DistributedMember member, Map channelState)
throws InterruptedException
{
if (Thread.interrupted()) throw new InterruptedException();
@@ -987,7 +918,7 @@ public class DirectChannel {
/**
* returns true if there are still receiver threads for the given member
*/
- public boolean hasReceiversFor(Stub mbr) {
+ public boolean hasReceiversFor(DistributedMember mbr) {
return this.conduit.hasReceiversFor(mbr);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
deleted file mode 100644
index 49b4486..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.direct;
-
-import com.gemstone.gemfire.GemFireCheckedException;
-
-/**
- * Exception thrown when the TCPConduit is unable to acquire a stub
- * for the given recipient.
- *
- * @author jpenney
- *
- */
-public class MissingStubException extends GemFireCheckedException
-{
-
- private static final long serialVersionUID = -6455664684151074915L;
-
- public MissingStubException(String msg) {
- super(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
new file mode 100644
index 0000000..59db762
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.direct;
+
+import com.gemstone.gemfire.GemFireCheckedException;
+
+/**
+ * Exception thrown when a member is no longer in the distributed system
+ *
+ */
+public class ShunnedMemberException extends GemFireCheckedException
+{
+
+ private static final long serialVersionUID = -6455664684151074915L;
+
+ public ShunnedMemberException(String msg) {
+ super(msg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
index a46680b..7416efa 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java
@@ -27,7 +27,6 @@ import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
-import com.gemstone.gemfire.internal.tcp.Stub;
/**
* A MembershipManager is responsible for reporting a MemberView, as well as
@@ -74,7 +73,7 @@ public interface MembershipManager {
* @param m the member
* @return true if it still exists
*/
- public boolean memberExists(InternalDistributedMember m);
+ public boolean memberExists(DistributedMember m);
/**
* Is this manager still connected? If it has not been initialized, this
@@ -143,25 +142,6 @@ public interface MembershipManager {
throws NotSerializableException;
/**
- * Return a {@link Stub} referring to the given member. A <em>null</em> may
- * be returned if the system is not employing stubs for communication.
- *
- * @param m the member
- * @return the stub
- */
- public Stub getStubForMember(InternalDistributedMember m);
-
- /**
- * Return a {@link InternalDistributedMember} associated with the given Stub. This
- * method may return a null if Stubs are not being used.
- * @param s Stub to look up
- * @param validated true if member must be in the current view
- * @return the member associated with the given stub, if any
- */
- public InternalDistributedMember getMemberForStub(Stub s, boolean validated);
-
-
- /**
* Indicates to the membership manager that the system is shutting down.
* Typically speaking, this means that new connection attempts are to be
* ignored and disconnect failures are to be (more) tolerated.
@@ -286,7 +266,7 @@ public interface MembershipManager {
*/
public void warnShun(DistributedMember mbr);
- public boolean addSurpriseMember(DistributedMember mbr, Stub stub);
+ public boolean addSurpriseMember(DistributedMember mbr);
/** if a StartupMessage is going to reject a new member, this should be used
* to make sure we don't keep that member on as a "surprise member"
@@ -307,6 +287,11 @@ public interface MembershipManager {
* @return true if the member is a surprise member
*/
public boolean isSurpriseMember(DistributedMember m);
+
+ /**
+ * Returns true if the member is being shunned
+ */
+ public boolean isShunned(DistributedMember m);
/**
* Forces use of UDP for communications in the current thread. UDP is
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index 0b7a544..7be0a3a 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -94,7 +94,6 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.shared.StringPrintWriter;
import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
-import com.gemstone.gemfire.internal.tcp.Stub;
import com.gemstone.gemfire.internal.util.Breadcrumbs;
public class GMSMembershipManager implements MembershipManager, Manager
@@ -156,7 +155,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
boolean crashed;
String reason;
DistributionMessage dmsg;
- Stub stub;
NetView gmsView;
@Override
@@ -165,7 +163,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
sb.append("kind=");
switch (kind) {
case SURPRISE_CONNECT:
- sb.append("connect; member = <" + member + ">; stub = " + stub);
+ sb.append("connect; member = <" + member + ">");
break;
case VIEW:
String text = gmsView.toString();
@@ -184,12 +182,10 @@ public class GMSMembershipManager implements MembershipManager, Manager
/**
* Create a surprise connect event
* @param member the member connecting
- * @param id the stub
*/
- StartupEvent(final InternalDistributedMember member, final Stub id) {
+ StartupEvent(final InternalDistributedMember member) {
this.kind = SURPRISE_CONNECT;
this.member = member;
- this.stub = id;
}
/**
* Indicate if this is a surprise connect event
@@ -282,24 +278,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
volatile boolean hasJoined;
/**
- * a map keyed on InternalDistributedMember, values are Stubs that represent direct
- * channels to other systems
- *
- * Accesses must be under the read or write lock of {@link #latestViewLock}.
- */
- protected final Map<InternalDistributedMember, Stub> memberToStubMap =
- new ConcurrentHashMap<InternalDistributedMember, Stub>();
-
- /**
- * a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub
- * value instanceof InternalDistributedMember
- *
- * Accesses must be under the read or write lock of {@link #latestViewLock}.
- */
- protected final Map<Stub, InternalDistributedMember> stubToMemberMap =
- new ConcurrentHashMap<Stub, InternalDistributedMember>();
-
- /**
* Members of the distributed system that we believe have shut down.
* Keys are instances of {@link InternalDistributedMember}, values are
* Longs indicating the time this member was shunned.
@@ -547,12 +525,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
}
}
- // fix for bug #42006, lingering old identity
- Object oldStub = this.memberToStubMap.remove(m);
- if (oldStub != null) {
- this.stubToMemberMap.remove(oldStub);
- }
-
if (shutdownInProgress()) {
addShunnedMember(m);
continue; // no additions processed after shutdown begins
@@ -806,9 +778,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
if (directChannel != null) {
directChannel.setLocalAddr(address);
- Stub stub = directChannel.getLocalStub();
- memberToStubMap.put(address, stub);
- stubToMemberMap.put(stub, address);
}
this.hasJoined = true;
@@ -905,17 +874,15 @@ public class GMSMembershipManager implements MembershipManager, Manager
/**
* Process a surprise connect event, or place it on the startup queue.
* @param member the member
- * @param stub its stub
*/
protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) {
- Stub stub = new Stub(member.getInetAddress(), member.getDirectChannelPort(), member.getVmViewId());
synchronized (startupLock) {
if (!processingEvents) {
- startupMessages.add(new StartupEvent(member, stub));
+ startupMessages.add(new StartupEvent(member));
return;
}
}
- processSurpriseConnect(member, stub);
+ processSurpriseConnect(member);
}
public void startupMessageFailed(DistributedMember mbr, String failureMessage) {
@@ -941,12 +908,9 @@ public class GMSMembershipManager implements MembershipManager, Manager
* been added, simply returns; else adds the member.
*
* @param dm the member joining
- * @param stub the member's stub
*/
- public boolean addSurpriseMember(DistributedMember dm,
- Stub stub) {
+ public boolean addSurpriseMember(DistributedMember dm) {
final InternalDistributedMember member = (InternalDistributedMember)dm;
- Stub s = null;
boolean warn = false;
latestViewLock.writeLock().lock();
@@ -1009,16 +973,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
startCleanupTimer();
} // cleanupTimer == null
- // fix for bug #42006, lingering old identity
- Object oldStub = this.memberToStubMap.remove(member);
- if (oldStub != null) {
- this.stubToMemberMap.remove(oldStub);
- }
-
- s = stub == null ? getStubForMember(member) : stub;
- // Make sure that channel information is consistent
- addChannel(member, s);
-
// Ensure that the member is accounted for in the view
// Conjure up a new view including the new member. This is necessary
// because we are about to tell the listener about a new member, so
@@ -1154,7 +1108,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
// If it's a new sender, wait our turn, generate the event
if (isNew) {
- shunned = !addSurpriseMember(m, getStubForMember(m));
+ shunned = !addSurpriseMember(m);
} // isNew
}
@@ -1166,7 +1120,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging
warnShun(m);
logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg);
- throw new MemberShunnedException(getStubForMember(m));
+ throw new MemberShunnedException(m);
}
listener.messageReceived(msg);
@@ -1248,13 +1202,11 @@ public class GMSMembershipManager implements MembershipManager, Manager
* grabbed a stable view if this is really a new member.
*
* @param member
- * @param stub
*/
private void processSurpriseConnect(
- InternalDistributedMember member,
- Stub stub)
+ InternalDistributedMember member)
{
- addSurpriseMember(member, stub);
+ addSurpriseMember(member);
}
/**
@@ -1276,7 +1228,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
processView(o.gmsView.getViewId(), o.gmsView);
}
else if (o.isSurpriseConnect()) { // connect
- processSurpriseConnect(o.member, o.stub);
+ processSurpriseConnect(o.member);
}
else // sanity
@@ -1450,7 +1402,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
}
}
- public boolean memberExists(InternalDistributedMember m) {
+ public boolean memberExists(DistributedMember m) {
latestViewLock.readLock().lock();
NetView v = latestView;
latestViewLock.readLock().unlock();
@@ -1525,12 +1477,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
directChannel.emergencyClose();
}
- // could we guarantee not to allocate objects? We're using Darrel's
- // factory, so it's possible that an unsafe implementation could be
- // introduced here.
-// stubToMemberMap.clear();
-// memberToStubMap.clear();
-
if (DEBUG) {
System.err.println("DEBUG: done closing GroupMembershipService");
}
@@ -1767,7 +1713,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
allDestinations = true;
latestViewLock.writeLock().lock();
try {
- Set keySet = memberToStubMap.keySet();
+ List<InternalDistributedMember> keySet = latestView.getMembers();
keys = new InternalDistributedMember[keySet.size()];
keys = (InternalDistributedMember[])keySet.toArray(keys);
} finally {
@@ -2020,80 +1966,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
// not currently supported by this manager
}
- /**
- * Get or create stub for given member
- */
- public Stub getStubForMember(InternalDistributedMember m)
- {
- if (shutdownInProgress) {
- throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
- }
-
- if (services.getConfig().getDistributionConfig().getDisableTcp()) {
- return new Stub(m.getInetAddress(), m.getPort(), m.getVmViewId());
- }
-
- // Return existing one if it is already in place
- Stub result;
- result = (Stub)memberToStubMap.get(m);
- if (result != null)
- return result;
-
- latestViewLock.writeLock().lock();
- try {
- // Do all of this work in a critical region to prevent
- // members from slipping in during shutdown
- if (shutdownInProgress())
- return null; // don't try to create a stub during shutdown
- if (isShunned(m))
- return null; // don't let zombies come back to life
-
- // OK, create one. Update the table to reflect the creation.
- result = directChannel.createConduitStub(m);
- addChannel(m, result);
- } finally {
- latestViewLock.writeLock().unlock();
- }
- return result;
- }
-
- public InternalDistributedMember getMemberForStub(Stub s, boolean validated)
- {
- latestViewLock.writeLock().lock();
- try {
- if (shutdownInProgress) {
- throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
- }
- InternalDistributedMember result = (InternalDistributedMember)
- stubToMemberMap.get(s);
- if (result != null) {
- if (validated && !this.latestView.contains(result)) {
- // Do not return this member unless it is in the current view.
- if (!surpriseMembers.containsKey(result)) {
- // if not a surprise member, this stub is lingering and should be removed
- stubToMemberMap.remove(s);
- memberToStubMap.remove(result);
- }
- result = null;
- // fall through to see if there is a newer member using the same direct port
- }
- }
- if (result == null) {
- // it may have not been added to the stub->idm map yet, so check the current view
- for (InternalDistributedMember idm: latestView.getMembers()) {
- if (GMSUtil.compareAddresses(idm.getInetAddress(), s.getInetAddress()) == 0
- && idm.getDirectChannelPort() == s.getPort()) {
- addChannel(idm, s);
- return idm;
- }
- }
- }
- return result;
- } finally {
- latestViewLock.writeLock().unlock();
- }
- }
-
public void setShutdown()
{
latestViewLock.writeLock().lock();
@@ -2109,24 +1981,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
return shutdownInProgress || (dm != null && dm.shutdownInProgress());
}
- /**
- * Add a mapping from the given member to the given stub. Must
- * be called with {@link #latestViewLock} held.
- *
- * @param member
- * @param theChannel
- */
- protected void addChannel(InternalDistributedMember member, Stub theChannel)
- {
- if (theChannel != null) {
- // Don't overwrite existing stub information with a null
- this.memberToStubMap.put(member, theChannel);
-
- // Can't create reverse mapping if the stub is null
- this.stubToMemberMap.put(theChannel, member);
- }
- }
-
/**
* Clean up and create consistent new view with member removed.
@@ -2137,12 +1991,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
protected void destroyMember(final InternalDistributedMember member,
boolean crashed, final String reason) {
- // Clean up the maps
- Stub theChannel = (Stub)memberToStubMap.remove(member);
- if (theChannel != null) {
- this.stubToMemberMap.remove(theChannel);
- }
-
// Make sure it is removed from the view
latestViewLock.writeLock().lock();
try {
@@ -2365,12 +2213,11 @@ public class GMSMembershipManager implements MembershipManager, Manager
/* non-thread-owned serial channels and high priority channels are not
* included
*/
- public Map getMessageState(DistributedMember member, boolean includeMulticast) {
+ public Map getChannelStates(DistributedMember member, boolean includeMulticast) {
Map result = new HashMap();
- Stub stub = (Stub)memberToStubMap.get(member);
DirectChannel dc = directChannel;
- if (stub != null && dc != null) {
- dc.getChannelStates(stub, result);
+ if (dc != null) {
+ dc.getChannelStates(member, result);
}
services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast);
return result;
@@ -2381,15 +2228,8 @@ public class GMSMembershipManager implements MembershipManager, Manager
{
if (Thread.interrupted()) throw new InterruptedException();
DirectChannel dc = directChannel;
- Stub stub;
- latestViewLock.writeLock().lock();
- try {
- stub = (Stub)memberToStubMap.get(otherMember);
- } finally {
- latestViewLock.writeLock().unlock();
- }
- if (dc != null && stub != null) {
- dc.waitForChannelState(stub, state);
+ if (dc != null) {
+ dc.waitForChannelState(otherMember, channelState);
}
services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state);
}
@@ -2405,7 +2245,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
boolean result = false;
DirectChannel dc = directChannel;
InternalDistributedMember idm = (InternalDistributedMember)mbr;
- Stub stub = new Stub(idm.getInetAddress(), idm.getPort(), idm.getVmViewId());
int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout();
long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
boolean wait;
@@ -2413,7 +2252,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
do {
wait = false;
if (dc != null) {
- if (dc.hasReceiversFor(stub)) {
+ if (dc.hasReceiversFor(idm)) {
wait = true;
}
if (wait && logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
index 7bb97b9..780fe18 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java
@@ -1109,7 +1109,7 @@ class ParentLocalizedStrings {
public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED = new StringId(2086, "Ending reconnect attempt because {0} has disappeared.");
public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED = new StringId(2087, "Ending reconnect attempt to {0} because shutdown has started.");
public static final StringId TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1 = new StringId(2088, "Error sending message to {0} (will reattempt): {1}");
- public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket and Stub on port {0} with address {1}");
+ public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket on port {0} with address {1}");
public static final StringId TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT = new StringId(2090, "exception parsing p2p.idleConnectionTimeout");
public static final StringId TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE = new StringId(2091, "exception parsing p2p.tcpBufferSize");
public static final StringId TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1 = new StringId(2092, "Failed to accept connection from {0} because {1}");
@@ -1444,7 +1444,7 @@ class ParentLocalizedStrings {
public static final StringId Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1 = new StringId(2432, "Detected wrong version of GemFire product during handshake. Expected {0} but found {1}");
public static final StringId Connection_FORCED_DISCONNECT_SENT_TO_0 = new StringId(2433, "Forced disconnect sent to {0}");
public static final StringId Connection_HANDSHAKE_FAILED = new StringId(2434, "Handshake failed");
- public static final StringId Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP = new StringId(2435, "Member for stub {0} left the group");
+ public static final StringId Connection_MEMBER_LEFT_THE_GROUP = new StringId(2435, "Member {0} left the group");
public static final StringId Connection_NOT_CONNECTED_TO_0 = new StringId(2436, "Not connected to {0}");
public static final StringId Connection_NULL_CONNECTIONTABLE = new StringId(2437, "Null ConnectionTable");
public static final StringId Connection_SOCKET_HAS_BEEN_CLOSED = new StringId(2438, "socket has been closed");
@@ -1542,7 +1542,7 @@ class ParentLocalizedStrings {
public static final StringId DefaultQuery_WHEN_QUERYING_A_PARTITIONED_REGION_THE_PROJECTIONS_MUST_NOT_REFERENCE_ANY_REGIONS = new StringId(2530, "When querying a Partitioned Region, the projections must not reference any regions");
public static final StringId DestroyMessage_FAILED_SENDING_0 = new StringId(2531, "Failed sending < {0} >");
public static final StringId DirectChannel_COMMUNICATIONS_DISCONNECTED = new StringId(2532, "communications disconnected");
- public static final StringId DirectChannel_NO_STUB_0 = new StringId(2533, "No stub {0}");
+ public static final StringId DirectChannel_SHUNNING_0 = new StringId(2533, "Member is being shunned: {0}");
public static final StringId DirectChannel_UNKNOWN_ERROR_SERIALIZING_MESSAGE = new StringId(2534, "Unknown error serializing message");
public static final StringId DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING = new StringId(2535, "An IOException was thrown while serializing.");
public static final StringId DiskEntry_DISK_REGION_IS_NULL = new StringId(2536, "Disk region is null");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index f918812..74660da 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.ConflationKey;
import com.gemstone.gemfire.distributed.internal.DM;
@@ -72,7 +73,6 @@ import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.DSFIDFactory;
import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.SocketCloser;
import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.SocketUtils;
import com.gemstone.gemfire.internal.SystemTimer;
@@ -222,11 +222,6 @@ public class Connection implements Runnable {
/** the ID string of the conduit (for logging) */
String conduitIdStr;
- /** remoteId identifies the remote conduit's listener. It does NOT
- identify the "port" that this connection's socket is attached
- to, which is a different thing altogether */
- Stub remoteId;
-
/** Identifies the java group member on the other side of the connection. */
InternalDistributedMember remoteAddr;
@@ -801,7 +796,7 @@ public class Connection implements Runnable {
}
if (success) {
if (this.isReceiver) {
- needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr, this.remoteId);
+ needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr);
if (needToClose) {
reason = "this member is shunned";
}
@@ -845,7 +840,7 @@ public class Connection implements Runnable {
* @param beingSick
*/
private void asyncClose(boolean beingSick) {
- // note: remoteId may be null if this is a receiver that hasn't finished its handshake
+ // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
// we do the close in a background thread because the operation may hang if
// there is a problem with the network. See bug #46659
@@ -1018,8 +1013,7 @@ public class Connection implements Runnable {
protected static Connection createSender(final MembershipManager mgr,
final ConnectionTable t,
final boolean preserveOrder,
- final Stub key,
- final InternalDistributedMember remoteAddr,
+ final DistributedMember remoteAddr,
final boolean sharedResource,
final long startTime,
final long ackTimeout,
@@ -1074,9 +1068,8 @@ public class Connection implements Runnable {
}
if (firstTime) {
firstTime = false;
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
- throw new IOException("Member for stub " + key + " left the group");
+ if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) {
+ throw new IOException("Member " + remoteAddr + " left the system");
}
}
else {
@@ -1084,7 +1077,7 @@ public class Connection implements Runnable {
// alert listener should not prevent cache operations from continuing
if (AlertAppender.isThreadAlerting()) {
// do not change the text of this exception - it is looked for in exception handlers
- throw new IOException("Cannot form connection to alert listener " + key);
+ throw new IOException("Cannot form connection to alert listener " + remoteAddr);
}
// Wait briefly...
@@ -1097,20 +1090,19 @@ public class Connection implements Runnable {
t.getConduit().getCancelCriterion().checkCancelInProgress(ie);
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
- throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key));
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
}
if (!warningPrinted) {
warningPrinted = true;
- logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, m));
+ logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr));
}
t.getConduit().stats.incReconnectAttempts();
}
//create connection
try {
conn = null;
- conn = new Connection(mgr, t, preserveOrder, key, remoteAddr, sharedResource);
+ conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
}
catch (javax.net.ssl.SSLHandshakeException se) {
// no need to retry if certificates were rejected
@@ -1118,8 +1110,7 @@ public class Connection implements Runnable {
}
catch (IOException ioe) {
// Only give up if the member leaves the view.
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
+ if (giveUpOnMember(mgr, remoteAddr)) {
throw ioe;
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -1130,7 +1121,7 @@ public class Connection implements Runnable {
connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of these
logger.info(LocalizedMessage.create(
LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
- new Object[] {sharedResource, preserveOrder, m, ioe}));
+ new Object[] {sharedResource, preserveOrder, remoteAddr, ioe}));
}
} // IOException
finally {
@@ -1146,9 +1137,8 @@ public class Connection implements Runnable {
// something went wrong while reading the handshake
// and the socket was closed or this guy sent us a
// ShutdownMessage
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
- throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key));
+ if (giveUpOnMember(mgr, remoteAddr)) {
+ throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr));
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
// no success but no need to log; just retry
@@ -1161,8 +1151,7 @@ public class Connection implements Runnable {
throw e;
}
catch (ConnectionException e) {
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
+ if (giveUpOnMember(mgr, remoteAddr)) {
IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString());
ioe.initCause(e);
throw ioe;
@@ -1170,17 +1159,16 @@ public class Connection implements Runnable {
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
logger.info(LocalizedMessage.create(
LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
- new Object[] {sharedResource, preserveOrder, m,e}));
+ new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
}
catch (IOException e) {
- InternalDistributedMember m = mgr.getMemberForStub(key, true);
- if (m == null) {
+ if (giveUpOnMember(mgr, remoteAddr)) {
throw e;
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
logger.info(LocalizedMessage.create(
LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1,
- new Object[] {sharedResource, preserveOrder, m,e}));
+ new Object[] {sharedResource, preserveOrder, remoteAddr ,e}));
if (!sharedResource && "Too many open files".equals(e.getMessage())) {
t.fileDescriptorsExhausted();
}
@@ -1220,7 +1208,7 @@ public class Connection implements Runnable {
if (conn == null) {
throw new ConnectionException(
LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0
- .toLocalizedString(mgr.getMemberForStub(key, true)));
+ .toLocalizedString(remoteAddr));
}
if (preserveOrder && BATCH_SENDS) {
conn.createBatchSendBuffer();
@@ -1228,12 +1216,15 @@ public class Connection implements Runnable {
conn.finishedConnecting = true;
return conn;
}
+
+ private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
+ return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
+ }
- private void setRemoteAddr(InternalDistributedMember m, Stub stub) {
+ private void setRemoteAddr(DistributedMember m) {
this.remoteAddr = this.owner.getDM().getCanonicalId(m);
- this.remoteId = stub;
MembershipManager mgr = this.owner.owner.getMembershipManager();
- mgr.addSurpriseMember(m, stub);
+ mgr.addSurpriseMember(m);
}
/** creates a new connection to a remote server.
@@ -1243,11 +1234,11 @@ public class Connection implements Runnable {
private Connection(MembershipManager mgr,
ConnectionTable t,
boolean preserveOrder,
- Stub key,
- InternalDistributedMember remoteAddr,
+ DistributedMember remoteID,
boolean sharedResource)
throws IOException, DistributedSystemDisconnectedException
{
+ InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID;
if (t == null) {
throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
}
@@ -1255,7 +1246,7 @@ public class Connection implements Runnable {
this.owner = t;
this.sharedResource = sharedResource;
this.preserveOrder = preserveOrder;
- setRemoteAddr(remoteAddr, key);
+ setRemoteAddr(remoteAddr);
this.conduitIdStr = this.owner.getConduit().getId().toString();
this.handshakeRead = false;
this.handshakeCancelled = false;
@@ -1265,7 +1256,7 @@ public class Connection implements Runnable {
// connect to listening socket
- InetSocketAddress addr = new InetSocketAddress(remoteId.getInetAddress(), remoteId.getPort());
+ InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
if (useNIO()) {
SocketChannel channel = SocketChannel.open();
this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
@@ -1325,15 +1316,15 @@ public class Connection implements Runnable {
else {
if (TCPConduit.useSSL) {
// socket = javax.net.ssl.SSLSocketFactory.getDefault()
- // .createSocket(remoteId.getInetAddress(), remoteId.getPort());
+ // .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort());
int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
- this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteId.getInetAddress(), remoteId.getPort(), socketBufferSize );
+ this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize );
// Set the receive buffer size local fields. It has already been set in the socket.
setSocketBufferSize(this.socket, false, socketBufferSize, true);
setSendBufferSize(this.socket);
}
else {
- //socket = new Socket(remoteId.getInetAddress(), remoteId.getPort());
+ //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
Socket s = new Socket();
this.socket = s;
s.setTcpNoDelay(true);
@@ -1639,8 +1630,8 @@ public class Connection implements Runnable {
// we can't wait for the reader thread when running in an IBM JRE. See
// bug 41889
if (this.owner.owner.config.getEnableNetworkPartitionDetection() ||
- this.owner.owner.getLocalId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
- this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+ this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE ||
+ this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
}
{
@@ -1689,16 +1680,16 @@ public class Connection implements Runnable {
// Only remove endpoint if sender.
if (this.finishedConnecting) {
// only remove endpoint if our constructor finished
- this.owner.removeEndpoint(this.remoteId, reason);
+ this.owner.removeEndpoint(this.remoteAddr, reason);
}
}
}
else {
- this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this);
+ this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
}
}
else if (!this.isReceiver) {
- this.owner.removeThreadConnection(this.remoteId, this);
+ this.owner.removeThreadConnection(this.remoteAddr, this);
}
}
else {
@@ -1706,10 +1697,10 @@ public class Connection implements Runnable {
// has never added this Connection to its maps since
// the calls in this block use our identity to do the removes.
if (this.sharedResource) {
- this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this);
+ this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
}
else if (!this.isReceiver) {
- this.owner.removeThreadConnection(this.remoteId, this);
+ this.owner.removeThreadConnection(this.remoteAddr, this);
}
}
}
@@ -1753,7 +1744,7 @@ public class Connection implements Runnable {
} finally {
// bug36060: do the socket close within a finally block
if (logger.isDebugEnabled()) {
- logger.debug("Stopping {} for {}", p2pReaderName(), remoteId);
+ logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
}
initiateSuspicionIfSharedUnordered();
if (this.isReceiver) {
@@ -2338,8 +2329,7 @@ public class Connection implements Runnable {
.toLocalizedString(new Object[]{new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)}));
}
InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
- Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId());
- setRemoteAddr(remote, stub);
+ setRemoteAddr(remote);
Thread.currentThread().setName(LocalizedStrings.Connection_P2P_MESSAGE_READER_FOR_0.toLocalizedString(this.remoteAddr, this.socket.getPort()));
this.sharedResource = dis.readBoolean();
this.preserveOrder = dis.readBoolean();
@@ -2377,7 +2367,7 @@ public class Connection implements Runnable {
}
if (logger.isDebugEnabled()) {
- logger.debug("{} remoteId is {} {}", p2pReaderName(), this.remoteId,
+ logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr,
(this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
}
@@ -2555,7 +2545,7 @@ public class Connection implements Runnable {
throws IOException, ConnectionException
{
if (!connected) {
- throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteId));
+ throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteAddr));
}
if (this.batchFlusher != null) {
batchSend(buffer);
@@ -2778,7 +2768,7 @@ public class Connection implements Runnable {
if (this.disconnectRequested) {
buffer.position(origBufferPos);
// we have given up so just drop this message.
- throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteId));
+ throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteAddr));
}
if (!force && !this.asyncQueuingInProgress) {
// reset buffer since we will be sending it. This fixes bug 34832
@@ -2980,7 +2970,7 @@ public class Connection implements Runnable {
}
DM dm = this.owner.getDM();
if (dm == null) {
- this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString());
+ this.owner.removeEndpoint(this.remoteAddr, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString());
return;
}
dm.getMembershipManager().requestMemberRemoval(this.remoteAddr,
@@ -3001,7 +2991,7 @@ public class Connection implements Runnable {
return;
}
}
- this.owner.removeEndpoint(this.remoteId,
+ this.owner.removeEndpoint(this.remoteAddr,
LocalizedStrings.Connection_FORCE_DISCONNECT_TIMED_OUT.toLocalizedString());
if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
if (logger.isDebugEnabled()) {
@@ -3110,7 +3100,7 @@ public class Connection implements Runnable {
stats.incAsyncThreads(-1);
stats.incAsyncQueues(-1);
if (logger.isDebugEnabled()) {
- logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteId, remoteAddr);
+ logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr, remoteAddr);
}
}
} finally {
@@ -3837,8 +3827,7 @@ public class Connection implements Runnable {
throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)}));
}
InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
- Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId());
- setRemoteAddr(remote, stub);
+ setRemoteAddr(remote);
this.sharedResource = dis.readBoolean();
this.preserveOrder = dis.readBoolean();
this.uniqueId = dis.readLong();
@@ -3897,7 +3886,7 @@ public class Connection implements Runnable {
return;
}
if (logger.isDebugEnabled()) {
- logger.debug("P2P handshake remoteId is {}{}", this.remoteId,
+ logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
(this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
}
try {
@@ -4031,12 +4020,6 @@ public class Connection implements Runnable {
this.accessed = true;
}
- /** returns the ConnectionKey stub representing the other side of
- this connection (host:port) */
- public final Stub getRemoteId() {
- return remoteId;
- }
-
/** return the DM id of the guy on the other side of this connection.
*/
public final InternalDistributedMember getRemoteAddress() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index bac356c..3816efe 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
@@ -60,7 +61,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/** <p>ConnectionTable holds all of the Connection objects in a conduit.
Connections represent a pipe between two endpoints represented
- by generic Stubs.</p>
+ by generic DistributedMembers.</p>
@author Bruce Schuchardt
@author Darrel Schneider
@@ -345,7 +346,7 @@ public class ConnectionTable {
/**
* Process a newly created PendingConnection
*
- * @param id Stub on which the connection is created
+ * @param id DistributedMember on which the connection is created
* @param sharedResource whether the connection is used by multiple threads
* @param preserveOrder whether to preserve order
* @param m map to add the connection to
@@ -357,7 +358,7 @@ public class ConnectionTable {
* @throws IOException if unable to connect
* @throws DistributedSystemDisconnectedException
*/
- private Connection handleNewPendingConnection(Stub id, boolean sharedResource,
+ private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource,
boolean preserveOrder,
Map m, PendingConnection pc, long startTime, long ackThreshold, long ackSAThreshold)
throws IOException, DistributedSystemDisconnectedException
@@ -366,7 +367,7 @@ public class ConnectionTable {
Connection con = null;
try {
con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder,
- id, this.owner.getMemberForStub(id, false),
+ id,
sharedResource,
startTime, ackThreshold, ackSAThreshold);
this.owner.stats.incSenders(sharedResource, preserveOrder);
@@ -442,7 +443,7 @@ public class ConnectionTable {
* unordered or conserve-sockets
* note that unordered connections are currently always shared
*
- * @param id the Stub on which we are creating a connection
+ * @param id the DistributedMember on which we are creating a connection
* @param threadOwnsResources whether unordered conn is owned by the current thread
* @param preserveOrder whether to preserve order
* @param startTime the ms clock start time for the operation
@@ -452,7 +453,7 @@ public class ConnectionTable {
* @throws IOException if unable to create the connection
* @throws DistributedSystemDisconnectedException
*/
- private Connection getUnorderedOrConserveSockets(Stub id,
+ private Connection getUnorderedOrConserveSockets(DistributedMember id,
boolean threadOwnsResources, boolean preserveOrder,
long startTime, long ackTimeout, long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException
@@ -527,7 +528,7 @@ public class ConnectionTable {
* @throws IOException if the connection could not be created
* @throws DistributedSystemDisconnectedException
*/
- Connection getOrderedAndOwned(Stub id, long startTime, long ackTimeout, long ackSATimeout)
+ Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout, long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
@@ -566,7 +567,7 @@ public class ConnectionTable {
// OK, we have to create a new connection.
result = Connection.createSender(owner.getMembershipManager(),
this, true /* preserveOrder */, id,
- this.owner.getMemberForStub(id, false), false /* shared */,
+ false /* shared */,
startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}", result);
@@ -583,7 +584,7 @@ public class ConnectionTable {
ArrayList al = (ArrayList)this.threadConnectionMap.get(id);
if (al == null) {
- // First connection for this Stub. Make sure list for this
+ // First connection for this DistributedMember. Make sure list for this
// stub is created if it isn't already there.
al = new ArrayList();
@@ -651,7 +652,7 @@ public class ConnectionTable {
/**
* Get a new connection
- * @param id the Stub on which to create the connection
+ * @param id the DistributedMember on which to create the connection
* @param preserveOrder whether order should be preserved
* @param startTime the ms clock start time
* @param ackTimeout the ms ack-wait-threshold, or zero
@@ -660,7 +661,7 @@ public class ConnectionTable {
* @throws java.io.IOException if the connection could not be created
* @throws DistributedSystemDisconnectedException
*/
- protected Connection get(Stub id, boolean preserveOrder,
+ protected Connection get(DistributedMember id, boolean preserveOrder,
long startTime, long ackTimeout, long ackSATimeout)
throws java.io.IOException, DistributedSystemDisconnectedException
{
@@ -838,34 +839,38 @@ public class ConnectionTable {
/**
* Return true if our owner already knows that this endpoint is departing
*/
- protected boolean isEndpointShuttingDown(Stub stub) {
- return this.owner.getMemberForStub(stub, true) == null;
+ protected boolean isEndpointShuttingDown(DistributedMember id) {
+ return giveUpOnMember(owner.getDM().getMembershipManager(), id);
}
+ protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) {
+ return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
+ }
+
/** remove an endpoint and notify the membership manager of the departure */
- protected void removeEndpoint(Stub stub, String reason) {
+ protected void removeEndpoint(DistributedMember stub, String reason) {
removeEndpoint(stub, reason, true);
}
- protected void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) {
+ protected void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) {
if (this.closed) {
return;
}
boolean needsRemoval = false;
synchronized (this.orderedConnectionMap) {
- if (this.orderedConnectionMap.get(stub) != null)
+ if (this.orderedConnectionMap.get(memberID) != null)
needsRemoval = true;
}
if (!needsRemoval) {
synchronized (this.unorderedConnectionMap) {
- if (this.unorderedConnectionMap.get(stub) != null)
+ if (this.unorderedConnectionMap.get(memberID) != null)
needsRemoval = true;
}
}
if (!needsRemoval) {
ConcurrentMap cm = this.threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList)cm.get(stub);
+ ArrayList al = (ArrayList)cm.get(memberID);
needsRemoval = al != null && al.size() > 0;
}
}
@@ -873,14 +878,14 @@ public class ConnectionTable {
if (needsRemoval) {
InternalDistributedMember remoteAddress = null;
synchronized (this.orderedConnectionMap) {
- Object c = this.orderedConnectionMap.remove(stub);
+ Object c = this.orderedConnectionMap.remove(memberID);
if (c instanceof Connection) {
remoteAddress = ((Connection) c).getRemoteAddress();
}
closeCon(reason, c);
}
synchronized (this.unorderedConnectionMap) {
- Object c = this.unorderedConnectionMap.remove(stub);
+ Object c = this.unorderedConnectionMap.remove(memberID);
if (remoteAddress == null && (c instanceof Connection)) {
remoteAddress = ((Connection) c).getRemoteAddress();
}
@@ -890,7 +895,7 @@ public class ConnectionTable {
{
ConcurrentMap cm = this.threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList)cm.remove(stub);
+ ArrayList al = (ArrayList)cm.remove(memberID);
if (al != null) {
synchronized (al) {
for (Iterator it=al.iterator(); it.hasNext();) {
@@ -912,7 +917,7 @@ public class ConnectionTable {
for (Iterator it=connectingSockets.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
- if (info.peerAddress.equals(stub.getInetAddress())) {
+ if (info.peerAddress.equals(((InternalDistributedMember)memberID).getInetAddress())) {
toRemove.add(entry.getKey());
it.remove();
}
@@ -925,7 +930,7 @@ public class ConnectionTable {
}
catch (IOException e) {
if (logger.isDebugEnabled()) {
- logger.debug("caught exception while trying to close connecting socket for {}", stub, e);
+ logger.debug("caught exception while trying to close connecting socket for {}", memberID, e);
}
}
}
@@ -937,7 +942,7 @@ public class ConnectionTable {
synchronized (this.receivers) {
for (Iterator it=receivers.iterator(); it.hasNext();) {
Connection con = (Connection)it.next();
- if (stub.equals(con.getRemoteId())) {
+ if (memberID.equals(con.getRemoteAddress())) {
it.remove();
toRemove.add(con);
}
@@ -947,10 +952,13 @@ public class ConnectionTable {
Connection con = (Connection)it.next();
closeCon(reason, con);
}
- // call memberDeparted after doing the closeCon calls
- // so it can recursively call removeEndpoint
if (notifyDisconnect) {
- owner.getMemberForStub(stub, false);
+ // Before the removal of TCPConduit Stub addresses this used
+ // to call MembershipManager.getMemberForStub, which checked
+ // for a shutdown in progress and threw this exception:
+ if (owner.getDM().shutdownInProgress()) {
+ throw new DistributedSystemDisconnectedException("Shutdown in progress", owner.getDM().getMembershipManager().getShutdownCause());
+ }
}
if (remoteAddress != null) {
@@ -964,11 +972,11 @@ public class ConnectionTable {
}
/** check to see if there are still any receiver threads for the given end-point */
- protected boolean hasReceiversFor(Stub endPoint) {
+ protected boolean hasReceiversFor(DistributedMember endPoint) {
synchronized (this.receivers) {
for (Iterator it=receivers.iterator(); it.hasNext();) {
Connection con = (Connection)it.next();
- if (endPoint.equals(con.getRemoteId())) {
+ if (endPoint.equals(con.getRemoteAddress())) {
return true;
}
}
@@ -976,7 +984,7 @@ public class ConnectionTable {
return false;
}
- private static void removeFromThreadConMap(ConcurrentMap cm, Stub stub, Connection c) {
+ private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub, Connection c) {
if (cm != null) {
ArrayList al = (ArrayList)cm.get(stub);
if (al != null) {
@@ -986,7 +994,7 @@ public class ConnectionTable {
}
}
}
- protected void removeThreadConnection(Stub stub, Connection c) {
+ protected void removeThreadConnection(DistributedMember stub, Connection c) {
/*if (this.closed) {
return;
}*/
@@ -1001,7 +1009,7 @@ public class ConnectionTable {
} // synchronized
} // m != null
}
- void removeSharedConnection(String reason, Stub stub, boolean ordered, Connection c) {
+ void removeSharedConnection(String reason, DistributedMember stub, boolean ordered, Connection c) {
if (this.closed) {
return;
}
@@ -1054,7 +1062,7 @@ public class ConnectionTable {
Iterator it = m.entrySet().iterator();
while (it.hasNext()) {
Map.Entry me = (Map.Entry)it.next();
- Stub stub = (Stub)me.getKey();
+ DistributedMember stub = (DistributedMember)me.getKey();
Connection c = (Connection)me.getValue();
removeFromThreadConMap(this.threadConnectionMap, stub, c);
it.remove();
@@ -1079,7 +1087,7 @@ public class ConnectionTable {
* from being formed or new messages from being sent
* @since 5.1
*/
- protected void getThreadOwnedOrderedConnectionState(Stub member,
+ protected void getThreadOwnedOrderedConnectionState(DistributedMember member,
Map result) {
ConcurrentMap cm = this.threadConnectionMap;
@@ -1105,7 +1113,7 @@ public class ConnectionTable {
* wait for the given incoming connections to receive at least the associated
* number of messages
*/
- protected void waitForThreadOwnedOrderedConnectionState(Stub member,
+ protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
Map connectionStates) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); // wisest to do this before the synchronize below
List r = null;
@@ -1115,14 +1123,14 @@ public class ConnectionTable {
for (Iterator it=r.iterator(); it.hasNext();) {
Connection con = (Connection)it.next();
if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
- && member.equals(con.getRemoteId())) {
+ && member.equals(con.getRemoteAddress())) {
Long state = (Long)connectionStates.remove(Long.valueOf(con.getUniqueId()));
if (state != null) {
long count = state.longValue();
while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
if (logger.isDebugEnabled()) {
logger.debug("Waiting for connection {}/{} currently={} need={}",
- con.getRemoteId(), con.getUniqueId(), con.getMessagesReceived(), count);
+ con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count);
}
Thread.sleep(100);
}
@@ -1230,11 +1238,11 @@ public class ConnectionTable {
/**
* the stub we are connecting to
*/
- private final Stub id;
+ private final DistributedMember id;
private final Thread connectingThread;
- public PendingConnection(boolean preserveOrder, Stub id) {
+ public PendingConnection(boolean preserveOrder, DistributedMember id) {
this.preserveOrder = preserveOrder;
this.id = id;
this.connectingThread = Thread.currentThread();
@@ -1279,10 +1287,9 @@ public class ConnectionTable {
boolean severeAlertIssued = false;
boolean suspected = false;
- InternalDistributedMember targetMember = null;
+ DistributedMember targetMember = null;
if (ackSATimeout > 0) {
- targetMember =
- ((GMSMembershipManager)mgr).getMemberForStub(this.id, false);
+ targetMember = this.id;
}
for (;;) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
index 5cd426f..a954814 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java
@@ -18,6 +18,7 @@
package com.gemstone.gemfire.internal.tcp;
import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.distributed.DistributedMember;
/**
* MemberShunnedException may be thrown to prevent ack-ing a message
@@ -28,13 +29,13 @@ import com.gemstone.gemfire.GemFireException;
public class MemberShunnedException extends GemFireException
{
private static final long serialVersionUID = -8453126202477831557L;
- private Stub member;
+ private DistributedMember member;
/**
* constructor
* @param member the member that was shunned
*/
- public MemberShunnedException(Stub member) {
+ public MemberShunnedException(DistributedMember member) {
super("");
this.member = member;
}
@@ -42,7 +43,7 @@ public class MemberShunnedException extends GemFireException
/**
* @return the member that was shunned
*/
- public Stub getShunnedMember() {
+ public DistributedMember getShunnedMember() {
return this.member;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
index fd495d9..cd711e7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java
@@ -16,6 +16,7 @@
*/
package com.gemstone.gemfire.internal.tcp;
+import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.*;
import com.gemstone.gemfire.i18n.LogWriterI18n;
@@ -34,7 +35,7 @@ import com.gemstone.gemfire.i18n.LogWriterI18n;
public interface ServerDelegate {
public void receive( DistributionMessage message, int bytesRead,
- Stub connId );
+ DistributedMember connId );
public LogWriterI18n getLogger();
@@ -42,5 +43,5 @@ public interface ServerDelegate {
* Called when a possibly new member is detected by receiving a direct channel
* message from him.
*/
- public void newMemberConnected(InternalDistributedMember member, Stub id);
+ public void newMemberConnected(InternalDistributedMember member);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
deleted file mode 100644
index 2e4b91b..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.tcp;
-
-import java.io.*;
-import java.net.*;
-
-import com.gemstone.gemfire.DataSerializable;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-
-/** Stub represents an ip address and port.
-
- @author Bruce Schuchardt
- @since 2.0
-
- */
-
-public class Stub implements Externalizable, DataSerializable
-{
- private InetAddress inAddr;
- private int port;
- private int viewID;
-
- public Stub() {
- // public default needed for deserialization
- }
-
- public Stub(InetAddress addr, int port, int vmViewID) {
- viewID = vmViewID;
- inAddr = addr;
- this.port = port;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (o instanceof Stub) {
- Stub s = (Stub)o;
- boolean result;
- if (inAddr == null)
- result = s.inAddr == null;
- else
- result = inAddr.equals(s.inAddr);
- result = result && port == s.port;
- if (this.viewID != 0 && s.viewID != 0) {
- result = result && (this.viewID == s.viewID);
- }
- return result;
- }
- else {
- return false;
- }
- }
-
- // hashCode equates to the address hashCode for fast connection lookup
- @Override
- public int hashCode() {
- // do not use viewID in hashCode because it is changed after creating a stub
- int result = 0;
- // result += inAddr.hashCode(); // useless
- result += port;
- return result;
- }
-
- public void setViewID(int viewID) {
- this.viewID = viewID;
- }
-
- public int getPort() {
- return port;
- }
-
- public int getViewID() {
- return this.viewID;
- }
-
- public InetAddress getInetAddress() {
- return inAddr;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder(80);
- sb.append("tcp://");
- if (inAddr == null)
- sb.append("<null>");
- else
- sb.append(inAddr.toString());
- if (this.viewID != 0) {
- sb.append("<v"+this.viewID+">");
- }
- sb.append(":" + port);
- return sb.toString();
- }
-
- /**
- * Writes the contents of this <code>Stub</code> to a
- * <code>DataOutput</code>.
- *
- * @since 3.0
- */
- public void toData(DataOutput out)
- throws IOException
- {
- DataSerializer.writeInetAddress(inAddr, out);
- out.writeInt(port);
- out.writeInt(viewID);
- }
-
- /**
- * Reads the contents of this <code>Stub</code> from a
- * <code>DataOutput</code>.
- *
- * @since 3.0
- */
- public void fromData(DataInput in)
- throws IOException, ClassNotFoundException
- {
- inAddr = DataSerializer.readInetAddress(in);
- this.port = in.readInt();
- this.viewID = in.readInt();
- }
-
- /**
- * static factory method
- * @since 5.0.2
- */
- public static Stub createFromData(DataInput in)
- throws IOException, ClassNotFoundException
- {
- Stub result = new Stub();
- InternalDataSerializer.invokeFromData(result, in);
- return result;
- }
-
- public void writeExternal(ObjectOutput os)
- throws IOException
- {
- this.toData(os);
- }
-
- public void readExternal(ObjectInput is)
- throws IOException, ClassNotFoundException
- {
- this.fromData(is);
- }
-}