You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/06 16:36:38 UTC

[geode] branch develop updated: GEODE-5528: client event listener invoked multiple times for the same transactional operation

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6cc75bb  GEODE-5528: client event listener invoked multiple times for the same transactional operation
6cc75bb is described below

commit 6cc75bb72d50147a6e2921f0c10bb6085315b85d
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Aug 6 09:34:23 2018 -0700

    GEODE-5528: client event listener invoked multiple times for the same transactional operation
    
    TXCommitMessage.combine() checked whether a RegionCommit was already in
    its collection by using Collection.contains(), but RegionCommit doesn't
    implement equals(), so this check would return false even if the collection
    already held a RegionCommit for the same Region.
    
    The fix is to look for an existing RegionCommit with the same region path instead.
    
    This closes #2260
---
 .../cache/ClientServerTransactionDUnitTest.java    | 127 ++++++++++++++++++---
 .../internal/cache/RemoteTransactionDUnitTest.java |   7 ++
 .../geode/internal/cache/TXCommitMessage.java      |  23 ++--
 3 files changed, 133 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index c073f5e..375bf61 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -38,6 +38,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
 import javax.transaction.RollbackException;
@@ -111,6 +112,7 @@ import org.apache.geode.internal.jta.TransactionImpl;
 import org.apache.geode.internal.jta.TransactionManagerImpl;
 import org.apache.geode.internal.jta.UserTransactionImpl;
 import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.SerializableCallable;
@@ -2851,6 +2853,21 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     });
   }
 
+  class CreateReplicateRegion extends SerializableCallable {
+    String regionName;
+
+    public CreateReplicateRegion(String replicateRegionName) {
+      this.regionName = replicateRegionName;
+    }
+
+    public Object call() throws Exception {
+      RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
+      rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+      rf.create(regionName);
+      return null;
+    }
+  }
+
   /**
    * start 3 servers, accessor has r1 and r2; ds1 has r1, ds2 has r2 stop server after distributing
    * commit but b4 replying to client
@@ -2870,21 +2887,6 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       }
     });
 
-    class CreateReplicateRegion extends SerializableCallable {
-      String regionName;
-
-      public CreateReplicateRegion(String replicateRegionName) {
-        this.regionName = replicateRegionName;
-      }
-
-      public Object call() throws Exception {
-        RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
-        rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
-        rf.create(regionName);
-        return null;
-      }
-    }
-
     accessor.invoke(new CreateReplicateRegion("r1"));
     accessor.invoke(new CreateReplicateRegion("r2"));
 
@@ -2973,6 +2975,101 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
   }
 
   /**
+   * start 3 servers with region r1. stop client's server after distributing
+   * commit but before replying to client. ensure that a listener in the
+   * client is only invoked once
+   */
+  @Test
+  public void testFailoverAfterCommitDistributionInvokesListenerInClientOnlyOnce() {
+    Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM datastore1 = host.getVM(1);
+    VM datastore2 = host.getVM(2);
+    VM client = host.getVM(3);
+
+    final int port1 = createRegionsAndStartServer(server, false);
+    final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+    server.invoke(new CreateReplicateRegion("r1"));
+
+
+    client.invoke("create client cache", () -> {
+      disconnectFromDS();
+      System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints",
+          "true");
+      ClientCacheFactory ccf = new ClientCacheFactory();
+      ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port1);
+      ccf.addPoolServer("localhost", port2);
+      ccf.setPoolMinConnections(5);
+      ccf.setPoolLoadConditioningInterval(-1);
+      ccf.setPoolSubscriptionEnabled(false);
+      ccf.set(LOG_LEVEL, getDUnitLogLevel());
+      ClientCache cCache = getClientCache(ccf);
+      Region r1 =
+          cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r1");
+      Region r2 =
+          cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r2");
+      return null;
+    });
+
+    datastore1.invoke("create backup server", () -> {
+      CacheServer s = getCache().addCacheServer();
+      s.setPort(port2);
+      s.start();
+      return null;
+    });
+    datastore1.invoke(new CreateReplicateRegion("r1"));
+    datastore2.invoke(new CreateReplicateRegion("r1"));
+
+    final TransactionId txId = client.invoke("start transaction in client", () -> {
+      ClientCache cCache = (ClientCache) getCache();
+      Region r1 = cCache.getRegion("r1");
+      r1.put("key", "value");
+      cCache.getCacheTransactionManager().begin();
+      r1.destroy("key");
+      return cCache.getCacheTransactionManager().getTransactionId();
+    });
+
+    server.invoke("close cache after sending tx message to other servers", () -> {
+      final TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+      assertTrue(mgr.isHostedTxInProgress((TXId) txId));
+      TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState((TXId) txId);
+      final TXState txState = (TXState) txProxy.getRealDeal(null, null);
+      txState.setAfterSend(new Runnable() {
+        public void run() {
+          getCache().getLogger().info("server is now closing its cache");
+          System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true");
+          try {
+            mgr.removeHostedTXState((TXId) txState.getTransactionId());
+            DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
+          } finally {
+            System.getProperties()
+                .remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close");
+          }
+        }
+      });
+      return null;
+    });
+
+    client.invoke("committing transaction in client", () -> {
+      Region r1 = getCache().getRegion("r1");
+      final AtomicInteger afterDestroyInvocations = new AtomicInteger();
+      CacheListener listener = new CacheListenerAdapter() {
+        @Override
+        public void afterDestroy(EntryEvent event) {
+          afterDestroyInvocations.incrementAndGet();
+        }
+      };
+      r1.getAttributesMutator().addCacheListener(listener);
+
+      getCache().getCacheTransactionManager().commit();
+      assertFalse(r1.containsKey("key"));
+      assertEquals(1, afterDestroyInvocations.intValue());
+      return null;
+    });
+  }
+
+  /**
    * start a tx in a thread, obtain local locks and wait. start another tx and commit, make sure 2nd
    * thread gets CCE
    */
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
index fd2f193..2d1dcff 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
@@ -47,7 +47,9 @@ import org.junit.Test;
 import org.apache.geode.ExpirationDetector;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
