You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/03 22:48:08 UTC

[geode] branch feature/GEODE-5528 created (now 41c16e3)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch feature/GEODE-5528
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 41c16e3  GEODE-5528: client event listener invoked multiple times for the same transactional operation

This branch includes the following new commits:

     new 41c16e3  GEODE-5528: client event listener invoked multiple times for the same transactional operation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-5528: client event listener invoked multiple times for the same transactional operation

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-5528
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 41c16e35ff3691709bc47b824e6c9f4f6caf05df
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Aug 3 15:47:00 2018 -0700

    GEODE-5528: client event listener invoked multiple times for the same transactional operation
    
    TXCommitMessage.combine() was checking whether a RegionCommit was already
    in its collection by using Collection.contains(), but RegionCommit doesn't
    implement equals(), so this check would return false even if the
    collection already had a RegionCommit for the same Region.
    
    The fix is to check for the region's path instead.
---
 .../cache/ClientServerTransactionDUnitTest.java    | 131 ++++++++++++++++++---
 .../geode/internal/cache/TXCommitMessage.java      |  23 ++--
 2 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index c073f5e..8eccc54 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -38,6 +38,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.naming.Context;
 import javax.transaction.RollbackException;
@@ -111,6 +112,7 @@ import org.apache.geode.internal.jta.TransactionImpl;
 import org.apache.geode.internal.jta.TransactionManagerImpl;
 import org.apache.geode.internal.jta.UserTransactionImpl;
 import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.SerializableCallable;
@@ -2851,6 +2853,21 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     });
   }
 
