You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/06/04 16:19:55 UTC
[geode] branch develop updated: Revert "GEODE-5269 CommitConflictException after TransactionInDoubtException"
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0d89728 Revert "GEODE-5269 CommitConflictException after TransactionInDoubtException"
0d89728 is described below
commit 0d897288f1cab6193ba1b1c809117bd85f0578bd
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Jun 4 09:19:00 2018 -0700
Revert "GEODE-5269 CommitConflictException after TransactionInDoubtException"
This reverts commit f50b9450b918ad4409c5a6ebf374b88de8c75986.
Backing this out as it seems to cause intermittent failure in
ClientServerTransactionCCEDUnitTest.testBug42942
---
.../internal/ClusterDistributionManager.java | 2 +-
.../distributed/internal/DistributionManager.java | 7 +--
.../internal/LonerDistributionManager.java | 2 +-
.../distributed/internal/direct/DirectChannel.java | 11 ++---
.../geode/internal/cache/PeerTXStateStub.java | 17 +++----
.../cache/tier/sockets/command/CommitCommand.java | 50 ++-----------------
.../tier/sockets/command/CommitCommandTest.java | 57 ----------------------
.../dunit/cache/internal/JUnit4CacheTestCase.java | 5 +-
8 files changed, 23 insertions(+), 128 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index a513627..8b5d82c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2856,7 +2856,7 @@ public class ClusterDistributionManager implements DistributionManager {
}
@Override
- public boolean isCurrentMember(DistributedMember id) {
+ public boolean isCurrentMember(InternalDistributedMember id) {
Set m;
synchronized (this.membersLock) {
// access to members synchronized under membersLock in order to
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 8bb84ae..167f8ba 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -306,11 +306,8 @@ public interface DistributionManager extends ReplySender {
/** Returns a set of all roles currently in the distributed system. */
Set getAllRoles();
- /**
- * Returns true if id is a current member of the distributed system
- *
- */
- boolean isCurrentMember(DistributedMember id);
+ /** Returns true if id is a current member of the distributed system */
+ boolean isCurrentMember(InternalDistributedMember id);
/**
* Remove given member from list of members who are pending a startup reply
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index d6a6cd1..f74c34d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -1256,7 +1256,7 @@ public class LonerDistributionManager implements DistributionManager {
this.getId().setPort(this.lonerPort);
}
- public boolean isCurrentMember(DistributedMember p_id) {
+ public boolean isCurrentMember(InternalDistributedMember p_id) {
return getId().equals(p_id);
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 970957f..c766d26 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -20,7 +20,6 @@ import java.io.NotSerializableException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -315,10 +314,6 @@ public class DirectChannel {
if (!directReply && directMsg != null) {
directMsg.registerProcessor();
}
- if (logger.isDebugEnabled()) {
- logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
- msg, p_destinations.length, Arrays.toString(p_destinations));
- }
try {
do {
@@ -380,9 +375,9 @@ public class DirectChannel {
}
try {
- if (retry && logger.isDebugEnabled()) {
- logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
- msg, cons.size(), cons);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}{}) to {} peers ({}) via tcp/ip",
+ (retry ? "Retrying send (" : "Sending ("), msg, cons.size(), cons);
}
DMStats stats = getDMStats();
List<?> sentCons; // used for cons we sent to this time
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 670bbb4..6211b6a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache;
-
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -132,14 +131,14 @@ public class PeerTXStateStub extends TXStateStub {
}
} catch (Exception e) {
this.getCache().getCancelCriterion().checkCancelInProgress(e);
- Throwable eCause = e.getCause();
- if (eCause != null) {
- if (eCause instanceof ForceReattemptException) {
- if (eCause.getCause() instanceof PrimaryBucketException) {
+ if (e.getCause() != null) {
+ if (e.getCause() instanceof ForceReattemptException) {
+ Throwable e2 = e.getCause();
+ if (e2.getCause() != null && e2.getCause() instanceof PrimaryBucketException) {
// data rebalanced
TransactionDataRebalancedException tdnce =
- new TransactionDataRebalancedException(eCause.getCause().getMessage());
- tdnce.initCause(eCause.getCause());
+ new TransactionDataRebalancedException(e2.getCause().getMessage());
+ tdnce.initCause(e2.getCause());
throw tdnce;
} else {
// We cannot be sure that the member departed starting to process commit request,
@@ -147,11 +146,11 @@ public class PeerTXStateStub extends TXStateStub {
// fixes 44939
TransactionInDoubtException tdnce =
new TransactionInDoubtException(e.getCause().getMessage());
- tdnce.initCause(eCause);
+ tdnce.initCause(e.getCause());
throw tdnce;
}
}
- throw new TransactionInDoubtException(eCause);
+ throw new TransactionInDoubtException(e.getCause());
} else {
throw new TransactionInDoubtException(e);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index efa085e..936e9fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -15,11 +15,7 @@
package org.apache.geode.internal.cache.tier.sockets.command;
import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-import org.apache.geode.CancelException;
-import org.apache.geode.cache.TransactionInDoubtException;
-import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.TXCommitMessage;
@@ -51,8 +47,6 @@ public class CommitCommand extends BaseCommand {
@Override
public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
final SecurityService securityService, long start) throws IOException {
-
-
serverConnection.setAsTrue(REQUIRES_RESPONSE);
TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
@@ -80,27 +74,18 @@ public class CommitCommand extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("TX: committing client tx: {}", txId);
}
- commitTransaction(clientMessage, serverConnection, txMgr, wasInProgress,
- txProxy);
- }
-
- protected void commitTransaction(Message clientMessage, ServerConnection serverConnection,
- TXManagerImpl txMgr,
- boolean wasInProgress, TXStateProxy txProxy) throws IOException {
- Exception txException = null;
- TXCommitMessage commitMsg = null;
- TXId txId = txProxy.getTxId();
try {
+
+ txId = txProxy.getTxId();
+
txProxy.setCommitOnBehalfOfRemoteStub(true);
txMgr.commit();
commitMsg = txProxy.getCommitMessage();
- logger.debug("Sending commit response to client: {}", commitMsg);
writeCommitResponse(commitMsg, clientMessage, serverConnection);
serverConnection.setAsTrue(RESPONDED);
-
} catch (Exception e) {
- txException = e;
+ sendException(clientMessage, serverConnection, e);
} finally {
if (txId != null) {
txMgr.removeHostedTXState(txId);
@@ -112,35 +97,10 @@ public class CommitCommand extends BaseCommand {
commitMsg.setClientVersion(null); // fixes bug 46529
}
}
- if (txException != null) {
- DistributedMember target = txProxy.getTarget();
- // a TransactionInDoubtException caused by the TX host shutting down means that
- // the transaction may still be active and hold locks. We must wait for the transaction
- // host to finish shutting down before responding to the client or it could encounter
- // conflicts in retrying the transaction
- if ((txException instanceof TransactionInDoubtException)
- && (txException.getCause() instanceof CancelException)) {
- logger.info(
- "Waiting for departure of {} before throwing TransactionInDoubtException.",
- target);
- try {
- serverConnection.getCache().getDistributionManager().getMembershipManager()
- .waitForDeparture(target);
- } catch (TimeoutException e) {
- // status will be logged below
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- logger.info("Done waiting. Transaction host {} in the cluster.",
- serverConnection.getCache().getDistributionManager().isCurrentMember(target)
- ? "is still"
- : "is no longer");
- }
- sendException(clientMessage, serverConnection, txException);
- }
}
+
protected static void writeCommitResponse(TXCommitMessage response, Message origMsg,
ServerConnection servConn) throws IOException {
Message responseMsg = servConn.getResponseMessage();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
index 08f1425..4da081d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -14,30 +14,14 @@
*/
package org.apache.geode.internal.cache.tier.sockets.command;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.net.InetAddress;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
-import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.TransactionInDoubtException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.TXManagerImpl;
-import org.apache.geode.internal.cache.TXStateProxy;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -66,45 +50,4 @@ public class CommitCommandTest {
CommitCommand.writeCommitResponse(null, origMsg, servConn);
}
-
- /**
- * GEODE-5269 CommitConflictException after TransactionInDoubtException
- * CommitCommand needs to stall waiting for the host of a transaction to
- * finish shutting down before sending a TransactionInDoubtException to
- * the client.
- */
- @Test
- public void testTransactionInDoubtWaitsForTargetDeparture() throws Exception {
- CommitCommand command = (CommitCommand) CommitCommand.getCommand();
- Message clientMessage = mock(Message.class);
- ServerConnection serverConnection = mock(ServerConnection.class);
- TXManagerImpl txMgr = mock(TXManagerImpl.class);
- TXStateProxy txProxy = mock(TXStateProxy.class);
- InternalCache cache = mock(InternalCache.class);
- DistributionManager distributionManager = mock(DistributionManager.class);
- MembershipManager membershipManager = mock(MembershipManager.class);
- boolean wasInProgress = false;
-
- doReturn(cache).when(serverConnection).getCache();
- doReturn(distributionManager).when(cache).getDistributionManager();
- doReturn(membershipManager).when(distributionManager).getMembershipManager();
- doReturn(false).when(distributionManager).isCurrentMember(isA(
- InternalDistributedMember.class));
-
- doReturn(mock(Message.class)).when(serverConnection).getErrorResponseMessage();
-
- doReturn(new InternalDistributedMember(InetAddress.getLocalHost(), 1234)).when(txProxy)
- .getTarget();
-
- TransactionInDoubtException transactionInDoubtException =
- new TransactionInDoubtException("tx in doubt");
- transactionInDoubtException.initCause(new CacheClosedException("testing"));
- doThrow(transactionInDoubtException).when(txMgr).commit();
-
- command.commitTransaction(
- clientMessage, serverConnection, txMgr, wasInProgress, txProxy);
-
- verify(txMgr, atLeastOnce()).commit();
- verify(membershipManager, times(1)).waitForDeparture(isA(DistributedMember.class));
- }
}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
index c93f921..fa9a6f2 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
@@ -41,6 +41,7 @@ import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -262,8 +263,8 @@ public abstract class JUnit4CacheTestCase extends JUnit4DistributedTestCase
}
}
- public final ClientCache getClientCache() {
- return (ClientCache) cache;
+ public final InternalClientCache getClientCache() {
+ return (InternalClientCache) cache;
}
/**
--
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.