You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/05/02 17:58:19 UTC
[geode] branch develop updated: GEODE-845: Rename and fixup
TXExpirationIntegrationTest
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new af54b49 GEODE-845: Rename and fixup TXExpirationIntegrationTest
af54b49 is described below
commit af54b49b8f73514dc239fc59f2f15982f3cb3b4d
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue May 1 15:43:29 2018 -0700
GEODE-845: Rename and fixup TXExpirationIntegrationTest
* rename TXExpiryJUnitTest to TXExpirationIntegrationTest
* fix flakiness in TXExpirationIntegrationTest
---
.../geode/internal/cache/InternalRegion.java | 4 +
.../apache/geode/internal/cache/LocalRegion.java | 2 +
.../java/org/apache/geode/ExpirationDetector.java | 86 +++++
.../apache/geode/TXExpirationIntegrationTest.java | 388 +++++++++++++++++++
.../java/org/apache/geode/TXExpiryJUnitTest.java | 424 ---------------------
...=> DistributedTXExpirationIntegrationTest.java} | 32 +-
.../internal/cache/RemoteTransactionDUnitTest.java | 188 ++++++++-
7 files changed, 674 insertions(+), 450 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index fb9fe1c..7633713 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -375,4 +375,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
Object getIMSync();
IndexManager setIndexManager(IndexManager idxMgr);
+
+ RegionTTLExpiryTask getRegionTTLExpiryTask();
+
+ RegionIdleExpiryTask getRegionIdleExpiryTask();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bd909e1..88f6359 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8094,6 +8094,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* Used by unit tests to get access to the RegionIdleExpiryTask of this region. Returns null if no
* task exists.
*/
+ @Override
public RegionIdleExpiryTask getRegionIdleExpiryTask() {
return this.regionIdleExpiryTask;
}
@@ -8102,6 +8103,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* Used by unit tests to get access to the RegionTTLExpiryTask of this region. Returns null if no
* task exists.
*/
+ @Override
public RegionTTLExpiryTask getRegionTTLExpiryTask() {
return this.regionTTLExpiryTask;
}
diff --git a/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java
new file mode 100644
index 0000000..0eb926b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.internal.cache.ExpiryTask;
+import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener;
+
+/**
+ * Test-only {@link ExpiryTaskListener} that watches one specific {@link ExpiryTask} and records
+ * whether that task has run, has expired, or has been rescheduled. Callbacks for any other task
+ * are ignored.
+ */
+public class ExpirationDetector implements ExpiryTaskListener {
+
+  private final ExpiryTask expiryTask;
+
+  private volatile boolean executed;
+  private volatile boolean expired;
+  private volatile boolean rescheduled;
+
+  /**
+   * @param expiry the task to watch; asserted to be non-null
+   */
+  public ExpirationDetector(ExpiryTask expiry) {
+    assertThat(expiry).isNotNull();
+    expiryTask = expiry;
+  }
+
+  @Override
+  public void afterCancel(ExpiryTask task) {
+    // cancellation is not tracked
+  }
+
+  @Override
+  public void afterSchedule(ExpiryTask task) {
+    // scheduling is not tracked
+  }
+
+  /**
+   * Records that the watched task was rescheduled. If the task has not expired yet, expiration is
+   * suspended before the flag is set.
+   */
+  @Override
+  public void afterReschedule(ExpiryTask task) {
+    if (task != expiryTask) {
+      return;
+    }
+    if (!hasExpired()) {
+      ExpiryTask.suspendExpiration();
+    }
+    rescheduled = true;
+  }
+
+  /**
+   * Records that the watched task expired.
+   */
+  @Override
+  public void afterExpire(ExpiryTask task) {
+    if (task != expiryTask) {
+      return;
+    }
+    expired = true;
+  }
+
+  /**
+   * Records that the watched task ran.
+   */
+  @Override
+  public void afterTaskRan(ExpiryTask task) {
+    if (task != expiryTask) {
+      return;
+    }
+    executed = true;
+  }
+
+  /**
+   * Blocks until the watched task has run, or fails once the given timeout has elapsed.
+   *
+   * @param timeout maximum time to wait
+   * @param unit unit of {@code timeout}
+   */
+  public void awaitExecuted(long timeout, TimeUnit unit) {
+    await().atMost(timeout, unit).until(() -> executed);
+  }
+
+  /**
+   * @return true once the watched task has been rescheduled
+   */
+  public boolean wasRescheduled() {
+    return rescheduled;
+  }
+
+  /**
+   * @return true once the watched task has expired
+   */
+  public boolean hasExpired() {
+    return expired;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java
new file mode 100644
index 0000000..aacd6ae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.cache.ExpirationAction.DESTROY;
+import static org.apache.geode.cache.ExpirationAction.INVALIDATE;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.ExpiryTask;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests transaction expiration functionality
+ *
+ * @since GemFire 4.0
+ */
+@Category(IntegrationTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class TXExpirationIntegrationTest {
+
+ private static final String REGION_NAME =
+ TXExpirationIntegrationTest.class.getSimpleName() + "_region";
+ private static final String KEY = "key";
+
+ private InternalCache cache;
+ private CacheTransactionManager transactionManager;
+
+ @Before
+ public void setUp() throws Exception {
+ cache = (InternalCache) new CacheFactory(getConfig()).create();
+ transactionManager = cache.getCacheTransactionManager();
+ }
+
+ /**
+  * Builds the properties the cache is created with: multicast port "0" and an empty locators
+  * list. Returns a new instance on every call.
+  */
+ protected Properties getConfig() {
+   Properties props = new Properties();
+   props.setProperty(MCAST_PORT, "0");
+   props.setProperty(LOCATORS, "");
+   return props;
+ }
+
+ /**
+  * Rolls back any transaction a test left open, closes the cache and re-enables expiration.
+  *
+  * <p>
+  * Expiration is suspended through static methods on {@link ExpiryTask}, so it is re-enabled in
+  * a {@code finally} block: a failure while rolling back or closing the cache must not leave
+  * expiration suspended for whatever runs next in the same JVM. The null checks cover the case
+  * where {@code setUp} failed before the fields were assigned, since JUnit still runs
+  * {@code @After} methods then.
+  */
+ @After
+ public void tearDown() throws Exception {
+   try {
+     if (transactionManager != null) {
+       try {
+         transactionManager.rollback();
+       } catch (IllegalStateException ignored) {
+         // presumably means no transaction is open on this thread; nothing to roll back
+       }
+     }
+     if (cache != null) {
+       cache.close();
+     }
+   } finally {
+     ExpiryTask.permitExpiration();
+   }
+ }
+
+ /**
+ * Verifies that an entry's expiry task running while a transaction holds an update to the same
+ * key does not make the commit fail, and that the entry expires once the transaction has
+ * committed.
+ *
+ * <p>
+ * Expiration is suspended before each step and only permitted inside awaitEntryExpiration, which
+ * waits for the entry's expiry task to run.
+ */
+ @Test
+ @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+ "ENTRY_TTL_INVALIDATE"})
+ @TestCaseName("{method}({params})")
+ public void entryExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception {
+ InternalRegion region = createRegion(REGION_NAME);
+ AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+ KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+ mutator.addCacheListener(keyCacheListener);
+
+ // suspend first so nothing expires while expiration is configured and the entry is created
+ ExpiryTask.suspendExpiration();
+ operation.mutate(mutator);
+
+ region.put(KEY, "value1");
+
+ transactionManager.begin();
+ region.put(KEY, "value2");
+ awaitEntryExpiration(region, KEY); // enables expiration
+ // the expiry task has run, yet the open transaction must still read its own value
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2");
+
+ ExpiryTask.suspendExpiration();
+ transactionManager.commit();
+
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2");
+ awaitEntryExpiration(region, KEY); // enables expiration
+ operation.verifyInvoked(keyCacheListener);
+
+ operation.verifyState(region);
+ }
+
+ /**
+ * Verifies that entry expiration still takes place after a commit has failed with a
+ * CommitConflictException.
+ *
+ * <p>
+ * The conflict is provoked by pausing the transaction, putting a different value for the same
+ * key outside of it, and then unpausing the transaction before committing.
+ */
+ @Test
+ @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+ "ENTRY_TTL_INVALIDATE"})
+ @TestCaseName("{method}({params})")
+ public void entryExpirationContinuesAfterCommitConflict(ExpirationOperation operation)
+ throws Exception {
+ Region<String, String> region = createRegion(REGION_NAME);
+
+ AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+ KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+ mutator.addCacheListener(keyCacheListener);
+
+ // suspend first so nothing expires while expiration is configured and the entry is created
+ ExpiryTask.suspendExpiration();
+ operation.mutate(mutator);
+
+ // NOTE(review): expiration was already suspended above and the sibling tests do not repeat the
+ // call here; this second suspend looks redundant - confirm before removing
+ ExpiryTask.suspendExpiration();
+ region.put(KEY, "value1");
+
+ transactionManager.begin();
+ region.put(KEY, "value2");
+ awaitEntryExpiration(region, KEY); // enables expiration
+ // the expiry task has run, yet the open transaction must still read its own value
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2");
+
+ ExpiryTask.suspendExpiration();
+
+ // write the same key outside of the transaction so that the commit below conflicts
+ TXManagerImpl txManagerImpl = (TXManagerImpl) transactionManager;
+ TXStateProxy txStateProxy = txManagerImpl.pauseTransaction();
+ region.put(KEY, "conflict");
+ txManagerImpl.unpauseTransaction(txStateProxy);
+
+ assertThatThrownBy(() -> transactionManager.commit())
+ .isInstanceOf(CommitConflictException.class);
+
+ awaitEntryExpiration(region, KEY); // enables expiration
+ operation.verifyInvoked(keyCacheListener);
+ operation.verifyState(region);
+ }
+
+ /**
+ * Verifies that entry expiration still takes place after the transaction that updated the entry
+ * has been rolled back.
+ */
+ @Test
+ @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+ "ENTRY_TTL_INVALIDATE"})
+ @TestCaseName("{method}({params})")
+ public void entryExpirationContinuesAfterRollback(ExpirationOperation operation)
+ throws Exception {
+ Region<String, String> region = createRegion(REGION_NAME);
+
+ AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+ KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+ mutator.addCacheListener(keyCacheListener);
+
+ // suspend first so nothing expires while expiration is configured and the entry is created
+ ExpiryTask.suspendExpiration();
+ operation.mutate(mutator);
+
+ region.put(KEY, "value1");
+
+ transactionManager.begin();
+ region.put(KEY, "value2");
+ awaitEntryExpiration(region, KEY); // enables expiration
+ // the expiry task has run, yet the open transaction must still read its own value
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2");
+
+ ExpiryTask.suspendExpiration();
+
+ transactionManager.rollback();
+
+ awaitEntryExpiration(region, KEY); // enables expiration
+ operation.verifyInvoked(keyCacheListener);
+ operation.verifyState(region);
+ }
+
+ /**
+ * Verifies that the region's expiry task running while a transaction holds a put into that
+ * region does not make the commit fail, and that the region expires once the transaction has
+ * committed.
+ */
+ @Test
+ @Parameters({"REGION_IDLE_DESTROY", "REGION_IDLE_INVALIDATE", "REGION_TTL_DESTROY",
+ "REGION_TTL_INVALIDATE"})
+ @TestCaseName("{method}({params})")
+ public void regionExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception {
+ Region<String, String> region = createRegion(REGION_NAME);
+
+ KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+
+ AttributesMutator<String, String> mutator = region.getAttributesMutator();
+ mutator.addCacheListener(keyCacheListener);
+
+ // suspend before enabling region expiration so it cannot happen before the put
+ ExpiryTask.suspendExpiration();
+ operation.mutate(mutator);
+
+ transactionManager.begin();
+ region.put(KEY, "value1");
+
+ awaitRegionExpiration(region); // enables expiration
+ // the region's expiry task has run, yet the open transaction must still read its own value
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1");
+
+ ExpiryTask.suspendExpiration();
+ transactionManager.commit();
+
+ assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1");
+
+ awaitRegionExpiration(region); // enables expiration
+ operation.verifyInvoked(keyCacheListener);
+ operation.verifyState(region);
+ }
+
+ /**
+ * Test parameter describing one kind of expiration. Each constant bundles three callbacks:
+ * how to configure the expiration on the region's AttributesMutator, which KeyCacheListener
+ * hook to verify on the spied listener, and which region state to assert (entry variants) or
+ * await for up to one minute (region variants) afterwards.
+ *
+ * <p>
+ * Every variant uses an ExpirationAttributes timeout of 1 with either DESTROY or INVALIDATE.
+ * NOTE(review): the time unit of that timeout is not visible here - confirm against
+ * ExpirationAttributes and LocalRegion.EXPIRY_MS_PROPERTY.
+ */
+ private enum ExpirationOperation {
+ ENTRY_IDLE_DESTROY(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY)),
+ s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)),
+ r -> assertThat(r.containsKey(KEY)).isFalse()),
+ ENTRY_IDLE_INVALIDATE(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, INVALIDATE)),
+ s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)),
+ r -> assertThat(r.get(KEY)).isNull()),
+ ENTRY_TTL_DESTROY(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, DESTROY)),
+ s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)),
+ r -> assertThat(r.containsKey(KEY)).isFalse()),
+ ENTRY_TTL_INVALIDATE(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, INVALIDATE)),
+ s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)),
+ r -> assertThat(r.get(KEY)).isNull()),
+ REGION_IDLE_DESTROY(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, DESTROY)),
+ s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)),
+ r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())),
+ REGION_IDLE_INVALIDATE(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, INVALIDATE)),
+ s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)),
+ r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null)),
+ REGION_TTL_DESTROY(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, DESTROY)),
+ s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)),
+ r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())),
+ REGION_TTL_INVALIDATE(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, INVALIDATE)),
+ s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)),
+ r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null));
+
+ // configures the expiration under test on the region's mutator
+ private final Consumer<AttributesMutator> mutatorConsumer;
+ // verifies the expected callback on the spied listener
+ private final Consumer<KeyCacheListener> listenerConsumer;
+ // asserts or awaits the expected final state of the region
+ private final Consumer<Region> regionConsumer;
+
+ ExpirationOperation(Consumer<AttributesMutator> mutatorConsumer,
+ Consumer<KeyCacheListener> listenerConsumer, Consumer<Region> regionConsumer) {
+ this.mutatorConsumer = mutatorConsumer;
+ this.listenerConsumer = listenerConsumer;
+ this.regionConsumer = regionConsumer;
+ }
+
+ /** Applies this operation's expiration settings to the given mutator. */
+ void mutate(AttributesMutator mutator) {
+ mutatorConsumer.accept(mutator);
+ }
+
+ /** Verifies that the spied listener received the callback this operation is expected to cause. */
+ void verifyInvoked(KeyCacheListener keyCacheListener) {
+ listenerConsumer.accept(keyCacheListener);
+ }
+
+ /** Asserts, or waits for, the region state this operation is expected to produce. */
+ public void verifyState(Region region) {
+ regionConsumer.accept(region);
+ }
+ }
+
+ /**
+  * Creates a REPLICATE region with statistics enabled. The system property named by
+  * {@link LocalRegion#EXPIRY_MS_PROPERTY} is set to "true" only while the region is being
+  * created and is cleared again afterwards, even when creation fails.
+  */
+ private InternalRegion createRegion(String name) {
+   RegionFactory<String, String> factory = cache.createRegionFactory(RegionShortcut.REPLICATE);
+   factory.setStatisticsEnabled(true);
+
+   System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true");
+   try {
+     Region<String, String> created = factory.create(name);
+     return (InternalRegion) created;
+   } finally {
+     System.clearProperty(LocalRegion.EXPIRY_MS_PROPERTY);
+   }
+ }
+
+ /**
+  * Waits up to one minute until the region has no entry for {@code key} or the value read for
+  * {@code key} is null.
+  */
+ // NOTE(review): no caller is visible in this class - confirm whether this helper is still needed
+ private void awaitEntryExpired(Region region, String key) {
+   await().atMost(1, MINUTES).until(() -> {
+     if (region.getEntry(key) == null) {
+       return true;
+     }
+     return region.get(key) == null;
+   });
+ }
+
+ /**
+ * Permits expiration and waits until the expiry task of the entry for the given key has run.
+ *
+ * <p>
+ * For each attempt a new ExpirationDetector for the entry's current expiry task is installed as
+ * the static ExpiryTask.expiryTaskListener, expiration is permitted, and the detector is awaited
+ * for up to 30 seconds. The attempt is repeated as long as the task was rescheduled without
+ * having expired. The static listener is always reset to null before returning.
+ */
+ private void awaitEntryExpiration(Region region, String key) {
+ InternalRegion internalRegion = (InternalRegion) region;
+ try {
+ ExpirationDetector detector;
+ do {
+ // look the task up again on every attempt
+ detector = new ExpirationDetector(internalRegion.getEntryExpiryTask(key));
+ ExpiryTask.expiryTaskListener = detector;
+ ExpiryTask.permitExpiration();
+ detector.awaitExecuted(30, SECONDS);
+ } while (!detector.hasExpired() && detector.wasRescheduled());
+ } finally {
+ ExpiryTask.expiryTaskListener = null;
+ }
+ }
+
+ /**
+ * Permits expiration and waits until the region's expiry task has run.
+ *
+ * <p>
+ * The region's TTL expiry task is watched when one exists, otherwise its idle expiry task. For
+ * each attempt a new ExpirationDetector is installed as the static ExpiryTask.expiryTaskListener,
+ * expiration is permitted, and the detector is awaited for up to 30 seconds. The attempt is
+ * repeated as long as the task was rescheduled without having expired. The static listener is
+ * always reset to null before returning.
+ */
+ private void awaitRegionExpiration(Region region) {
+ InternalRegion internalRegion = (InternalRegion) region;
+ try {
+ ExpirationDetector detector;
+ do {
+ // prefer the TTL task; fall back to the idle task when no TTL task exists
+ detector = new ExpirationDetector(internalRegion.getRegionTTLExpiryTask() != null
+ ? internalRegion.getRegionTTLExpiryTask() : internalRegion.getRegionIdleExpiryTask());
+ ExpiryTask.expiryTaskListener = detector;
+ ExpiryTask.permitExpiration();
+ detector.awaitExecuted(30, SECONDS);
+ } while (!detector.hasExpired() && detector.wasRescheduled());
+ } finally {
+ ExpiryTask.expiryTaskListener = null;
+ }
+ }
+
+ /**
+  * Cache listener that forwards every callback to a no-op hook taking only the entry key or the
+  * region name. The tests wrap an instance in a Mockito spy and verify those hooks with plain
+  * values such as {@code eq(KEY)} instead of having to match event objects.
+  */
+ private static class KeyCacheListener extends CacheListenerAdapter<String, String> {
+
+   @Override
+   public void afterCreate(EntryEvent<String, String> event) {
+     afterCreateKey(event.getKey());
+   }
+
+   @Override
+   public void afterUpdate(EntryEvent<String, String> event) {
+     afterUpdateKey(event.getKey());
+   }
+
+   @Override
+   public void afterInvalidate(EntryEvent<String, String> event) {
+     afterInvalidateKey(event.getKey());
+   }
+
+   @Override
+   public void afterDestroy(EntryEvent<String, String> event) {
+     afterDestroyKey(event.getKey());
+   }
+
+   @Override
+   public void afterRegionInvalidate(RegionEvent<String, String> event) {
+     afterRegionInvalidateName(event.getRegion().getName());
+   }
+
+   @Override
+   public void afterRegionDestroy(RegionEvent<String, String> event) {
+     afterRegionDestroyName(event.getRegion().getName());
+   }
+
+   /** Hook invoked with the key of a created entry; intentionally empty. */
+   public void afterCreateKey(Object key) {
+     // nothing
+   }
+
+   /** Hook invoked with the key of an updated entry; intentionally empty. */
+   public void afterUpdateKey(Object key) {
+     // nothing
+   }
+
+   /** Hook invoked with the key of an invalidated entry; intentionally empty. */
+   public void afterInvalidateKey(Object key) {
+     // nothing
+   }
+
+   /** Hook invoked with the key of a destroyed entry; intentionally empty. */
+   public void afterDestroyKey(Object key) {
+     // nothing
+   }
+
+   /** Hook invoked with the name of an invalidated region; intentionally empty. */
+   public void afterRegionInvalidateName(Object regionName) {
+     // nothing
+   }
+
+   /** Hook invoked with the name of a destroyed region; intentionally empty. */
+   public void afterRegionDestroyName(Object regionName) {
+     // nothing
+   }
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
deleted file mode 100644
index def0076..0000000
--- a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode;
-
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.*;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.*;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.FlakyTest;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-
-/**
- * Tests transaction expiration functionality
- *
- * @since GemFire 4.0
- */
-@Category(IntegrationTest.class)
-public class TXExpiryJUnitTest {
-
- protected GemFireCacheImpl cache;
- protected CacheTransactionManager txMgr;
-
- protected void createCache() throws CacheException {
- Properties p = new Properties();
- p.setProperty(MCAST_PORT, "0"); // loner
- this.cache = (GemFireCacheImpl) (new CacheFactory(p)).create();
- this.txMgr = this.cache.getCacheTransactionManager();
- }
-
- private void closeCache() {
- if (this.cache != null) {
- if (this.txMgr != null) {
- try {
- this.txMgr.rollback();
- } catch (IllegalStateException ignore) {
- }
- }
- this.txMgr = null;
- Cache c = this.cache;
- this.cache = null;
- c.close();
- }
- }
-
- @Before
- public void setUp() throws Exception {
- createCache();
- }
-
- @After
- public void tearDown() throws Exception {
- closeCache();
- }
-
- @Test
- public void testEntryTTLExpiration() throws CacheException {
- generalEntryExpirationTest(createRegion("TXEntryTTL"),
- new ExpirationAttributes(1, ExpirationAction.DESTROY), true);
- }
-
- @Test
- public void testEntryIdleExpiration() throws CacheException {
- generalEntryExpirationTest(createRegion("TXEntryIdle"),
- new ExpirationAttributes(1, ExpirationAction.DESTROY), false);
- }
-
- private Region<String, String> createRegion(String name) {
- RegionFactory<String, String> rf = this.cache.createRegionFactory();
- rf.setScope(Scope.DISTRIBUTED_NO_ACK);
- rf.setStatisticsEnabled(true);
- System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true");
- try {
- return rf.create(name);
- } finally {
- System.getProperties().remove(LocalRegion.EXPIRY_MS_PROPERTY);
- }
- }
-
- public void generalEntryExpirationTest(final Region<String, String> exprReg,
- ExpirationAttributes exprAtt, boolean useTTL) throws CacheException {
- final LocalRegion lr = (LocalRegion) exprReg;
- final boolean wasDestroyed[] = {false};
- AttributesMutator<String, String> mutator = exprReg.getAttributesMutator();
- final AtomicInteger ac = new AtomicInteger();
- final AtomicInteger au = new AtomicInteger();
- final AtomicInteger ai = new AtomicInteger();
- final AtomicInteger ad = new AtomicInteger();
-
- if (useTTL) {
- mutator.setEntryTimeToLive(exprAtt);
- } else {
- mutator.setEntryIdleTimeout(exprAtt);
- }
- final CacheListener<String, String> cl = new CacheListenerAdapter<String, String>() {
- public void afterCreate(EntryEvent<String, String> e) {
- ac.incrementAndGet();
- }
-
- public void afterUpdate(EntryEvent<String, String> e) {
- au.incrementAndGet();
- }
-
- public void afterInvalidate(EntryEvent<String, String> e) {
- ai.incrementAndGet();
- }
-
- public void afterDestroy(EntryEvent<String, String> e) {
- ad.incrementAndGet();
- if (e.getKey().equals("key0")) {
- synchronized (wasDestroyed) {
- wasDestroyed[0] = true;
- wasDestroyed.notifyAll();
- }
- }
- }
-
- public void afterRegionInvalidate(RegionEvent<String, String> event) {
- fail("Unexpected invocation of afterRegionInvalidate");
- }
-
- public void afterRegionDestroy(RegionEvent<String, String> event) {
- if (!event.getOperation().isClose()) {
- fail("Unexpected invocation of afterRegionDestroy");
- }
- }
- };
- mutator.addCacheListener(cl);
- try {
-
- ExpiryTask.suspendExpiration();
- // Test to ensure an expiration does not cause a conflict
- for (int i = 0; i < 2; i++) {
- exprReg.put("key" + i, "value" + i);
- }
- this.txMgr.begin();
- exprReg.put("key0", "value");
- waitForEntryExpiration(lr, "key0");
- assertEquals("value", exprReg.getEntry("key0").getValue());
- try {
- ExpiryTask.suspendExpiration();
- this.txMgr.commit();
- } catch (CommitConflictException error) {
- fail("Expiration should not cause commit to fail");
- }
- assertEquals("value", exprReg.getEntry("key0").getValue());
- waitForEntryExpiration(lr, "key0");
- synchronized (wasDestroyed) {
- assertEquals(true, wasDestroyed[0]);
- }
- assertTrue(!exprReg.containsKey("key0"));
- // key1 is the canary for the rest of the entries
- waitForEntryToBeDestroyed(exprReg, "key1");
-
- // rollback and failed commit test, ensure expiration continues
- for (int j = 0; j < 2; j++) {
- synchronized (wasDestroyed) {
- wasDestroyed[0] = false;
- }
- ExpiryTask.suspendExpiration();
- for (int i = 0; i < 2; i++) {
- exprReg.put("key" + i, "value" + i);
- }
- this.txMgr.begin();
- exprReg.put("key0", "value");
- waitForEntryExpiration(lr, "key0");
- assertEquals("value", exprReg.getEntry("key0").getValue());
- String checkVal;
- ExpiryTask.suspendExpiration();
- if (j == 0) {
- checkVal = "value0";
- this.txMgr.rollback();
- } else {
- checkVal = "conflictVal";
- final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
- TXStateProxy tx = txMgrImpl.pauseTransaction();
- exprReg.put("key0", checkVal);
- txMgrImpl.unpauseTransaction(tx);
- try {
- this.txMgr.commit();
- fail("Expected CommitConflictException!");
- } catch (CommitConflictException expected) {
- }
- }
- waitForEntryExpiration(lr, "key0");
- synchronized (wasDestroyed) {
- assertEquals(true, wasDestroyed[0]);
- }
- assertTrue(!exprReg.containsKey("key0"));
- // key1 is the canary for the rest of the entries
- waitForEntryToBeDestroyed(exprReg, "key1");
- }
- } finally {
- mutator.removeCacheListener(cl);
- ExpiryTask.permitExpiration();
- }
- }
-
- private void waitForEntryToBeDestroyed(final Region r, final String key) {
- WaitCriterion waitForExpire = new WaitCriterion() {
- public boolean done() {
- return r.getEntry(key) == null;
- }
-
- public String description() {
- return "never saw entry destroy of " + key;
- }
- };
- Wait.waitForCriterion(waitForExpire, 3000, 10, true);
- }
-
- public static void waitForEntryExpiration(LocalRegion lr, String key) {
- try {
- ExpirationDetector detector;
- do {
- detector = new ExpirationDetector(lr.getEntryExpiryTask(key));
- ExpiryTask.expiryTaskListener = detector;
- ExpiryTask.permitExpiration();
- Wait.waitForCriterion(detector, 3000, 2, true);
- } while (!detector.hasExpired() && detector.wasRescheduled());
- } finally {
- ExpiryTask.expiryTaskListener = null;
- }
- }
-
- private void waitForRegionExpiration(LocalRegion lr, boolean ttl) {
- try {
- ExpirationDetector detector;
- do {
- detector = new ExpirationDetector(
- ttl ? lr.getRegionTTLExpiryTask() : lr.getRegionIdleExpiryTask());
- ExpiryTask.expiryTaskListener = detector;
- ExpiryTask.permitExpiration();
- Wait.waitForCriterion(detector, 3000, 2, true);
- } while (!detector.hasExpired() && detector.wasRescheduled());
- } finally {
- ExpiryTask.expiryTaskListener = null;
- }
- }
-
-
- /**
- * Used to detect that a particular ExpiryTask has expired.
- */
- public static class ExpirationDetector implements ExpiryTaskListener, WaitCriterion {
- private volatile boolean ran = false;
- private volatile boolean expired = false;
- private volatile boolean rescheduled = false;
- public final ExpiryTask et;
-
- public ExpirationDetector(ExpiryTask et) {
- assertNotNull(et);
- this.et = et;
- }
-
- @Override
- public void afterCancel(ExpiryTask et) {}
-
- @Override
- public void afterSchedule(ExpiryTask et) {}
-
- @Override
- public void afterReschedule(ExpiryTask et) {
- if (et == this.et) {
- if (!hasExpired()) {
- ExpiryTask.suspendExpiration();
- }
- this.rescheduled = true;
- }
- }
-
- @Override
- public void afterExpire(ExpiryTask et) {
- if (et == this.et) {
- this.expired = true;
- }
- }
-
- @Override
- public void afterTaskRan(ExpiryTask et) {
- if (et == this.et) {
- this.ran = true;
- }
- }
-
- @Override
- public boolean done() {
- return this.ran;
- }
-
- @Override
- public String description() {
- return "the expiry task " + this.et + " never ran";
- }
-
- public boolean wasRescheduled() {
- return this.rescheduled;
- }
-
- public boolean hasExpired() {
- return this.expired;
- }
- }
-
- @Category(FlakyTest.class) // GEODE-845: time sensitive, expiration, eats exceptions (1 fixed),
- // waitForCriterion, 3 second timeout
- @Test
- public void testRegionIdleExpiration() throws CacheException {
- Region<String, String> exprReg = createRegion("TXRegionIdle");
- generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE),
- false);
- generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY),
- false);
- }
-
- @Test
- public void testRegionTTLExpiration() throws CacheException {
- Region<String, String> exprReg = createRegion("TXRegionTTL");
- generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE),
- true);
- generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY),
- true);
- }
-
- private void generalRegionExpirationTest(final Region<String, String> exprReg,
- ExpirationAttributes exprAtt, boolean useTTL) throws CacheException {
- final LocalRegion lr = (LocalRegion) exprReg;
- final ExpirationAction action = exprAtt.getAction();
- final boolean regionExpiry[] = {false};
- AttributesMutator<String, String> mutator = exprReg.getAttributesMutator();
- final CacheListener<String, String> cl = new CacheListenerAdapter<String, String>() {
- public void afterRegionInvalidate(RegionEvent<String, String> event) {
- synchronized (regionExpiry) {
- regionExpiry[0] = true;
- regionExpiry.notifyAll();
- }
- }
-
- public void afterRegionDestroy(RegionEvent<String, String> event) {
- if (!event.getOperation().isClose()) {
- synchronized (regionExpiry) {
- regionExpiry[0] = true;
- regionExpiry.notifyAll();
- }
- }
- }
- };
- mutator.addCacheListener(cl);
- // Suspend before enabling region expiration to prevent
- // it from happening before we do the put.
- ExpiryTask.suspendExpiration();
- try {
- if (useTTL) {
- mutator.setRegionTimeToLive(exprAtt);
- } else {
- mutator.setRegionIdleTimeout(exprAtt);
- }
-
- // Create some keys and age them, I wish we could fake/force the age
- // instead of having to actually wait
- for (int i = 0; i < 2; i++) {
- exprReg.put("key" + i, "value" + i);
- }
-
- String regName = exprReg.getName();
- // Test to ensure a region expiration does not cause a conflict
- this.txMgr.begin();
- exprReg.put("key0", "value");
- waitForRegionExpiration(lr, useTTL);
- assertEquals("value", exprReg.getEntry("key0").getValue());
- try {
- ExpiryTask.suspendExpiration();
- this.txMgr.commit();
- } catch (CommitConflictException error) {
- Assert.fail("Expiration should not cause commit to fail", error);
- }
- assertEquals("value", exprReg.getEntry("key0").getValue());
- waitForRegionExpiration(lr, useTTL);
- synchronized (regionExpiry) {
- assertEquals(true, regionExpiry[0]);
- }
- if (action == ExpirationAction.DESTROY) {
- assertNull("listener saw Region expiration, expected a destroy operation!",
- this.cache.getRegion(regName));
- } else {
- assertTrue("listener saw Region expiration, expected invalidation",
- !exprReg.containsValueForKey("key0"));
- }
-
- } finally {
- if (!exprReg.isDestroyed()) {
- mutator.removeCacheListener(cl);
- }
- ExpiryTask.permitExpiration();
- }
-
- // @todo mitch test rollback and failed expiration
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
similarity index 51%
rename from geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
index 2948c3d..67c3ee6 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
@@ -14,42 +14,26 @@
*/
package org.apache.geode.disttx;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_TRANSACTIONS;
import java.util.Properties;
import org.junit.experimental.categories.Category;
-import org.apache.geode.TXExpiryJUnitTest;
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.TXExpirationIntegrationTest;
import org.apache.geode.test.junit.categories.DistributedTransactionsTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
/**
- * Same tests as that of {@link TXExpiryJUnitTest} after setting "distributed-transactions" property
- * to true
+ * Extends {@link TXExpirationIntegrationTest} with "distributed-transactions" enabled.
*/
@Category({IntegrationTest.class, DistributedTransactionsTest.class})
-public class DistTXExpiryJUnitTest extends TXExpiryJUnitTest {
-
- public DistTXExpiryJUnitTest() {}
+public class DistributedTXExpirationIntegrationTest extends TXExpirationIntegrationTest {
@Override
- protected void createCache() throws CacheException {
- Properties p = new Properties();
- p.setProperty(MCAST_PORT, "0"); // loner
- p.setProperty(ConfigurationProperties.DISTRIBUTED_TRANSACTIONS, "true");
- this.cache = (GemFireCacheImpl) CacheFactory.create(DistributedSystem.connect(p));
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- this.txMgr = this.cache.getCacheTransactionManager();
- assert (this.txMgr.isDistributed());
+ protected Properties getConfig() {
+ Properties config = super.getConfig();
+ config.setProperty(DISTRIBUTED_TRANSACTIONS, "true");
+ return config;
}
-
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
index 1422fab..5757746 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
import static org.junit.Assert.assertEquals;
@@ -44,7 +45,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.TXExpiryJUnitTest;
+import org.apache.geode.ExpirationDetector;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.CacheEvent;
@@ -125,12 +126,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
protected final String D_REFERENCE = "distrReference";
private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
return mgr.hostedTransactionsInProgressForTest();
}
};
private final SerializableCallable verifyNoTxState = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
// TXManagerImpl mgr = getGemfireCache().getTxManager();
// assertIndexDetailsEquals(0, mgr.hostedTransactionsInProgressForTest());
@@ -218,6 +221,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
protected void initAccessorAndDataStore(VM accessor, VM datastore, final int redundantCopies) {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(true/* accessor */, redundantCopies, null);
return null;
@@ -225,6 +229,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false/* accessor */, redundantCopies, null);
populateData();
@@ -236,6 +241,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
protected void initAccessorAndDataStore(VM accessor, VM datastore1, VM datastore2,
final int redundantCopies) {
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false/* accessor */, redundantCopies, null);
return null;
@@ -248,6 +254,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM datastore1, VM datastore2,
final int redundantCopies) {
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false/* accessor */, redundantCopies, InterestPolicy.ALL);
return null;
@@ -269,6 +276,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.op = op;
}
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
LogWriterUtils.getLogWriter().fine("testTXPut starting tx");
@@ -563,6 +571,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT));
datastore.invoke(new SerializableCallable("verify tx") {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -571,6 +580,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
TXStateProxy tx = mgr.pauseTransaction();
@@ -586,6 +596,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertFalse(mgr.isHostedTxInProgress(txId));
@@ -594,6 +605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
if (commit) {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
verifyAfterCommit(OP.PUT);
return null;
@@ -601,6 +613,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
} else {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
verifyAfterRollback(OP.PUT);
return null;
@@ -619,6 +632,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.GET));
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -641,6 +655,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -659,13 +674,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator();
am.setCacheLoader(new CacheLoader() {
+ @Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
return new Customer("sup dawg", "add");
}
+ @Override
public void close() {}
});
return null;
@@ -673,6 +691,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.begin();
@@ -705,12 +724,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
return null;
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(CUSTOMER);
CustId sup = new CustId(7);
@@ -758,13 +779,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator();
am.setCacheLoader(new CacheLoader() {
+ @Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
return new Customer("sup dawg", "addr");
}
+ @Override
public void close() {}
});
CacheTransactionManager mgr = getGemfireCache().getTxManager();
@@ -782,6 +806,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
return null;
}
@@ -802,6 +827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT));
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -822,6 +848,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertNotNull(mgr.getTXState());
@@ -846,6 +873,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.INVALIDATE));
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -866,6 +894,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -888,6 +917,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.DESTROY));
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -908,6 +938,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -928,6 +959,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId newCustId = new CustId(10);
final Customer updateCust = new Customer("customer10", "address10");
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
mgr.begin();
@@ -963,6 +995,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getGemfireCache().getRegion(CUSTOMER);
int hash1 = PartitionedRegionHelper.getHashKey((PartitionedRegion) cust, new CustId(1));
@@ -987,6 +1020,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1026,6 +1060,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId = new CustId(1);
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1045,6 +1080,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1065,6 +1101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1103,6 +1140,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId2 = new CustId(2);
final CustId custId20 = new CustId(20);
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1121,6 +1159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1143,6 +1182,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1194,6 +1234,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM datastore1 = host.getVM(1);
VM datastore2 = host.getVM(3);
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false/* accessor */, 0, null);
return null;
@@ -1209,6 +1250,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId4 = new CustId(4);
final CustId custId20 = new CustId(20);
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1242,6 +1284,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
// Create a second data store.
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false/* accessor */, 1, null);
return null;
@@ -1256,6 +1299,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId4 = new CustId(4);
final CustId custId20 = new CustId(20);
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1270,6 +1314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
SerializableCallable checkArtifacts = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
PartitionedRegion cust = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
assertNull(cust.get(custId0));
@@ -1292,6 +1337,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId = new CustId(1);
final Customer updatedCust = new Customer("updated", "updated");
final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1311,6 +1357,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1331,6 +1378,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1370,6 +1418,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM datastore2 = host.getVM(2);
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
createRegion(false, 1, null);
return null;
@@ -1379,6 +1428,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, 1);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -1443,6 +1493,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM accessor = getVMForTransactions(acc, datastore);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region ref = getCache().getRegion(D_REFERENCE);
ref.getAttributesMutator().addCacheListener(new TestCacheListener(true));
@@ -1462,6 +1513,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
SerializableCallable addListenersToDataStore = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region ref = getCache().getRegion(D_REFERENCE);
ref.getAttributesMutator().addCacheListener(new TestCacheListener(false));
@@ -1487,6 +1539,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
if (op != OP.INVALIDATE) {
// Ensure the cache writer was not fired in accessor
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(CUSTOMER);
assertFalse(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired);
@@ -1500,6 +1553,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
// Ensure the cache writer was fired in the primary
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(CUSTOMER);
assertTrue(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired);
@@ -1513,6 +1567,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1521,6 +1576,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TestTxListener l = (TestTxListener) getGemfireCache().getTxManager().getListener();
assertTrue(l.isListenerInvoked());
@@ -1528,6 +1584,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
SerializableCallable verifyListeners = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(CUSTOMER);
Region order = getCache().getRegion(ORDER);
@@ -1614,34 +1671,44 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.isAccessor = isAccessor;
}
+ @Override
public void afterCreate(EntryEvent event) {
verifyOrigin(event);
verifyPutAll(event);
}
+ @Override
public void afterUpdate(EntryEvent event) {
verifyOrigin(event);
verifyPutAll(event);
}
+ @Override
public void afterDestroy(EntryEvent event) {
verifyOrigin(event);
}
+ @Override
public void afterInvalidate(EntryEvent event) {
verifyOrigin(event);
}
+ @Override
public void afterRegionClear(RegionEvent event) {}
+ @Override
public void afterRegionCreate(RegionEvent event) {}
+ @Override
public void afterRegionDestroy(RegionEvent event) {}
+ @Override
public void afterRegionInvalidate(RegionEvent event) {}
+ @Override
public void afterRegionLive(RegionEvent event) {}
+ @Override
public void close() {}
}
@@ -1653,6 +1720,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.isAccessor = isAccessor;
}
+ @Override
public void beforeCreate(EntryEvent event) throws CacheWriterException {
getGemfireCache().getLogger()
.info("SWAP:beforeCreate:" + event + " op:" + event.getOperation());
@@ -1665,6 +1733,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
wasFired = true;
}
+ @Override
public void beforeUpdate(EntryEvent event) throws CacheWriterException {
getGemfireCache().getLogger()
.info("SWAP:beforeCreate:" + event + " op:" + event.getOperation());
@@ -1673,19 +1742,23 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
setFired(event);
}
+ @Override
public void beforeDestroy(EntryEvent event) throws CacheWriterException {
verifyOrigin(event);
setFired(event);
}
+ @Override
public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
setFired(null);
}
+ @Override
public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
setFired(null);
}
+ @Override
public void close() {}
}
@@ -1733,15 +1806,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.isAccessor = isAccessor;
}
+ @Override
public void afterCommit(TransactionEvent event) {
listenerInvoked = true;
verify(event);
}
+ @Override
public void afterFailedCommit(TransactionEvent event) {
verify(event);
}
+ @Override
public void afterRollback(TransactionEvent event) {
listenerInvoked = true;
verify(event);
@@ -1751,6 +1827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
return this.listenerInvoked;
}
+ @Override
public void close() {}
}
@@ -1759,10 +1836,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.isAccessor = isAccessor;
}
+ @Override
public void beforeCommit(TransactionEvent event) {
verify(event);
}
+ @Override
public void close() {}
}
@@ -1775,10 +1854,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM accessor = getVMForTransactions(acc, datastore);
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
getGemfireCache().getTxManager().setWriter(new TransactionWriter() {
+ @Override
public void close() {}
+ @Override
public void beforeCommit(TransactionEvent event) throws TransactionWriterException {
throw new TransactionWriterException("AssertionError");
}
@@ -1788,6 +1870,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
getGemfireCache().getTxManager().begin();
Region r = getCache().getRegion(CUSTOMER);
@@ -1823,6 +1906,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM taskVM = isAccessor ? accessor : datastore1;
taskVM.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTxManager();
@@ -1836,6 +1920,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(verifyNoTxState);
taskVM.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
Region orderRegion = getCache().getRegion(ORDER);
@@ -1870,6 +1955,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(1, txOnDatastore1 + txOnDatastore2);
taskVM.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -1955,6 +2041,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, redundancy);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
Set originalSet;
@@ -1997,6 +2084,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(verifyNoTxState);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
Region orderRegion = getCache().getRegion(ORDER);
@@ -2062,6 +2150,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(1, txOnDatastore1 + txOnDatastore2);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.commit();
@@ -2079,6 +2168,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
int originalSetSize;
int expectedSetSize;
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTXMgr();
custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2195,6 +2285,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2245,6 +2336,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2295,6 +2387,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2345,6 +2438,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
SerializableCallable doIllegalIteration = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region r = getGemfireCache().getRegion(CUSTOMER);
Set keySet = r.keySet();
@@ -2446,6 +2540,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
class TXFunction implements Function {
static final String id = "TXFunction";
+ @Override
public void execute(FunctionContext context) {
Region r = null;
r = getGemfireCache().getRegion(CUSTOMER);
@@ -2455,18 +2550,22 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
context.getResultSender().lastResult(Boolean.TRUE);
}
+ @Override
public String getId() {
return id;
}
+ @Override
public boolean hasResult() {
return true;
}
+ @Override
public boolean optimizeForWrite() {
return true;
}
+ @Override
public boolean isHA() {
return false;
}
@@ -2494,6 +2593,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
SerializableCallable registerFunction = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
FunctionService.registerFunction(new TXFunction());
return null;
@@ -2505,6 +2605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(registerFunction);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
PartitionedRegion custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2555,6 +2656,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(1, txOnDatastore1 + txOnDatastore2);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTXMgr();
mgr.commit();
@@ -2563,6 +2665,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore1.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
assertEquals(expectedCustomer, custRegion.get(expectedCustId));
@@ -2571,6 +2674,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTXMgr();
mgr.begin();
@@ -2665,6 +2769,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2694,6 +2799,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
this.isAccessor = isAccessor;
}
+ @Override
public Object call() throws Exception {
AttributesFactory af = new AttributesFactory();
af.setDataPolicy(isAccessor ? DataPolicy.EMPTY : DataPolicy.REPLICATE);
@@ -2717,6 +2823,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new CreateDR(true));
SerializableCallable registerFunction = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
FunctionService.registerFunction(new TXFunction());
return null;
@@ -2728,6 +2835,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(registerFunction);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2747,6 +2855,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore1.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
final Region custRegion = getGemfireCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2783,6 +2892,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
SerializableCallable registerFunction = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
FunctionService.registerFunction(new TXFunction());
return null;
@@ -2794,6 +2904,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(registerFunction);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2819,6 +2930,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(1, txOnDatastore1 + txOnDatastore2);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2830,6 +2942,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
// test onMembers
SerializableCallable getMember = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
return getGemfireCache().getMyId();
}
@@ -2837,6 +2950,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final InternalDistributedMember ds1 = (InternalDistributedMember) datastore1.invoke(getMember);
final InternalDistributedMember ds2 = (InternalDistributedMember) datastore2.invoke(getMember);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
// get owner for expectedKey
@@ -2870,6 +2984,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(1, txOnDatastore1_1 + txOnDatastore2_1);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2881,6 +2996,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
// test function execution on data store
final DistributedMember owner = (DistributedMember) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
return pr.getOwnerForKey(pr.getKeyInfo(expectedCustId));
@@ -2888,6 +3004,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
SerializableCallable testFnOnDs = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTXMgr();
PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2905,6 +3022,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
};
SerializableCallable closeTx = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getGemfireCache().getRegion(CUSTOMER);
CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2933,6 +3051,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
// test that function is rejected if function target is not same as txState target
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTXMgr();
PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -3008,6 +3127,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM accessor = getVMForTransactions(acc, datastore);
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
getGemfireCache().getTxManager().addListener(new TestTxListener(false));
return null;
@@ -3016,6 +3136,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId expectedCustId = new CustId(6);
final Customer expectedCustomer = new Customer("customer6", "address6");
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
getGemfireCache().getTxManager().addListener(new TestTxListener(true));
Region custRegion = getCache().getRegion(CUSTOMER);
@@ -3030,6 +3151,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
assertNull(custRegion.get(expectedCustId));
@@ -3037,6 +3159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region custRegion = getCache().getRegion(CUSTOMER);
Context ctx = getCache().getJNDIContext();
@@ -3052,6 +3175,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TestTxListener l = (TestTxListener) getGemfireCache().getTXMgr().getListener();
assertTrue(l.isListenerInvoked());
@@ -3073,6 +3197,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
int fireD = 0;
int fireU = 0;
+ @Override
public void beforeCreate(EntryEvent event) throws CacheWriterException {
if (!event.isOriginRemote()) {
throw new CacheWriterException("SUP?? This CREATE is supposed to be isOriginRemote");
@@ -3080,6 +3205,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
fireC++;
}
+ @Override
public void beforeDestroy(EntryEvent event) throws CacheWriterException {
getGemfireCache().getLoggerI18n().fine("SWAP:writer:createEvent:" + event);
if (!event.isOriginRemote()) {
@@ -3088,6 +3214,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
fireD++;
}
+ @Override
public void beforeUpdate(EntryEvent event) throws CacheWriterException {
if (!event.isOriginRemote()) {
throw new CacheWriterException("SUP?? This UPDATE is supposed to be isOriginRemote");
@@ -3098,6 +3225,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region refRegion = getCache().getRegion(D_REFERENCE);
refRegion.getAttributesMutator().setCacheWriter(new OriginRemoteRRWriter());
@@ -3110,6 +3238,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new DoOpsInTX(OP.PUT));
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
TXStateProxy tx = mgr.pauseTransaction();
@@ -3123,6 +3252,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new DoOpsInTX(OP.DESTROY));
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
TXStateProxy tx = mgr.pauseTransaction();
@@ -3137,6 +3267,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new DoOpsInTX(OP.PUT));
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
TXStateProxy tx = mgr.pauseTransaction();
@@ -3149,6 +3280,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region refRegion = getCache().getRegion(D_REFERENCE);
OriginRemoteRRWriter w = (OriginRemoteRRWriter) refRegion.getAttributes().getCacheWriter();
@@ -3173,6 +3305,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region refRegion = getCache().getRegion(D_REFERENCE);
refRegion.create("sup", "dawg");
@@ -3181,6 +3314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getGemfireCache().getTxManager();
TXStateProxy tx = mgr.pauseTransaction();
@@ -3193,6 +3327,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region refRegion = getCache().getRegion(D_REFERENCE);
assertEquals("dawg", refRegion.get("sup"));
@@ -3209,6 +3344,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore, 0);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(CUSTOMER);
TXManagerImpl mgr = getCache().getTxManager();
@@ -3219,11 +3355,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
final InternalDistributedMember member =
(InternalDistributedMember) accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
return getCache().getMyId();
}
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
TXManagerImpl mgr = getCache().getTxManager();
assertEquals(1, mgr.hostedTransactionsInProgressForTest());
@@ -3257,6 +3395,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Map custMap = new HashMap();
for (int i = 0; i < 10; i++) {
@@ -3447,6 +3586,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStore(accessor, datastore, 0);
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(D_REFERENCE);
cust.put("meow", "this is a meow, deal with it");
@@ -3458,6 +3598,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(D_REFERENCE);
cust.getAttributesMutator().addCacheListener(new OneUpdateCacheListener());
@@ -3468,6 +3609,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
CacheTransactionManager mgr = getGemfireCache().getTxManager();
mgr.begin();
@@ -3480,6 +3622,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(D_REFERENCE);
OneUpdateCacheListener rat =
@@ -3492,6 +3635,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region cust = getCache().getRegion(D_REFERENCE);
OneDestroyAndThenOneCreateCacheWriter wri =
@@ -3584,6 +3728,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
protected Integer startServer(VM vm) {
return (Integer) vm.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
CacheServer s = getCache().addCacheServer();
@@ -3597,6 +3742,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
protected void createClientRegion(VM vm, final int port, final boolean isEmpty,
final boolean ri) {
vm.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port);
@@ -3626,17 +3772,20 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
boolean invoked = false;
+ @Override
public void onError(CqEvent aCqEvent) {
// TODO Auto-generated method stub
}
+ @Override
public void onEvent(CqEvent aCqEvent) {
// TODO Auto-generated method stub
invoked = true;
}
+ @Override
public void close() {
// TODO Auto-generated method stub
@@ -3765,6 +3914,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
createClientRegion(client, port, false, true);
accessor.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3781,6 +3931,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
client.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3788,10 +3939,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final ClientListener cl =
(ClientListener) custRegion.getAttributes().getCacheListeners()[0];
WaitCriterion waitForListenerInvocation = new WaitCriterion() {
+ @Override
public boolean done() {
return cl.invoked;
}
+ @Override
public String description() {
return "listener was never invoked";
}
@@ -3811,6 +3964,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
VM client = host.getVM(1);
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>();
af.setScope(Scope.DISTRIBUTED_ACK);
@@ -3825,6 +3979,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final int port = startServer(datastore);
client.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
ClientCacheFactory ccf = new ClientCacheFactory();
ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port);
@@ -3844,6 +3999,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region ref = getCache().getRegion(D_REFERENCE);
Region empty = getCache().getRegion(EMPTY_REGION);
@@ -3861,14 +4017,17 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
client.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region empty = getCache().getRegion(EMPTY_REGION);
final ClientListener l = (ClientListener) empty.getAttributes().getCacheListeners()[0];
WaitCriterion wc = new WaitCriterion() {
+ @Override
public boolean done() {
return l.invoked;
}
+ @Override
public String description() {
return "listener invoked:" + l.invoked;
}
@@ -3891,6 +4050,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
createClientRegion(client, port, false, true);
datastore.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3907,6 +4067,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
client.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3914,10 +4075,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final ClientListener cl =
(ClientListener) custRegion.getAttributes().getCacheListeners()[0];
WaitCriterion waitForListenerInvocation = new WaitCriterion() {
+ @Override
public boolean done() {
return cl.invoked;
}
+ @Override
public String description() {
return "listener was never invoked";
}
@@ -3938,6 +4101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
initAccessorAndDataStoreWithInterestPolicy(accessor, datastore1, datastore2, 1);
SerializableCallable registerListener = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
custRegion.getAttributesMutator().addCacheListener(new ListenerInvocationCounter());
@@ -3948,6 +4112,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
datastore2.invoke(registerListener);
datastore1.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
getCache().getCacheTransactionManager().begin();
@@ -3960,6 +4125,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
});
SerializableCallable getListenerCount = new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
ListenerInvocationCounter l =
@@ -3994,6 +4160,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
final CustId custId = new CustId(19);
datastore1.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
assertNull(refRegion.get(custId));
@@ -4003,6 +4170,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore2.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
assertNull(refRegion.get(custId));
@@ -4011,6 +4179,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
}
});
datastore1.invoke(new SerializableCallable() {
+ @Override
public Object call() throws Exception {
try {
getCache().getCacheTransactionManager().commit();
@@ -4082,6 +4251,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
assertEquals(Status.STATUS_ACTIVE, tx.getStatus());
final CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(new Runnable() {
+ @Override
public void run() {
Context ctx = getCache().getJNDIContext();
try {
@@ -4263,7 +4433,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
r.put("nonTXkey", "nonTXvalue");
getCache().getCacheTransactionManager().begin();
r.put("key", "newvalue");
- TXExpiryJUnitTest.waitForEntryExpiration(lr, "key");
+ waitForEntryExpiration(lr, "key");
} finally {
ExpiryTask.permitExpiration();
}
@@ -4496,4 +4666,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
vm1.invoke(verifyAssert);
}
+
+  /**
+   * Drives expiration of the entry stored under {@code key} in {@code lr} and waits for its
+   * expiry task to be executed.
+   *
+   * <p>
+   * Each attempt installs a fresh {@link ExpirationDetector} for the entry's current expiry task
+   * as the global {@code ExpiryTask.expiryTaskListener}, permits expiration, and waits (with a
+   * 3000 ms bound passed to the detector) for the task to execute. Another attempt is made only
+   * while the detector reports that the task has not expired but was rescheduled. The global
+   * listener is always reset to {@code null} on the way out.
+   *
+   * @param lr the region that owns the entry
+   * @param key the key of the entry whose expiry task is observed
+   */
+  private void waitForEntryExpiration(LocalRegion lr, String key) {
+    try {
+      while (true) {
+        // Look the task up again on every pass so the detector is bound to the current task.
+        ExpirationDetector attempt = new ExpirationDetector(lr.getEntryExpiryTask(key));
+        ExpiryTask.expiryTaskListener = attempt;
+        ExpiryTask.permitExpiration();
+        attempt.awaitExecuted(3000, MILLISECONDS);
+
+        // Stop once the entry expired, or when the task was not rescheduled for another run.
+        if (attempt.hasExpired() || !attempt.wasRescheduled()) {
+          break;
+        }
+      }
+    } finally {
+      // Never leave the static listener installed for later tests.
+      ExpiryTask.expiryTaskListener = null;
+    }
+  }
}
--
To stop receiving notification emails like this one, please contact
klund@apache.org.