You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/06/21 23:16:19 UTC
[24/25] geode git commit: GEODE-3101: Release local locks held by the
JTA beforeCompletion() in client when the JTA failed on the server.
GEODE-3101: Release local locks held by the JTA beforeCompletion() in client when the JTA failed on the server.
Also added a unit test that would fail without the fix.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b7f5391d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b7f5391d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b7f5391d
Branch: refs/heads/feature/GEODE-3062-2
Commit: b7f5391d983cda4d95dc34a21aeb414a3c0a14c2
Parents: 822946b
Author: eshu <es...@pivotal.io>
Authored: Wed Jun 21 15:41:25 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Wed Jun 21 15:41:25 2017 -0700
----------------------------------------------------------------------
.../internal/cache/tx/ClientTXStateStub.java | 10 +-
.../internal/jta/ClientServerJTADUnitTest.java | 143 +++++++++++++++++++
2 files changed, 151 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b7f5391d/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index ded789e..ab5701c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -21,7 +21,7 @@ import java.util.List;
import javax.transaction.Status;
import org.apache.logging.log4j.Logger;
-
+import org.apache.geode.GemFireException;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionException;
@@ -237,7 +237,13 @@ public class ClientTXStateStub extends TXStateStub {
@Override
public void beforeCompletion() {
obtainLocalLocks();
- this.firstProxy.beforeCompletion(proxy.getTxId().getUniqId());
+ try { // local locks are already held here, so a failure below must undo them
+ this.firstProxy.beforeCompletion(proxy.getTxId().getUniqId());
+ } catch (GemFireException e) { // per the commit message: the JTA beforeCompletion failed on the server
+ this.lockReq.releaseLocal(); // give back the local locks acquired by obtainLocalLocks() above
+ this.firstProxy.getPool().releaseServerAffinity(); // also release the pool's server affinity
+ throw e; // rethrow unchanged so the caller still sees the original failure
+ }
}
public InternalDistributedMember getOriginatingMember() {
http://git-wip-us.apache.org/repos/asf/geode/blob/b7f5391d/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
new file mode 100644
index 0000000..51ef44a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.jta;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.transaction.Status;
+
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({DistributedTest.class})
+public class ClientServerJTADUnitTest extends JUnit4CacheTestCase { // GEODE-3101: a client beforeCompletion() that fails on the server must not leave local locks held
+ private String key = "key"; // the single entry every transaction in this test writes
+ private String value = "value"; // initial value put by the server before any transaction runs
+ private String newValue = "newValue"; // value written by each transaction
+
+ @Test
+ public void testClientTXStateStubBeforeCompletion() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server = host.getVM(0);
+ final VM client = host.getVM(1);
+ final String regionName = getUniqueName();
+ getBlackboard().initBlackboard(); // the gates "one" and "two" used below coordinate this VM with the client VM
+ final Properties properties = getDistributedSystemProperties();
+
+ final int port = server.invoke("create cache", () -> {
+ Cache cache = getCache(properties);
+ CacheServer cacheServer = createCacheServer(cache);
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ region.put(key, value);
+
+ return cacheServer.getPort();
+ });
+
+ client.invoke(() -> createClientRegion(host, port, regionName));
+
+ createClientRegion(host, port, regionName); // this VM also becomes a client of the same server
+
+ Region region = getCache().getRegion(regionName);
+ assertTrue(region.get(key).equals(value));
+
+ String first = "one";
+ String second = "two";
+
+ client.invokeAsync(() -> commitTxWithBeforeCompletion(regionName, true, first, second)); // client VM runs beforeCompletion(), signals "one", then waits on "two" before afterCompletion()
+
+ getBlackboard().waitForGate(first, 30, TimeUnit.SECONDS); // block until the client VM has finished its beforeCompletion()
+ TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ mgr.begin();
+ region.put(key, newValue);
+ TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend(); // the suspend/resume pair hands back the proxy from which the stub is obtained
+ ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
+ mgr.internalResume(tx);
+ try {
+ txStub.beforeCompletion(); // expected to fail: the client VM's transaction on the same key is still between its two phases
+ fail("expected to get CommitConflictException");
+ } catch (GemFireException e) {
+ // expected commit conflict exception thrown from server
+ mgr.setTXState(null);
+ getBlackboard().signalGate(second); // let the client VM proceed to afterCompletion()
+ }
+
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS).until(() -> region.get(key).equals(newValue)); // poll until the key reads newValue, presumably once the async client transaction has committed
+
+ try {
+ commitTxWithBeforeCompletion(regionName, false, first, second); // second transaction from this VM on the same key; per the commit message this is what fails without the fix
+ } catch (Exception e) {
+ Assert.fail("got unexpected exception", e);
+ }
+ }
+
+ private CacheServer createCacheServer(Cache cache) { // adds a cache server on a random available TCP port, starts it and returns it
+ CacheServer server = cache.addCacheServer();
+ server.setPort(AvailablePortHelper.getRandomAvailableTCPPort());
+ try {
+ server.start();
+ } catch (IOException e) {
+ Assert.fail("got exception", e);
+ }
+ return server;
+ }
+
+ private void createClientRegion(final Host host, final int port0, String regionName) { // creates a client cache pointed at host:port0 and a PROXY region named regionName
+ ClientCacheFactory cf = new ClientCacheFactory();
+ cf.addPoolServer(host.getHostName(), port0);
+ ClientCache cache = getClientCache(cf);
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+ }
+
+ private void commitTxWithBeforeCompletion(String regionName, boolean withWait, String first, // runs one transaction by calling beforeCompletion()/afterCompletion() on the stub directly
+ String second) throws TimeoutException, InterruptedException {
+ Region region = getCache().getRegion(regionName);
+ TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ mgr.begin();
+ region.put(key, newValue);
+ TXStateProxyImpl tx = (TXStateProxyImpl) mgr.internalSuspend();
+ ClientTXStateStub txStub = (ClientTXStateStub) tx.getRealDeal(null, null);
+ mgr.internalResume(tx);
+ txStub.beforeCompletion();
+ if (withWait) { // pause between the two phases: signal gate `first`, then wait up to 30s for gate `second`
+ getBlackboard().signalGate(first);
+ getBlackboard().waitForGate(second, 30, TimeUnit.SECONDS);
+ }
+ txStub.afterCompletion(Status.STATUS_COMMITTED);
+ }
+}