You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/06 16:36:38 UTC
[geode] branch develop updated: GEODE-5528: client event listener
invoked multiple times for the same transactional operation
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6cc75bb GEODE-5528: client event listener invoked multiple times for the same transactional operation
6cc75bb is described below
commit 6cc75bb72d50147a6e2921f0c10bb6085315b85d
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Aug 6 09:34:23 2018 -0700
GEODE-5528: client event listener invoked multiple times for the same transactional operation
TXCommitMessage.combine() checked whether a RegionCommit was already in its
collection by calling Collection.contains(), but RegionCommit doesn't
implement equals(), so the check returned false even when the collection
already held a RegionCommit for the same Region. The duplicate RegionCommit
was then added again, and the client's listener was invoked once per copy.
The fix is to compare the RegionCommits' region paths instead.
This closes #2260
---
.../cache/ClientServerTransactionDUnitTest.java | 127 ++++++++++++++++++---
.../internal/cache/RemoteTransactionDUnitTest.java | 7 ++
.../geode/internal/cache/TXCommitMessage.java | 23 ++--
3 files changed, 133 insertions(+), 24 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index c073f5e..375bf61 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -38,6 +38,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.RollbackException;
@@ -111,6 +112,7 @@ import org.apache.geode.internal.jta.TransactionImpl;
import org.apache.geode.internal.jta.TransactionManagerImpl;
import org.apache.geode.internal.jta.UserTransactionImpl;
import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.SerializableCallable;
@@ -2851,6 +2853,21 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
});
}
+ class CreateReplicateRegion extends SerializableCallable {
+ String regionName;
+
+ public CreateReplicateRegion(String replicateRegionName) {
+ this.regionName = replicateRegionName;
+ }
+
+ public Object call() throws Exception {
+ RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
+ rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+ rf.create(regionName);
+ return null;
+ }
+ }
+
/**
* start 3 servers, accessor has r1 and r2; ds1 has r1, ds2 has r2 stop server after distributing
* commit but b4 replying to client
@@ -2870,21 +2887,6 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
}
});
- class CreateReplicateRegion extends SerializableCallable {
- String regionName;
-
- public CreateReplicateRegion(String replicateRegionName) {
- this.regionName = replicateRegionName;
- }
-
- public Object call() throws Exception {
- RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
- rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
- rf.create(regionName);
- return null;
- }
- }
-
accessor.invoke(new CreateReplicateRegion("r1"));
accessor.invoke(new CreateReplicateRegion("r2"));
@@ -2973,6 +2975,101 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
}
/**
+ * start 3 servers with region r1. stop client's server after distributing
+ * commit but before replying to client. ensure that a listener in the
+ * client is only invoked once
+ */
+ @Test
+ public void testFailoverAfterCommitDistributionInvokesListenerInClientOnlyOnce() {
+ Host host = Host.getHost(0);
+ VM server = host.getVM(0);
+ VM datastore1 = host.getVM(1);
+ VM datastore2 = host.getVM(2);
+ VM client = host.getVM(3);
+
+ final int port1 = createRegionsAndStartServer(server, false);
+ final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+ server.invoke(new CreateReplicateRegion("r1"));
+
+
+ client.invoke("create client cache", () -> {
+ disconnectFromDS();
+ System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints",
+ "true");
+ ClientCacheFactory ccf = new ClientCacheFactory();
+ ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port1);
+ ccf.addPoolServer("localhost", port2);
+ ccf.setPoolMinConnections(5);
+ ccf.setPoolLoadConditioningInterval(-1);
+ ccf.setPoolSubscriptionEnabled(false);
+ ccf.set(LOG_LEVEL, getDUnitLogLevel());
+ ClientCache cCache = getClientCache(ccf);
+ Region r1 =
+ cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r1");
+ Region r2 =
+ cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r2");
+ return null;
+ });
+
+ datastore1.invoke("create backup server", () -> {
+ CacheServer s = getCache().addCacheServer();
+ s.setPort(port2);
+ s.start();
+ return null;
+ });
+ datastore1.invoke(new CreateReplicateRegion("r1"));
+ datastore2.invoke(new CreateReplicateRegion("r1"));
+
+ final TransactionId txId = client.invoke("start transaction in client", () -> {
+ ClientCache cCache = (ClientCache) getCache();
+ Region r1 = cCache.getRegion("r1");
+ r1.put("key", "value");
+ cCache.getCacheTransactionManager().begin();
+ r1.destroy("key");
+ return cCache.getCacheTransactionManager().getTransactionId();
+ });
+
+ server.invoke("close cache after sending tx message to other servers", () -> {
+ final TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ assertTrue(mgr.isHostedTxInProgress((TXId) txId));
+ TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState((TXId) txId);
+ final TXState txState = (TXState) txProxy.getRealDeal(null, null);
+ txState.setAfterSend(new Runnable() {
+ public void run() {
+ getCache().getLogger().info("server is now closing its cache");
+ System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true");
+ try {
+ mgr.removeHostedTXState((TXId) txState.getTransactionId());
+ DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
+ } finally {
+ System.getProperties()
+ .remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close");
+ }
+ }
+ });
+ return null;
+ });
+
+ client.invoke("committing transaction in client", () -> {
+ Region r1 = getCache().getRegion("r1");
+ final AtomicInteger afterDestroyInvocations = new AtomicInteger();
+ CacheListener listener = new CacheListenerAdapter() {
+ @Override
+ public void afterDestroy(EntryEvent event) {
+ afterDestroyInvocations.incrementAndGet();
+ }
+ };
+ r1.getAttributesMutator().addCacheListener(listener);
+
+ getCache().getCacheTransactionManager().commit();
+ assertFalse(r1.containsKey("key"));
+ assertEquals(1, afterDestroyInvocations.intValue());
+ return null;
+ });
+ }
+
+ /**
* start a tx in a thread, obtain local locks and wait. start another tx and commit, make sure 2nd
* thread gets CCE
*/
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
index fd2f193..2d1dcff 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
@@ -47,7 +47,9 @@ import org.junit.Test;
import org.apache.geode.ExpirationDetector;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
@@ -133,6 +135,11 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
private final SerializableCallable verifyNoTxState = new SerializableCallable() {
@Override
public Object call() throws Exception {
+ try {
+ CacheFactory.getAnyInstance();
+ } catch (CacheClosedException e) {
+ return null;
+ }
// TXManagerImpl mgr = getGemfireCache().getTxManager();
// assertIndexDetailsEquals(0, mgr.hostedTransactionsInProgressForTest());
final TXManagerImpl mgr = getGemfireCache().getTxManager();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 13f15ea..9a3a1e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -87,7 +87,7 @@ public class TXCommitMessage extends PooledDistributionMessage
// Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4
protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092));
- private ArrayList regions; // list of RegionCommit instances
+ private ArrayList<RegionCommit> regions; // list of RegionCommit instances
protected TXId txIdent;
protected int processorId; // 0 unless needsAck is true
protected TXLockIdImpl lockId;
@@ -991,14 +991,15 @@ public class TXCommitMessage extends PooledDistributionMessage
public void combine(TXCommitMessage other) {
assert other != null;
Iterator it = other.regions.iterator();
- while (it.hasNext()) {
- RegionCommit rc = (RegionCommit) it.next();
- if (!this.regions.contains(rc)) {
- if (logger.isDebugEnabled()) {
- logger.debug("TX: adding region commit: {} to: {}", rc, this);
- }
- rc.msg = this;
- this.regions.add(rc);
+ Map<String, RegionCommit> regionCommits = new HashMap<>();
+ for (RegionCommit commit : regions) {
+ regionCommits.put(commit.getRegionPath(), commit);
+ }
+ for (RegionCommit commit : other.regions) {
+ if (!regionCommits.containsKey(commit.getRegionPath())) {
+ commit.msg = this;
+ this.regions.add(commit);
+ regionCommits.put(commit.getRegionPath(), commit);
}
}
}
@@ -1131,6 +1132,10 @@ public class TXCommitMessage extends PooledDistributionMessage
this.msg = msg;
}
+ public String getRegionPath() {
+ return regionPath;
+ }
+
public void incRefCount() {
this.refCount++;
}