You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/06/21 23:16:19 UTC

[24/25] geode git commit: GEODE-3101: Release local locks held by the JTA beforeCompletion() in client when the JTA failed on the server.

GEODE-3101: Release local locks held by the JTA beforeCompletion() in client when the JTA failed on the server.

          Also added a unit test that would fail without the fix.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b7f5391d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b7f5391d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b7f5391d

Branch: refs/heads/feature/GEODE-3062-2
Commit: b7f5391d983cda4d95dc34a21aeb414a3c0a14c2
Parents: 822946b
Author: eshu <es...@pivotal.io>
Authored: Wed Jun 21 15:41:25 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Wed Jun 21 15:41:25 2017 -0700

----------------------------------------------------------------------
 .../internal/cache/tx/ClientTXStateStub.java    |  10 +-
 .../internal/jta/ClientServerJTADUnitTest.java  | 143 +++++++++++++++++++
 2 files changed, 151 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b7f5391d/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index ded789e..ab5701c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -21,7 +21,7 @@ import java.util.List;
 import javax.transaction.Status;
 
 import org.apache.logging.log4j.Logger;
-
+import org.apache.geode.GemFireException;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
 import org.apache.geode.cache.TransactionException;
@@ -237,7 +237,13 @@ public class ClientTXStateStub extends TXStateStub {
   @Override
   public void beforeCompletion() {
     obtainLocalLocks();
-    this.firstProxy.beforeCompletion(proxy.getTxId().getUniqId());
+    try {
+      this.firstProxy.beforeCompletion(proxy.getTxId().getUniqId());
+    } catch (GemFireException e) {
+      this.lockReq.releaseLocal();
+      this.firstProxy.getPool().releaseServerAffinity();
+      throw e;
+    }
   }
 
   public InternalDistributedMember getOriginatingMember() {

http://git-wip-us.apache.org/repos/asf/geode/blob/b7f5391d/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
new file mode 100644
index 0000000..51ef44a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.jta;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.transaction.Status;
+
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({DistributedTest.class})
+public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
+  private String key = "key";
+  private String value = "value";
+  private String newValue = "newValue";
+
+  @Test
+  public void testClientTXStateStubBeforeCompletion() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+    final String regionName = getUniqueName();
+    getBlackboard().initBlackboard();
+    final Properties properties = getDistributedSystemProperties();
+
+    final int port = server.invoke("create cache", () -> {
+      Cache cache = getCache(properties);
+      CacheServer cacheServer = createCacheServer(cache);
+      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+      region.put(key, value);
+
+      return cacheServer.getPort();
+    });
+
+    client.invoke(() -> createClientRegion(host, port, regionName));
+
+    createClientRegion(host, port, regionName);
+
+    Region region = getCache().getRegion(regionName);
+    assertTrue(region.get(key).equals(value));
+
+    String first = "one";
+    String second = "two";
+
+    client.invokeAsync(() -> commitTxWithBeforeCompletion(regionName, true, first, second));
+
+    getBlackboard().waitForGate(first, 30, TimeUnit.SECONDS);
+    TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+    mgr.begin();
+    region.put(key, newValue);
+    TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+    ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
+    mgr.internalResume(tx);
+    try {
+      txStub.beforeCompletion();
+      fail("expected to get CommitConflictException");
+    } catch (GemFireException e) {
+      // expected commit conflict exception thrown from server
+      mgr.setTXState(null);
+      getBlackboard().signalGate(second);
+    }
+
+    Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+        .atMost(30, TimeUnit.SECONDS).until(() -> region.get(key).equals(newValue));
+
+    try {
+      commitTxWithBeforeCompletion(regionName, false, first, second);
+    } catch (Exception e) {
+      Assert.fail("got unexpected exception", e);
+    }
+  }
+
+  private CacheServer createCacheServer(Cache cache) {
+    CacheServer server = cache.addCacheServer();
+    server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
+    try {
+      server.start();
+    } catch (IOException e) {
+      Assert.fail("got exception", e);
+    }
+    return server;
+  }
+
+  private void createClientRegion(final Host host, final int port0, String regionName) {
+    ClientCacheFactory cf = new ClientCacheFactory();
+    cf.addPoolServer(host.getHostName(), port0);
+    ClientCache cache = getClientCache(cf);
+    cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+  }
+
+  private void commitTxWithBeforeCompletion(String regionName, boolean withWait, String first,
+      String second) throws TimeoutException, InterruptedException {
+    Region region = getCache().getRegion(regionName);
+    TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+    mgr.begin();
+    region.put(key, newValue);
+    TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+    ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
+    mgr.internalResume(tx);
+    txStub.beforeCompletion();
+    if (withWait) {
+      getBlackboard().signalGate(first);
+      getBlackboard().waitForGate(second, 30, TimeUnit.SECONDS);
+    }
+    txStub.afterCompletion(Status.STATUS_COMMITTED);
+  }
+}