@@ -133,6 +135,11 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   private final SerializableCallable verifyNoTxState = new SerializableCallable() {
     @Override
     public Object call() throws Exception {
+      try {
+        CacheFactory.getAnyInstance();
+      } catch (CacheClosedException e) {
+        return null;
+      }
       // TXManagerImpl mgr = getGemfireCache().getTxManager();
       // assertIndexDetailsEquals(0, mgr.hostedTransactionsInProgressForTest());
       final TXManagerImpl mgr = getGemfireCache().getTxManager();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 13f15ea..9a3a1e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -87,7 +87,7 @@ public class TXCommitMessage extends PooledDistributionMessage
   // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4
   protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092));
 
-  private ArrayList regions; // list of RegionCommit instances
+  private ArrayList<RegionCommit> regions; // list of RegionCommit instances
   protected TXId txIdent;
   protected int processorId; // 0 unless needsAck is true
   protected TXLockIdImpl lockId;
@@ -991,14 +991,15 @@ public class TXCommitMessage extends PooledDistributionMessage
   public void combine(TXCommitMessage other) {
     assert other != null;
     Iterator it = other.regions.iterator();
-    while (it.hasNext()) {
-      RegionCommit rc = (RegionCommit) it.next();
-      if (!this.regions.contains(rc)) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("TX: adding region commit: {} to: {}", rc, this);
-        }
-        rc.msg = this;
-        this.regions.add(rc);
+    Map<String, RegionCommit> regionCommits = new HashMap<>();
+    for (RegionCommit commit : regions) {
+      regionCommits.put(commit.getRegionPath(), commit);
+    }
+    for (RegionCommit commit : other.regions) {
+      if (!regionCommits.containsKey(commit.getRegionPath())) {
+        commit.msg = this;
+        this.regions.add(commit);
+        regionCommits.put(commit.getRegionPath(), commit);
       }
     }
   }
@@ -1131,6 +1132,10 @@ public class TXCommitMessage extends PooledDistributionMessage
       this.msg = msg;
     }
 
+    public String getRegionPath() {
+      return regionPath;
+    }
+
     public void incRefCount() {
       this.refCount++;
     }