+  class CreateReplicateRegion extends SerializableCallable {
+    String regionName;
+
+    public CreateReplicateRegion(String replicateRegionName) {
+      this.regionName = replicateRegionName;
+    }
+
+    public Object call() throws Exception {
+      RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
+      rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
+      rf.create(regionName);
+      return null;
+    }
+  }
+
   /**
    * start 3 servers, accessor has r1 and r2; ds1 has r1, ds2 has r2 stop server after distributing
    * commit but b4 replying to client
@@ -2870,21 +2887,6 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       }
     });
 
-    class CreateReplicateRegion extends SerializableCallable {
-      String regionName;
-
-      public CreateReplicateRegion(String replicateRegionName) {
-        this.regionName = replicateRegionName;
-      }
-
-      public Object call() throws Exception {
-        RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
-        rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled());
-        rf.create(regionName);
-        return null;
-      }
-    }
-
     accessor.invoke(new CreateReplicateRegion("r1"));
     accessor.invoke(new CreateReplicateRegion("r2"));
 
@@ -2973,6 +2975,105 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
   }
 
   /**
+   * start 3 servers with region r1. stop client's server after distributing
+   * commit but before replying to client. ensure that a listener in the
+   * client is only invoked once
+   */
+  @Test
+  public void testFailoverAfterCommitDistributionInvokesListenerInClientOnlyOnce() {
+    Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM datastore1 = host.getVM(1);
+    VM datastore2 = host.getVM(2);
+    VM client = host.getVM(3);
+
+    final int port1 = createRegionsAndStartServer(server, false);
+    final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+    server.invoke(new CreateReplicateRegion("r1"));
+
+
+    client.invoke("create client cache", () -> {
+      disconnectFromDS();
+      System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints",
+          "true");
+      ClientCacheFactory ccf = new ClientCacheFactory();
+      ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port1);
+      ccf.addPoolServer("localhost", port2);
+      ccf.setPoolMinConnections(5);
+      ccf.setPoolLoadConditioningInterval(-1);
+      ccf.setPoolSubscriptionEnabled(false);
+      ccf.set(LOG_LEVEL, getDUnitLogLevel());
+      ClientCache cCache = getClientCache(ccf);
+      Region r1 =
+          cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r1");
+      Region r2 =
+          cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r2");
+      return null;
+    });
+
+    datastore1.invoke("create backup server", () -> {
+      CacheServer s = getCache().addCacheServer();
+      getCache().getLogger().info("SWAP:ds1");
+      s.setPort(port2);
+      s.start();
+      return null;
+    });
+    datastore1.invoke(new CreateReplicateRegion("r1"));
+    datastore2.invoke(new CreateReplicateRegion("r1"));
+
+    final TransactionId txId = client.invoke("start transaction in client", () -> {
+      ClientCache cCache = (ClientCache) getCache();
+      Region r1 = cCache.getRegion("r1");
+      r1.put("key", "value");
+      cCache.getCacheTransactionManager().begin();
+      cCache.getLogger().info("beganTX");
+      r1.destroy("key");
+      return cCache.getCacheTransactionManager().getTransactionId();
+    });
+
+    server.invoke("close cache after sending tx message to other servers", () -> {
+      final TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+      assertTrue(mgr.isHostedTxInProgress((TXId) txId));
+      TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState((TXId) txId);
+      final TXState txState = (TXState) txProxy.getRealDeal(null, null);
+      txState.setAfterSend(new Runnable() {
+        public void run() {
+          getCache().getLogger().info("server is now closing its cache");
+          System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true");
+          try {
+            mgr.removeHostedTXState((TXId) txState.getTransactionId());
+            DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem());
+          } finally {
+            System.getProperties()
+                .remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close");
+          }
+        }
+      });
+      return null;
+    });
+
+    client.invoke("committing transaction in client", () -> {
+      Region r1 = getCache().getRegion("r1");
+      final AtomicInteger afterDestroyInvocations = new AtomicInteger();
+      CacheListener listener = new CacheListenerAdapter() {
+        @Override
+        public void afterDestroy(EntryEvent event) {
+          System.err.println("afterDestroy invoked");
+          Thread.dumpStack();
+          afterDestroyInvocations.incrementAndGet();
+        }
+      };
+      r1.getAttributesMutator().addCacheListener(listener);
+
+      getCache().getCacheTransactionManager().commit();
+      assertFalse(r1.containsKey("key"));
+      assertEquals(1, afterDestroyInvocations.intValue());
+      return null;
+    });
+  }
+
+  /**
    * start a tx in a thread, obtain local locks and wait. start another tx and commit, make sure 2nd
    * thread gets CCE
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 13f15ea..9a3a1e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -87,7 +87,7 @@ public class TXCommitMessage extends PooledDistributionMessage
   // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4
   protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092));
 
-  private ArrayList regions; // list of RegionCommit instances
+  private ArrayList<RegionCommit> regions; // list of RegionCommit instances
   protected TXId txIdent;
   protected int processorId; // 0 unless needsAck is true
   protected TXLockIdImpl lockId;
@@ -991,14 +991,15 @@ public class TXCommitMessage extends PooledDistributionMessage
   public void combine(TXCommitMessage other) {
     assert other != null;
     Iterator it = other.regions.iterator();
-    while (it.hasNext()) {
-      RegionCommit rc = (RegionCommit) it.next();
-      if (!this.regions.contains(rc)) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("TX: adding region commit: {} to: {}", rc, this);
-        }
-        rc.msg = this;
-        this.regions.add(rc);
+    Map<String, RegionCommit> regionCommits = new HashMap<>();
+    for (RegionCommit commit : regions) {
+      regionCommits.put(commit.getRegionPath(), commit);
+    }
+    for (RegionCommit commit : other.regions) {
+      if (!regionCommits.containsKey(commit.getRegionPath())) {
+        commit.msg = this;
+        this.regions.add(commit);
+        regionCommits.put(commit.getRegionPath(), commit);
       }
     }
   }
@@ -1131,6 +1132,10 @@ public class TXCommitMessage extends PooledDistributionMessage
       this.msg = msg;
     }
 
+    public String getRegionPath() {
+      return regionPath;
+    }
+
     public void incRefCount() {
       this.refCount++;
     }