You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/05/02 17:58:19 UTC

[geode] branch develop updated: GEODE-845: Rename and fixup TXExpirationIntegrationTest

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new af54b49  GEODE-845: Rename and fixup TXExpirationIntegrationTest
af54b49 is described below

commit af54b49b8f73514dc239fc59f2f15982f3cb3b4d
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue May 1 15:43:29 2018 -0700

    GEODE-845: Rename and fixup TXExpirationIntegrationTest
    
    * rename TXExpiryJUnitTest to TXExpirationIntegrationTest
    * fix flakiness in TXExpirationIntegrationTest
---
 .../geode/internal/cache/InternalRegion.java       |   4 +
 .../apache/geode/internal/cache/LocalRegion.java   |   2 +
 .../java/org/apache/geode/ExpirationDetector.java  |  86 +++++
 .../apache/geode/TXExpirationIntegrationTest.java  | 388 +++++++++++++++++++
 .../java/org/apache/geode/TXExpiryJUnitTest.java   | 424 ---------------------
 ...=> DistributedTXExpirationIntegrationTest.java} |  32 +-
 .../internal/cache/RemoteTransactionDUnitTest.java | 188 ++++++++-
 7 files changed, 674 insertions(+), 450 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index fb9fe1c..7633713 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -375,4 +375,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
   Object getIMSync();
 
   IndexManager setIndexManager(IndexManager idxMgr);
+
+  RegionTTLExpiryTask getRegionTTLExpiryTask();
+
+  RegionIdleExpiryTask getRegionIdleExpiryTask();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bd909e1..88f6359 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8094,6 +8094,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * Used by unit tests to get access to the RegionIdleExpiryTask of this region. Returns null if no
    * task exists.
    */
+  @Override
   public RegionIdleExpiryTask getRegionIdleExpiryTask() {
     return this.regionIdleExpiryTask;
   }
@@ -8102,6 +8103,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
    * Used by unit tests to get access to the RegionTTLExpiryTask of this region. Returns null if no
    * task exists.
    */
+  @Override
   public RegionTTLExpiryTask getRegionTTLExpiryTask() {
     return this.regionTTLExpiryTask;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java
new file mode 100644
index 0000000..0eb926b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/ExpirationDetector.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.internal.cache.ExpiryTask;
+import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener;
+
+/**
+ * Listener that records whether one specific ExpiryTask ran, expired, or was rescheduled.
+ */
+public class ExpirationDetector implements ExpiryTaskListener {
+
+  private volatile boolean executed; // set by afterTaskRan, polled by awaitExecuted
+  private volatile boolean expired; // set by afterExpire
+  private volatile boolean rescheduled; // set by afterReschedule
+
+  private final ExpiryTask expiryTask; // the only task whose callbacks are recorded
+
+  public ExpirationDetector(ExpiryTask expiry) {
+    assertThat(expiry).isNotNull(); // fail fast when the caller found no expiry task
+    expiryTask = expiry;
+  }
+
+  @Override
+  public void afterCancel(ExpiryTask expiryTask) {
+    // cancellation is not tracked
+  }
+
+  @Override
+  public void afterSchedule(ExpiryTask expiryTask) {
+    // initial scheduling is not tracked
+  }
+
+  @Override
+  public void afterReschedule(ExpiryTask expiryTask) {
+    if (expiryTask == this.expiryTask) { // identity check: ignore every other task
+      if (!hasExpired()) { // rescheduled without having expired
+        ExpiryTask.suspendExpiration();
+      }
+      rescheduled = true;
+    }
+  }
+
+  @Override
+  public void afterExpire(ExpiryTask expiryTask) {
+    if (expiryTask == this.expiryTask) {
+      expired = true;
+    }
+  }
+
+  @Override
+  public void afterTaskRan(ExpiryTask expiryTask) {
+    if (expiryTask == this.expiryTask) {
+      executed = true;
+    }
+  }
+
+  public void awaitExecuted(long timeout, TimeUnit unit) { // waits for afterTaskRan, up to timeout
+    await().atMost(timeout, unit).until(() -> executed);
+  }
+
+  public boolean wasRescheduled() {
+    return rescheduled;
+  }
+
+  public boolean hasExpired() {
+    return expired;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java
new file mode 100644
index 0000000..aacd6ae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/TXExpirationIntegrationTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.cache.ExpirationAction.DESTROY;
+import static org.apache.geode.cache.ExpirationAction.INVALIDATE;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.ExpiryTask;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests transaction expiration functionality
+ *
+ * @since GemFire 4.0
+ */
+@Category(IntegrationTest.class)
+@RunWith(JUnitParamsRunner.class)
+public class TXExpirationIntegrationTest {
+
+  private static final String REGION_NAME =
+      TXExpirationIntegrationTest.class.getSimpleName() + "_region";
+  private static final String KEY = "key";
+
+  private InternalCache cache;
+  private CacheTransactionManager transactionManager;
+
+  @Before
+  public void setUp() throws Exception {
+    cache = (InternalCache) new CacheFactory(getConfig()).create(); // closed again in tearDown
+    transactionManager = cache.getCacheTransactionManager();
+  }
+
+  protected Properties getConfig() { // overridable source of the properties used by setUp
+    Properties config = new Properties();
+    config.setProperty(LOCATORS, "");
+    config.setProperty(MCAST_PORT, "0"); // the removed TXExpiryJUnitTest labelled this "loner"
+    return config;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      transactionManager.rollback();
+    } catch (IllegalStateException ignored) { // expected when the test left no open transaction
+    } finally {
+      ExpiryTask.permitExpiration(); // static switch: re-enable even if rollback or close fails
+      cache.close();
+    }
+  }
+
+  @Test
+  @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+      "ENTRY_TTL_INVALIDATE"})
+  @TestCaseName("{method}({params})")
+  public void entryExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception {
+    InternalRegion region = createRegion(REGION_NAME);
+    AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+    KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+    mutator.addCacheListener(keyCacheListener);
+
+    ExpiryTask.suspendExpiration(); // suspend before configuring expiration on the region
+    operation.mutate(mutator);
+
+    region.put(KEY, "value1");
+
+    transactionManager.begin();
+    region.put(KEY, "value2");
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); // tx value still visible
+
+    ExpiryTask.suspendExpiration();
+    transactionManager.commit(); // a CommitConflictException here would fail the test
+
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2");
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    operation.verifyInvoked(keyCacheListener);
+
+    operation.verifyState(region);
+  }
+
+  @Test
+  @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+      "ENTRY_TTL_INVALIDATE"})
+  @TestCaseName("{method}({params})")
+  public void entryExpirationContinuesAfterCommitConflict(ExpirationOperation operation)
+      throws Exception {
+    Region<String, String> region = createRegion(REGION_NAME);
+
+    AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+    KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+    mutator.addCacheListener(keyCacheListener);
+
+    ExpiryTask.suspendExpiration(); // suspend before configuring expiration on the region
+    operation.mutate(mutator);
+
+    ExpiryTask.suspendExpiration(); // NOTE(review): already suspended three lines up; redundant?
+    region.put(KEY, "value1");
+
+    transactionManager.begin();
+    region.put(KEY, "value2");
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); // tx value still visible
+
+    ExpiryTask.suspendExpiration();
+    // Put to the same key between pause/unpause; the commit below is expected to conflict.
+    TXManagerImpl txManagerImpl = (TXManagerImpl) transactionManager;
+    TXStateProxy txStateProxy = txManagerImpl.pauseTransaction();
+    region.put(KEY, "conflict");
+    txManagerImpl.unpauseTransaction(txStateProxy);
+
+    assertThatThrownBy(() -> transactionManager.commit())
+        .isInstanceOf(CommitConflictException.class);
+
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    operation.verifyInvoked(keyCacheListener);
+    operation.verifyState(region);
+  }
+
+  @Test
+  @Parameters({"ENTRY_IDLE_DESTROY", "ENTRY_IDLE_INVALIDATE", "ENTRY_TTL_DESTROY",
+      "ENTRY_TTL_INVALIDATE"})
+  @TestCaseName("{method}({params})")
+  public void entryExpirationContinuesAfterRollback(ExpirationOperation operation)
+      throws Exception {
+    Region<String, String> region = createRegion(REGION_NAME);
+
+    AttributesMutator<String, String> mutator = region.getAttributesMutator();
+
+    KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+    mutator.addCacheListener(keyCacheListener);
+
+    ExpiryTask.suspendExpiration(); // suspend before configuring expiration on the region
+    operation.mutate(mutator);
+
+    region.put(KEY, "value1");
+
+    transactionManager.begin();
+    region.put(KEY, "value2");
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value2"); // tx value still visible
+
+    ExpiryTask.suspendExpiration();
+    // Abandon the transaction; the checks below expect expiration to act on the entry afterwards.
+    transactionManager.rollback();
+
+    awaitEntryExpiration(region, KEY); // permits expiration and waits for the expiry task to run
+    operation.verifyInvoked(keyCacheListener);
+    operation.verifyState(region);
+  }
+
+  @Test
+  @Parameters({"REGION_IDLE_DESTROY", "REGION_IDLE_INVALIDATE", "REGION_TTL_DESTROY",
+      "REGION_TTL_INVALIDATE"})
+  @TestCaseName("{method}({params})")
+  public void regionExpirationDoesNotCauseConflict(ExpirationOperation operation) throws Exception {
+    Region<String, String> region = createRegion(REGION_NAME);
+
+    KeyCacheListener keyCacheListener = spy(new KeyCacheListener());
+
+    AttributesMutator<String, String> mutator = region.getAttributesMutator();
+    mutator.addCacheListener(keyCacheListener);
+
+    ExpiryTask.suspendExpiration(); // keep the region from expiring before the put below
+    operation.mutate(mutator);
+
+    transactionManager.begin();
+    region.put(KEY, "value1");
+
+    awaitRegionExpiration(region); // permits expiration and waits for the region task to run
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1"); // tx value still visible
+
+    ExpiryTask.suspendExpiration();
+    transactionManager.commit(); // a CommitConflictException here would fail the test
+
+    assertThat(region.getEntry(KEY).getValue()).isEqualTo("value1");
+
+    awaitRegionExpiration(region); // permits expiration and waits for the region task to run
+    operation.verifyInvoked(keyCacheListener);
+    operation.verifyState(region);
+  }
+
+  private enum ExpirationOperation {
+    ENTRY_IDLE_DESTROY(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY)),
+        s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)),
+        r -> assertThat(r.containsKey(KEY)).isFalse()),
+    ENTRY_IDLE_INVALIDATE(m -> m.setEntryIdleTimeout(new ExpirationAttributes(1, INVALIDATE)),
+        s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)),
+        r -> assertThat(r.get(KEY)).isNull()),
+    ENTRY_TTL_DESTROY(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, DESTROY)),
+        s -> verify(s, atLeast(1)).afterDestroyKey(eq(KEY)),
+        r -> assertThat(r.containsKey(KEY)).isFalse()),
+    ENTRY_TTL_INVALIDATE(m -> m.setEntryTimeToLive(new ExpirationAttributes(1, INVALIDATE)),
+        s -> verify(s, atLeast(1)).afterInvalidateKey(eq(KEY)),
+        r -> assertThat(r.get(KEY)).isNull()),
+    REGION_IDLE_DESTROY(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, DESTROY)),
+        s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)),
+        r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())),
+    REGION_IDLE_INVALIDATE(m -> m.setRegionIdleTimeout(new ExpirationAttributes(1, INVALIDATE)),
+        s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)),
+        r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null)),
+    REGION_TTL_DESTROY(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, DESTROY)),
+        s -> verify(s, atLeast(1)).afterRegionDestroyName(eq(REGION_NAME)),
+        r -> await().atMost(1, MINUTES).until(() -> r.isDestroyed())),
+    REGION_TTL_INVALIDATE(m -> m.setRegionTimeToLive(new ExpirationAttributes(1, INVALIDATE)),
+        s -> verify(s, atLeast(1)).afterRegionInvalidateName(eq(REGION_NAME)),
+        r -> await().atMost(1, MINUTES).until(() -> r.get(KEY) == null));
+    // Each constant bundles: how to configure expiration, which listener hook to verify, end state.
+    private final Consumer<AttributesMutator> mutatorConsumer; // NOTE(review): raw type
+    private final Consumer<KeyCacheListener> listenerConsumer; // runs a Mockito verify on the spy
+    private final Consumer<Region> regionConsumer; // NOTE(review): raw type
+
+    ExpirationOperation(Consumer<AttributesMutator> mutatorConsumer,
+        Consumer<KeyCacheListener> listenerConsumer, Consumer<Region> regionConsumer) {
+      this.mutatorConsumer = mutatorConsumer;
+      this.listenerConsumer = listenerConsumer;
+      this.regionConsumer = regionConsumer;
+    }
+
+    void mutate(AttributesMutator mutator) { // applies this constant's expiration attributes
+      mutatorConsumer.accept(mutator);
+    }
+
+    void verifyInvoked(KeyCacheListener keyCacheListener) { // expects a Mockito spy
+      listenerConsumer.accept(keyCacheListener);
+    }
+
+    public void verifyState(Region region) { // asserts, or awaits up to a minute, the end state
+      regionConsumer.accept(region);
+    }
+  }
+
+  private InternalRegion createRegion(String name) {
+    RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setStatisticsEnabled(true);
+    // NOTE(review): EXPIRY_MS_PROPERTY presumably switches timeouts to milliseconds - confirm.
+    System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true");
+    try {
+      return (InternalRegion) rf.create(name);
+    } finally {
+      System.clearProperty(LocalRegion.EXPIRY_MS_PROPERTY); // set only while the region is created
+    }
+  }
+
+  private void awaitEntryExpired(Region region, String key) { // NOTE(review): never called here
+    await().atMost(1, MINUTES).until(() -> region.getEntry(key) == null || region.get(key) == null);
+  }
+
+  private void awaitEntryExpiration(Region region, String key) {
+    InternalRegion internalRegion = (InternalRegion) region;
+    try {
+      ExpirationDetector detector;
+      do { // repeat with a fresh detector while the task ran but was only rescheduled
+        detector = new ExpirationDetector(internalRegion.getEntryExpiryTask(key));
+        ExpiryTask.expiryTaskListener = detector; // static hook: one listener at a time
+        ExpiryTask.permitExpiration();
+        detector.awaitExecuted(30, SECONDS);
+      } while (!detector.hasExpired() && detector.wasRescheduled());
+    } finally {
+      ExpiryTask.expiryTaskListener = null; // always uninstall the static listener
+    }
+  }
+
+  private void awaitRegionExpiration(Region region) {
+    InternalRegion internalRegion = (InternalRegion) region;
+    try {
+      ExpirationDetector detector;
+      do { // watches the TTL task when one exists, otherwise the idle task
+        detector = new ExpirationDetector(internalRegion.getRegionTTLExpiryTask() != null
+            ? internalRegion.getRegionTTLExpiryTask() : internalRegion.getRegionIdleExpiryTask());
+        ExpiryTask.expiryTaskListener = detector; // static hook: one listener at a time
+        ExpiryTask.permitExpiration();
+        detector.awaitExecuted(30, SECONDS);
+      } while (!detector.hasExpired() && detector.wasRescheduled());
+    } finally {
+      ExpiryTask.expiryTaskListener = null; // always uninstall the static listener
+    }
+  }
+
+  private static class KeyCacheListener extends CacheListenerAdapter<String, String> {
+    // Forwards each event to a hook that takes only the key or the region name.
+    @Override
+    public void afterCreate(EntryEvent<String, String> event) {
+      afterCreateKey(event.getKey());
+    }
+
+    @Override
+    public void afterUpdate(EntryEvent<String, String> event) {
+      afterUpdateKey(event.getKey());
+    }
+
+    @Override
+    public void afterInvalidate(EntryEvent<String, String> event) {
+      afterInvalidateKey(event.getKey());
+    }
+
+    @Override
+    public void afterDestroy(EntryEvent<String, String> event) {
+      afterDestroyKey(event.getKey());
+    }
+
+    @Override
+    public void afterRegionInvalidate(RegionEvent<String, String> event) {
+      afterRegionInvalidateName(event.getRegion().getName());
+    }
+
+    @Override
+    public void afterRegionDestroy(RegionEvent<String, String> event) {
+      afterRegionDestroyName(event.getRegion().getName());
+    }
+
+    public void afterCreateKey(Object key) {
+      // no-op hook; not verified by the tests in this class
+    }
+
+    public void afterUpdateKey(Object key) {
+      // no-op hook; not verified by the tests in this class
+    }
+
+    public void afterInvalidateKey(Object key) {
+      // no-op hook; the tests verify calls to it on a Mockito spy
+    }
+
+    public void afterDestroyKey(Object key) {
+      // no-op hook; the tests verify calls to it on a Mockito spy
+    }
+
+    public void afterRegionInvalidateName(Object key) {
+      // no-op hook; receives the region name despite the parameter being called key
+    }
+
+    public void afterRegionDestroyName(Object key) {
+      // no-op hook; receives the region name despite the parameter being called key
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
deleted file mode 100644
index def0076..0000000
--- a/geode-core/src/test/java/org/apache/geode/TXExpiryJUnitTest.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode;
-
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.*;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.*;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.internal.cache.ExpiryTask.ExpiryTaskListener;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.FlakyTest;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-
-/**
- * Tests transaction expiration functionality
- *
- * @since GemFire 4.0
- */
-@Category(IntegrationTest.class)
-public class TXExpiryJUnitTest {
-
-  protected GemFireCacheImpl cache;
-  protected CacheTransactionManager txMgr;
-
-  protected void createCache() throws CacheException {
-    Properties p = new Properties();
-    p.setProperty(MCAST_PORT, "0"); // loner
-    this.cache = (GemFireCacheImpl) (new CacheFactory(p)).create();
-    this.txMgr = this.cache.getCacheTransactionManager();
-  }
-
-  private void closeCache() {
-    if (this.cache != null) {
-      if (this.txMgr != null) {
-        try {
-          this.txMgr.rollback();
-        } catch (IllegalStateException ignore) {
-        }
-      }
-      this.txMgr = null;
-      Cache c = this.cache;
-      this.cache = null;
-      c.close();
-    }
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    createCache();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    closeCache();
-  }
-
-  @Test
-  public void testEntryTTLExpiration() throws CacheException {
-    generalEntryExpirationTest(createRegion("TXEntryTTL"),
-        new ExpirationAttributes(1, ExpirationAction.DESTROY), true);
-  }
-
-  @Test
-  public void testEntryIdleExpiration() throws CacheException {
-    generalEntryExpirationTest(createRegion("TXEntryIdle"),
-        new ExpirationAttributes(1, ExpirationAction.DESTROY), false);
-  }
-
-  private Region<String, String> createRegion(String name) {
-    RegionFactory<String, String> rf = this.cache.createRegionFactory();
-    rf.setScope(Scope.DISTRIBUTED_NO_ACK);
-    rf.setStatisticsEnabled(true);
-    System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true");
-    try {
-      return rf.create(name);
-    } finally {
-      System.getProperties().remove(LocalRegion.EXPIRY_MS_PROPERTY);
-    }
-  }
-
-  public void generalEntryExpirationTest(final Region<String, String> exprReg,
-      ExpirationAttributes exprAtt, boolean useTTL) throws CacheException {
-    final LocalRegion lr = (LocalRegion) exprReg;
-    final boolean wasDestroyed[] = {false};
-    AttributesMutator<String, String> mutator = exprReg.getAttributesMutator();
-    final AtomicInteger ac = new AtomicInteger();
-    final AtomicInteger au = new AtomicInteger();
-    final AtomicInteger ai = new AtomicInteger();
-    final AtomicInteger ad = new AtomicInteger();
-
-    if (useTTL) {
-      mutator.setEntryTimeToLive(exprAtt);
-    } else {
-      mutator.setEntryIdleTimeout(exprAtt);
-    }
-    final CacheListener<String, String> cl = new CacheListenerAdapter<String, String>() {
-      public void afterCreate(EntryEvent<String, String> e) {
-        ac.incrementAndGet();
-      }
-
-      public void afterUpdate(EntryEvent<String, String> e) {
-        au.incrementAndGet();
-      }
-
-      public void afterInvalidate(EntryEvent<String, String> e) {
-        ai.incrementAndGet();
-      }
-
-      public void afterDestroy(EntryEvent<String, String> e) {
-        ad.incrementAndGet();
-        if (e.getKey().equals("key0")) {
-          synchronized (wasDestroyed) {
-            wasDestroyed[0] = true;
-            wasDestroyed.notifyAll();
-          }
-        }
-      }
-
-      public void afterRegionInvalidate(RegionEvent<String, String> event) {
-        fail("Unexpected invocation of afterRegionInvalidate");
-      }
-
-      public void afterRegionDestroy(RegionEvent<String, String> event) {
-        if (!event.getOperation().isClose()) {
-          fail("Unexpected invocation of afterRegionDestroy");
-        }
-      }
-    };
-    mutator.addCacheListener(cl);
-    try {
-
-      ExpiryTask.suspendExpiration();
-      // Test to ensure an expiration does not cause a conflict
-      for (int i = 0; i < 2; i++) {
-        exprReg.put("key" + i, "value" + i);
-      }
-      this.txMgr.begin();
-      exprReg.put("key0", "value");
-      waitForEntryExpiration(lr, "key0");
-      assertEquals("value", exprReg.getEntry("key0").getValue());
-      try {
-        ExpiryTask.suspendExpiration();
-        this.txMgr.commit();
-      } catch (CommitConflictException error) {
-        fail("Expiration should not cause commit to fail");
-      }
-      assertEquals("value", exprReg.getEntry("key0").getValue());
-      waitForEntryExpiration(lr, "key0");
-      synchronized (wasDestroyed) {
-        assertEquals(true, wasDestroyed[0]);
-      }
-      assertTrue(!exprReg.containsKey("key0"));
-      // key1 is the canary for the rest of the entries
-      waitForEntryToBeDestroyed(exprReg, "key1");
-
-      // rollback and failed commit test, ensure expiration continues
-      for (int j = 0; j < 2; j++) {
-        synchronized (wasDestroyed) {
-          wasDestroyed[0] = false;
-        }
-        ExpiryTask.suspendExpiration();
-        for (int i = 0; i < 2; i++) {
-          exprReg.put("key" + i, "value" + i);
-        }
-        this.txMgr.begin();
-        exprReg.put("key0", "value");
-        waitForEntryExpiration(lr, "key0");
-        assertEquals("value", exprReg.getEntry("key0").getValue());
-        String checkVal;
-        ExpiryTask.suspendExpiration();
-        if (j == 0) {
-          checkVal = "value0";
-          this.txMgr.rollback();
-        } else {
-          checkVal = "conflictVal";
-          final TXManagerImpl txMgrImpl = (TXManagerImpl) this.txMgr;
-          TXStateProxy tx = txMgrImpl.pauseTransaction();
-          exprReg.put("key0", checkVal);
-          txMgrImpl.unpauseTransaction(tx);
-          try {
-            this.txMgr.commit();
-            fail("Expected CommitConflictException!");
-          } catch (CommitConflictException expected) {
-          }
-        }
-        waitForEntryExpiration(lr, "key0");
-        synchronized (wasDestroyed) {
-          assertEquals(true, wasDestroyed[0]);
-        }
-        assertTrue(!exprReg.containsKey("key0"));
-        // key1 is the canary for the rest of the entries
-        waitForEntryToBeDestroyed(exprReg, "key1");
-      }
-    } finally {
-      mutator.removeCacheListener(cl);
-      ExpiryTask.permitExpiration();
-    }
-  }
-
-  private void waitForEntryToBeDestroyed(final Region r, final String key) {
-    WaitCriterion waitForExpire = new WaitCriterion() {
-      public boolean done() {
-        return r.getEntry(key) == null;
-      }
-
-      public String description() {
-        return "never saw entry destroy of " + key;
-      }
-    };
-    Wait.waitForCriterion(waitForExpire, 3000, 10, true);
-  }
-
-  public static void waitForEntryExpiration(LocalRegion lr, String key) {
-    try {
-      ExpirationDetector detector;
-      do {
-        detector = new ExpirationDetector(lr.getEntryExpiryTask(key));
-        ExpiryTask.expiryTaskListener = detector;
-        ExpiryTask.permitExpiration();
-        Wait.waitForCriterion(detector, 3000, 2, true);
-      } while (!detector.hasExpired() && detector.wasRescheduled());
-    } finally {
-      ExpiryTask.expiryTaskListener = null;
-    }
-  }
-
-  private void waitForRegionExpiration(LocalRegion lr, boolean ttl) {
-    try {
-      ExpirationDetector detector;
-      do {
-        detector = new ExpirationDetector(
-            ttl ? lr.getRegionTTLExpiryTask() : lr.getRegionIdleExpiryTask());
-        ExpiryTask.expiryTaskListener = detector;
-        ExpiryTask.permitExpiration();
-        Wait.waitForCriterion(detector, 3000, 2, true);
-      } while (!detector.hasExpired() && detector.wasRescheduled());
-    } finally {
-      ExpiryTask.expiryTaskListener = null;
-    }
-  }
-
-
-  /**
-   * Used to detect that a particular ExpiryTask has expired.
-   */
-  public static class ExpirationDetector implements ExpiryTaskListener, WaitCriterion {
-    private volatile boolean ran = false;
-    private volatile boolean expired = false;
-    private volatile boolean rescheduled = false;
-    public final ExpiryTask et;
-
-    public ExpirationDetector(ExpiryTask et) {
-      assertNotNull(et);
-      this.et = et;
-    }
-
-    @Override
-    public void afterCancel(ExpiryTask et) {}
-
-    @Override
-    public void afterSchedule(ExpiryTask et) {}
-
-    @Override
-    public void afterReschedule(ExpiryTask et) {
-      if (et == this.et) {
-        if (!hasExpired()) {
-          ExpiryTask.suspendExpiration();
-        }
-        this.rescheduled = true;
-      }
-    }
-
-    @Override
-    public void afterExpire(ExpiryTask et) {
-      if (et == this.et) {
-        this.expired = true;
-      }
-    }
-
-    @Override
-    public void afterTaskRan(ExpiryTask et) {
-      if (et == this.et) {
-        this.ran = true;
-      }
-    }
-
-    @Override
-    public boolean done() {
-      return this.ran;
-    }
-
-    @Override
-    public String description() {
-      return "the expiry task " + this.et + " never ran";
-    }
-
-    public boolean wasRescheduled() {
-      return this.rescheduled;
-    }
-
-    public boolean hasExpired() {
-      return this.expired;
-    }
-  }
-
-  @Category(FlakyTest.class) // GEODE-845: time sensitive, expiration, eats exceptions (1 fixed),
-                             // waitForCriterion, 3 second timeout
-  @Test
-  public void testRegionIdleExpiration() throws CacheException {
-    Region<String, String> exprReg = createRegion("TXRegionIdle");
-    generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE),
-        false);
-    generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY),
-        false);
-  }
-
-  @Test
-  public void testRegionTTLExpiration() throws CacheException {
-    Region<String, String> exprReg = createRegion("TXRegionTTL");
-    generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.INVALIDATE),
-        true);
-    generalRegionExpirationTest(exprReg, new ExpirationAttributes(1, ExpirationAction.DESTROY),
-        true);
-  }
-
-  private void generalRegionExpirationTest(final Region<String, String> exprReg,
-      ExpirationAttributes exprAtt, boolean useTTL) throws CacheException {
-    final LocalRegion lr = (LocalRegion) exprReg;
-    final ExpirationAction action = exprAtt.getAction();
-    final boolean regionExpiry[] = {false};
-    AttributesMutator<String, String> mutator = exprReg.getAttributesMutator();
-    final CacheListener<String, String> cl = new CacheListenerAdapter<String, String>() {
-      public void afterRegionInvalidate(RegionEvent<String, String> event) {
-        synchronized (regionExpiry) {
-          regionExpiry[0] = true;
-          regionExpiry.notifyAll();
-        }
-      }
-
-      public void afterRegionDestroy(RegionEvent<String, String> event) {
-        if (!event.getOperation().isClose()) {
-          synchronized (regionExpiry) {
-            regionExpiry[0] = true;
-            regionExpiry.notifyAll();
-          }
-        }
-      }
-    };
-    mutator.addCacheListener(cl);
-    // Suspend before enabling region expiration to prevent
-    // it from happening before we do the put.
-    ExpiryTask.suspendExpiration();
-    try {
-      if (useTTL) {
-        mutator.setRegionTimeToLive(exprAtt);
-      } else {
-        mutator.setRegionIdleTimeout(exprAtt);
-      }
-
-      // Create some keys and age them, I wish we could fake/force the age
-      // instead of having to actually wait
-      for (int i = 0; i < 2; i++) {
-        exprReg.put("key" + i, "value" + i);
-      }
-
-      String regName = exprReg.getName();
-      // Test to ensure a region expiration does not cause a conflict
-      this.txMgr.begin();
-      exprReg.put("key0", "value");
-      waitForRegionExpiration(lr, useTTL);
-      assertEquals("value", exprReg.getEntry("key0").getValue());
-      try {
-        ExpiryTask.suspendExpiration();
-        this.txMgr.commit();
-      } catch (CommitConflictException error) {
-        Assert.fail("Expiration should not cause commit to fail", error);
-      }
-      assertEquals("value", exprReg.getEntry("key0").getValue());
-      waitForRegionExpiration(lr, useTTL);
-      synchronized (regionExpiry) {
-        assertEquals(true, regionExpiry[0]);
-      }
-      if (action == ExpirationAction.DESTROY) {
-        assertNull("listener saw Region expiration, expected a destroy operation!",
-            this.cache.getRegion(regName));
-      } else {
-        assertTrue("listener saw Region expiration, expected invalidation",
-            !exprReg.containsValueForKey("key0"));
-      }
-
-    } finally {
-      if (!exprReg.isDestroyed()) {
-        mutator.removeCacheListener(cl);
-      }
-      ExpiryTask.permitExpiration();
-    }
-
-    // @todo mitch test rollback and failed expiration
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
similarity index 51%
rename from geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
index 2948c3d..67c3ee6 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/DistTXExpiryJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/DistributedTXExpirationIntegrationTest.java
@@ -14,42 +14,26 @@
  */
 package org.apache.geode.disttx;
 
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_TRANSACTIONS;
 
 import java.util.Properties;
 
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.TXExpiryJUnitTest;
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.TXExpirationIntegrationTest;
 import org.apache.geode.test.junit.categories.DistributedTransactionsTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
- * Same tests as that of {@link TXExpiryJUnitTest} after setting "distributed-transactions" property
- * to true
+ * Extends {@link TXExpirationIntegrationTest} with "distributed-transactions" enabled.
  */
 @Category({IntegrationTest.class, DistributedTransactionsTest.class})
-public class DistTXExpiryJUnitTest extends TXExpiryJUnitTest {
-
-  public DistTXExpiryJUnitTest() {}
+public class DistributedTXExpirationIntegrationTest extends TXExpirationIntegrationTest {
 
   @Override
-  protected void createCache() throws CacheException {
-    Properties p = new Properties();
-    p.setProperty(MCAST_PORT, "0"); // loner
-    p.setProperty(ConfigurationProperties.DISTRIBUTED_TRANSACTIONS, "true");
-    this.cache = (GemFireCacheImpl) CacheFactory.create(DistributedSystem.connect(p));
-    AttributesFactory af = new AttributesFactory();
-    af.setScope(Scope.DISTRIBUTED_NO_ACK);
-    this.txMgr = this.cache.getCacheTransactionManager();
-    assert (this.txMgr.isDistributed());
+  protected Properties getConfig() {
+    Properties config = super.getConfig();
+    config.setProperty(DISTRIBUTED_TRANSACTIONS, "true");
+    return config;
   }
-
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
index 1422fab..5757746 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RemoteTransactionDUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
 import static org.junit.Assert.assertEquals;
@@ -44,7 +45,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.TXExpiryJUnitTest;
+import org.apache.geode.ExpirationDetector;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.AttributesMutator;
 import org.apache.geode.cache.CacheEvent;
@@ -125,12 +126,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   protected final String D_REFERENCE = "distrReference";
 
   private final SerializableCallable getNumberOfTXInProgress = new SerializableCallable() {
+    @Override
     public Object call() throws Exception {
       TXManagerImpl mgr = getGemfireCache().getTxManager();
       return mgr.hostedTransactionsInProgressForTest();
     }
   };
   private final SerializableCallable verifyNoTxState = new SerializableCallable() {
+    @Override
     public Object call() throws Exception {
       // TXManagerImpl mgr = getGemfireCache().getTxManager();
       // assertIndexDetailsEquals(0, mgr.hostedTransactionsInProgressForTest());
@@ -218,6 +221,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
   protected void initAccessorAndDataStore(VM accessor, VM datastore, final int redundantCopies) {
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(true/* accessor */, redundantCopies, null);
         return null;
@@ -225,6 +229,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false/* accessor */, redundantCopies, null);
         populateData();
@@ -236,6 +241,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   protected void initAccessorAndDataStore(VM accessor, VM datastore1, VM datastore2,
       final int redundantCopies) {
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false/* accessor */, redundantCopies, null);
         return null;
@@ -248,6 +254,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   private void initAccessorAndDataStoreWithInterestPolicy(VM accessor, VM datastore1, VM datastore2,
       final int redundantCopies) {
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false/* accessor */, redundantCopies, InterestPolicy.ALL);
         return null;
@@ -269,6 +276,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       this.op = op;
     }
 
+    @Override
     public Object call() throws Exception {
       CacheTransactionManager mgr = getGemfireCache().getTxManager();
       LogWriterUtils.getLogWriter().fine("testTXPut starting tx");
@@ -563,6 +571,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT));
 
     datastore.invoke(new SerializableCallable("verify tx") {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -571,6 +580,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         TXStateProxy tx = mgr.pauseTransaction();
@@ -586,6 +596,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertFalse(mgr.isHostedTxInProgress(txId));
@@ -594,6 +605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
     if (commit) {
       accessor.invoke(new SerializableCallable() {
+        @Override
         public Object call() throws Exception {
           verifyAfterCommit(OP.PUT);
           return null;
@@ -601,6 +613,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       });
     } else {
       accessor.invoke(new SerializableCallable() {
+        @Override
         public Object call() throws Exception {
           verifyAfterRollback(OP.PUT);
           return null;
@@ -619,6 +632,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.GET));
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -641,6 +655,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -659,13 +674,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator();
         am.setCacheLoader(new CacheLoader() {
+          @Override
           public Object load(LoaderHelper helper) throws CacheLoaderException {
             return new Customer("sup dawg", "add");
           }
 
+          @Override
           public void close() {}
         });
         return null;
@@ -673,6 +691,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.begin();
@@ -705,12 +724,14 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         return null;
       }
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(CUSTOMER);
         CustId sup = new CustId(7);
@@ -758,13 +779,16 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         AttributesMutator am = getCache().getRegion(CUSTOMER).getAttributesMutator();
         am.setCacheLoader(new CacheLoader() {
+          @Override
           public Object load(LoaderHelper helper) throws CacheLoaderException {
             return new Customer("sup dawg", "addr");
           }
 
+          @Override
           public void close() {}
         });
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
@@ -782,6 +806,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         return null;
       }
@@ -802,6 +827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.PUT));
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -822,6 +848,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertNotNull(mgr.getTXState());
@@ -846,6 +873,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.INVALIDATE));
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -866,6 +894,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -888,6 +917,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final TXId txId = (TXId) accessor.invoke(new DoOpsInTX(OP.DESTROY));
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -908,6 +938,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -928,6 +959,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId newCustId = new CustId(10);
     final Customer updateCust = new Customer("customer10", "address10");
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         mgr.begin();
@@ -963,6 +995,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getGemfireCache().getRegion(CUSTOMER);
         int hash1 = PartitionedRegionHelper.getHashKey((PartitionedRegion) cust, new CustId(1));
@@ -987,6 +1020,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1026,6 +1060,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     final CustId custId = new CustId(1);
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
         Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1045,6 +1080,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1065,6 +1101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1103,6 +1140,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId custId2 = new CustId(2);
     final CustId custId20 = new CustId(20);
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
         Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1121,6 +1159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1143,6 +1182,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1194,6 +1234,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM datastore1 = host.getVM(1);
     VM datastore2 = host.getVM(3);
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false/* accessor */, 0, null);
         return null;
@@ -1209,6 +1250,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId custId4 = new CustId(4);
     final CustId custId20 = new CustId(20);
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
         Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1242,6 +1284,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     // Create a second data store.
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false/* accessor */, 1, null);
         return null;
@@ -1256,6 +1299,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId custId4 = new CustId(4);
     final CustId custId20 = new CustId(20);
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
         Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1270,6 +1314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     SerializableCallable checkArtifacts = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         PartitionedRegion cust = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
         assertNull(cust.get(custId0));
@@ -1292,6 +1337,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId custId = new CustId(1);
     final Customer updatedCust = new Customer("updated", "updated");
     final TXId txId = (TXId) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> cust = getGemfireCache().getRegion(CUSTOMER);
         Region<CustId, Customer> ref = getGemfireCache().getRegion(D_REFERENCE);
@@ -1311,6 +1357,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         assertTrue(mgr.isHostedTxInProgress(txId));
@@ -1331,6 +1378,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1370,6 +1418,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM datastore2 = host.getVM(2);
 
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         createRegion(false, 1, null);
         return null;
@@ -1379,6 +1428,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, 1);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -1443,6 +1493,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM accessor = getVMForTransactions(acc, datastore);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region ref = getCache().getRegion(D_REFERENCE);
         ref.getAttributesMutator().addCacheListener(new TestCacheListener(true));
@@ -1462,6 +1513,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     SerializableCallable addListenersToDataStore = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region ref = getCache().getRegion(D_REFERENCE);
         ref.getAttributesMutator().addCacheListener(new TestCacheListener(false));
@@ -1487,6 +1539,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     if (op != OP.INVALIDATE) {
       // Ensure the cache writer was not fired in accessor
       accessor.invoke(new SerializableCallable() {
+        @Override
         public Object call() throws Exception {
           Region cust = getCache().getRegion(CUSTOMER);
           assertFalse(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired);
@@ -1500,6 +1553,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
       // Ensure the cache writer was fired in the primary
       datastore.invoke(new SerializableCallable() {
+        @Override
         public Object call() throws Exception {
           Region cust = getCache().getRegion(CUSTOMER);
           assertTrue(((TestCacheWriter) cust.getAttributes().getCacheWriter()).wasFired);
@@ -1513,6 +1567,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     }
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1521,6 +1576,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TestTxListener l = (TestTxListener) getGemfireCache().getTxManager().getListener();
         assertTrue(l.isListenerInvoked());
@@ -1528,6 +1584,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     SerializableCallable verifyListeners = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(CUSTOMER);
         Region order = getCache().getRegion(ORDER);
@@ -1614,34 +1671,44 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       this.isAccessor = isAccessor;
     }
 
+    @Override
     public void afterCreate(EntryEvent event) {
       verifyOrigin(event);
       verifyPutAll(event);
     }
 
+    @Override
     public void afterUpdate(EntryEvent event) {
       verifyOrigin(event);
       verifyPutAll(event);
     }
 
+    @Override
     public void afterDestroy(EntryEvent event) {
       verifyOrigin(event);
     }
 
+    @Override
     public void afterInvalidate(EntryEvent event) {
       verifyOrigin(event);
     }
 
+    @Override
     public void afterRegionClear(RegionEvent event) {}
 
+    @Override
     public void afterRegionCreate(RegionEvent event) {}
 
+    @Override
     public void afterRegionDestroy(RegionEvent event) {}
 
+    @Override
     public void afterRegionInvalidate(RegionEvent event) {}
 
+    @Override
     public void afterRegionLive(RegionEvent event) {}
 
+    @Override
     public void close() {}
   }
 
@@ -1653,6 +1720,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       this.isAccessor = isAccessor;
     }
 
+    @Override
     public void beforeCreate(EntryEvent event) throws CacheWriterException {
       getGemfireCache().getLogger()
           .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation());
@@ -1665,6 +1733,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       wasFired = true;
     }
 
+    @Override
     public void beforeUpdate(EntryEvent event) throws CacheWriterException {
       getGemfireCache().getLogger()
           .info("SWAP:beforeCreate:" + event + " op:" + event.getOperation());
@@ -1673,19 +1742,23 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       setFired(event);
     }
 
+    @Override
     public void beforeDestroy(EntryEvent event) throws CacheWriterException {
       verifyOrigin(event);
       setFired(event);
     }
 
+    @Override
     public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
       setFired(null);
     }
 
+    @Override
     public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
       setFired(null);
     }
 
+    @Override
     public void close() {}
   }
 
@@ -1733,15 +1806,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       this.isAccessor = isAccessor;
     }
 
+    @Override
     public void afterCommit(TransactionEvent event) {
       listenerInvoked = true;
       verify(event);
     }
 
+    @Override
     public void afterFailedCommit(TransactionEvent event) {
       verify(event);
     }
 
+    @Override
     public void afterRollback(TransactionEvent event) {
       listenerInvoked = true;
       verify(event);
@@ -1751,6 +1827,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       return this.listenerInvoked;
     }
 
+    @Override
     public void close() {}
   }
 
@@ -1759,10 +1836,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       this.isAccessor = isAccessor;
     }
 
+    @Override
     public void beforeCommit(TransactionEvent event) {
       verify(event);
     }
 
+    @Override
     public void close() {}
   }
 
@@ -1775,10 +1854,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM accessor = getVMForTransactions(acc, datastore);
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         getGemfireCache().getTxManager().setWriter(new TransactionWriter() {
+          @Override
           public void close() {}
 
+          @Override
           public void beforeCommit(TransactionEvent event) throws TransactionWriterException {
             throw new TransactionWriterException("AssertionError");
           }
@@ -1788,6 +1870,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         getGemfireCache().getTxManager().begin();
         Region r = getCache().getRegion(CUSTOMER);
@@ -1823,6 +1906,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM taskVM = isAccessor ? accessor : datastore1;
 
     taskVM.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getGemfireCache().getTxManager();
@@ -1836,6 +1920,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(verifyNoTxState);
 
     taskVM.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         Region orderRegion = getCache().getRegion(ORDER);
@@ -1870,6 +1955,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(1, txOnDatastore1 + txOnDatastore2);
 
     taskVM.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -1955,6 +2041,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, redundancy);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         Set originalSet;
@@ -1997,6 +2084,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(verifyNoTxState);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         Region orderRegion = getCache().getRegion(ORDER);
@@ -2062,6 +2150,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(1, txOnDatastore1 + txOnDatastore2);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.commit();
@@ -2079,6 +2168,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       int originalSetSize;
       int expectedSetSize;
 
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
         custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2195,6 +2285,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2245,6 +2336,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2295,6 +2387,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         Region rr = getGemfireCache().getRegion(D_REFERENCE);
@@ -2345,6 +2438,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     SerializableCallable doIllegalIteration = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region r = getGemfireCache().getRegion(CUSTOMER);
         Set keySet = r.keySet();
@@ -2446,6 +2540,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   class TXFunction implements Function {
     static final String id = "TXFunction";
 
+    @Override
     public void execute(FunctionContext context) {
       Region r = null;
       r = getGemfireCache().getRegion(CUSTOMER);
@@ -2455,18 +2550,22 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       context.getResultSender().lastResult(Boolean.TRUE);
     }
 
+    @Override
     public String getId() {
       return id;
     }
 
+    @Override
     public boolean hasResult() {
       return true;
     }
 
+    @Override
     public boolean optimizeForWrite() {
       return true;
     }
 
+    @Override
     public boolean isHA() {
       return false;
     }
@@ -2494,6 +2593,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     SerializableCallable registerFunction = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         FunctionService.registerFunction(new TXFunction());
         return null;
@@ -2505,6 +2605,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(registerFunction);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         PartitionedRegion custRegion = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2555,6 +2656,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(1, txOnDatastore1 + txOnDatastore2);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTXMgr();
         mgr.commit();
@@ -2563,6 +2665,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore1.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         assertEquals(expectedCustomer, custRegion.get(expectedCustId));
@@ -2571,6 +2674,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
         mgr.begin();
@@ -2665,6 +2769,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2694,6 +2799,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
         this.isAccessor = isAccessor;
       }
 
+      @Override
       public Object call() throws Exception {
         AttributesFactory af = new AttributesFactory();
         af.setDataPolicy(isAccessor ? DataPolicy.EMPTY : DataPolicy.REPLICATE);
@@ -2717,6 +2823,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     accessor.invoke(new CreateDR(true));
 
     SerializableCallable registerFunction = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         FunctionService.registerFunction(new TXFunction());
         return null;
@@ -2728,6 +2835,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(registerFunction);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2747,6 +2855,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore1.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         final Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2783,6 +2892,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     SerializableCallable registerFunction = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         FunctionService.registerFunction(new TXFunction());
         return null;
@@ -2794,6 +2904,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(registerFunction);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
@@ -2819,6 +2930,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(1, txOnDatastore1 + txOnDatastore2);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2830,6 +2942,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
     // test onMembers
     SerializableCallable getMember = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         return getGemfireCache().getMyId();
       }
@@ -2837,6 +2950,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final InternalDistributedMember ds1 = (InternalDistributedMember) datastore1.invoke(getMember);
     final InternalDistributedMember ds2 = (InternalDistributedMember) datastore2.invoke(getMember);
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
         // get owner for expectedKey
@@ -2870,6 +2984,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(1, txOnDatastore1_1 + txOnDatastore2_1);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2881,6 +2996,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
     // test function execution on data store
     final DistributedMember owner = (DistributedMember) accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
         return pr.getOwnerForKey(pr.getKeyInfo(expectedCustId));
@@ -2888,6 +3004,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     SerializableCallable testFnOnDs = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTXMgr();
         PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -2905,6 +3022,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     };
     SerializableCallable closeTx = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getGemfireCache().getRegion(CUSTOMER);
         CacheTransactionManager mgr = getGemfireCache().getTXMgr();
@@ -2933,6 +3051,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     // test that function is rejected if function target is not same as txState target
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTXMgr();
         PartitionedRegion pr = (PartitionedRegion) getGemfireCache().getRegion(CUSTOMER);
@@ -3008,6 +3127,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM accessor = getVMForTransactions(acc, datastore);
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         getGemfireCache().getTxManager().addListener(new TestTxListener(false));
         return null;
@@ -3016,6 +3136,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId expectedCustId = new CustId(6);
     final Customer expectedCustomer = new Customer("customer6", "address6");
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         getGemfireCache().getTxManager().addListener(new TestTxListener(true));
         Region custRegion = getCache().getRegion(CUSTOMER);
@@ -3030,6 +3151,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         assertNull(custRegion.get(expectedCustId));
@@ -3037,6 +3159,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region custRegion = getCache().getRegion(CUSTOMER);
         Context ctx = getCache().getJNDIContext();
@@ -3052,6 +3175,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TestTxListener l = (TestTxListener) getGemfireCache().getTXMgr().getListener();
         assertTrue(l.isListenerInvoked());
@@ -3073,6 +3197,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       int fireD = 0;
       int fireU = 0;
 
+      @Override
       public void beforeCreate(EntryEvent event) throws CacheWriterException {
         if (!event.isOriginRemote()) {
           throw new CacheWriterException("SUP?? This CREATE is supposed to be isOriginRemote");
@@ -3080,6 +3205,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
         fireC++;
       }
 
+      @Override
       public void beforeDestroy(EntryEvent event) throws CacheWriterException {
         getGemfireCache().getLoggerI18n().fine("SWAP:writer:createEvent:" + event);
         if (!event.isOriginRemote()) {
@@ -3088,6 +3214,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
         fireD++;
       }
 
+      @Override
       public void beforeUpdate(EntryEvent event) throws CacheWriterException {
         if (!event.isOriginRemote()) {
           throw new CacheWriterException("SUP?? This UPDATE is supposed to be isOriginRemote");
@@ -3098,6 +3225,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     datastore.invoke(new SerializableCallable() {
 
+      @Override
       public Object call() throws Exception {
         Region refRegion = getCache().getRegion(D_REFERENCE);
         refRegion.getAttributesMutator().setCacheWriter(new OriginRemoteRRWriter());
@@ -3110,6 +3238,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     accessor.invoke(new DoOpsInTX(OP.PUT));
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         TXStateProxy tx = mgr.pauseTransaction();
@@ -3123,6 +3252,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     accessor.invoke(new DoOpsInTX(OP.DESTROY));
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         TXStateProxy tx = mgr.pauseTransaction();
@@ -3137,6 +3267,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     accessor.invoke(new DoOpsInTX(OP.PUT));
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         TXStateProxy tx = mgr.pauseTransaction();
@@ -3149,6 +3280,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     datastore.invoke(new SerializableCallable() {
 
+      @Override
       public Object call() throws Exception {
         Region refRegion = getCache().getRegion(D_REFERENCE);
         OriginRemoteRRWriter w = (OriginRemoteRRWriter) refRegion.getAttributes().getCacheWriter();
@@ -3173,6 +3305,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     accessor.invoke(new SerializableCallable() {
 
+      @Override
       public Object call() throws Exception {
         Region refRegion = getCache().getRegion(D_REFERENCE);
         refRegion.create("sup", "dawg");
@@ -3181,6 +3314,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getGemfireCache().getTxManager();
         TXStateProxy tx = mgr.pauseTransaction();
@@ -3193,6 +3327,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     accessor.invoke(new SerializableCallable() {
 
+      @Override
       public Object call() throws Exception {
         Region refRegion = getCache().getRegion(D_REFERENCE);
         assertEquals("dawg", refRegion.get("sup"));
@@ -3209,6 +3344,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore, 0);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(CUSTOMER);
         TXManagerImpl mgr = getCache().getTxManager();
@@ -3219,11 +3355,13 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
     final InternalDistributedMember member =
         (InternalDistributedMember) accessor.invoke(new SerializableCallable() {
+          @Override
           public Object call() throws Exception {
             return getCache().getMyId();
           }
         });
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         TXManagerImpl mgr = getCache().getTxManager();
         assertEquals(1, mgr.hostedTransactionsInProgressForTest());
@@ -3257,6 +3395,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore1, datastore2, 0);
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Map custMap = new HashMap();
         for (int i = 0; i < 10; i++) {
@@ -3447,6 +3586,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     initAccessorAndDataStore(accessor, datastore, 0);
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(D_REFERENCE);
         cust.put("meow", "this is a meow, deal with it");
@@ -3458,6 +3598,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(D_REFERENCE);
         cust.getAttributesMutator().addCacheListener(new OneUpdateCacheListener());
@@ -3468,6 +3609,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         CacheTransactionManager mgr = getGemfireCache().getTxManager();
         mgr.begin();
@@ -3480,6 +3622,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(D_REFERENCE);
         OneUpdateCacheListener rat =
@@ -3492,6 +3635,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region cust = getCache().getRegion(D_REFERENCE);
         OneDestroyAndThenOneCreateCacheWriter wri =
@@ -3584,6 +3728,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
   protected Integer startServer(VM vm) {
     return (Integer) vm.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
         CacheServer s = getCache().addCacheServer();
@@ -3597,6 +3742,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
   protected void createClientRegion(VM vm, final int port, final boolean isEmpty,
       final boolean ri) {
     vm.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         ClientCacheFactory ccf = new ClientCacheFactory();
         ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port);
@@ -3626,17 +3772,20 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     boolean invoked = false;
 
+    @Override
     public void onError(CqEvent aCqEvent) {
       // TODO Auto-generated method stub
 
     }
 
+    @Override
     public void onEvent(CqEvent aCqEvent) {
       // TODO Auto-generated method stub
       invoked = true;
 
     }
 
+    @Override
     public void close() {
       // TODO Auto-generated method stub
 
@@ -3765,6 +3914,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     createClientRegion(client, port, false, true);
     accessor.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3781,6 +3931,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     client.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3788,10 +3939,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
         final ClientListener cl =
             (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
         WaitCriterion waitForListenerInvocation = new WaitCriterion() {
+          @Override
           public boolean done() {
             return cl.invoked;
           }
 
+          @Override
           public String description() {
             return "listener was never invoked";
           }
@@ -3811,6 +3964,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     VM client = host.getVM(1);
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>();
         af.setScope(Scope.DISTRIBUTED_ACK);
@@ -3825,6 +3979,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     final int port = startServer(datastore);
     client.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         ClientCacheFactory ccf = new ClientCacheFactory();
         ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port);
@@ -3844,6 +3999,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region ref = getCache().getRegion(D_REFERENCE);
         Region empty = getCache().getRegion(EMPTY_REGION);
@@ -3861,14 +4017,17 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     client.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region empty = getCache().getRegion(EMPTY_REGION);
         final ClientListener l = (ClientListener) empty.getAttributes().getCacheListeners()[0];
         WaitCriterion wc = new WaitCriterion() {
+          @Override
           public boolean done() {
             return l.invoked;
           }
 
+          @Override
           public String description() {
             return "listener invoked:" + l.invoked;
           }
@@ -3891,6 +4050,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     createClientRegion(client, port, false, true);
     datastore.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3907,6 +4067,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     client.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         Region<OrderId, Order> orderRegion = getCache().getRegion(ORDER);
@@ -3914,10 +4075,12 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
         final ClientListener cl =
             (ClientListener) custRegion.getAttributes().getCacheListeners()[0];
         WaitCriterion waitForListenerInvocation = new WaitCriterion() {
+          @Override
           public boolean done() {
             return cl.invoked;
           }
 
+          @Override
           public String description() {
             return "listener was never invoked";
           }
@@ -3938,6 +4101,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
 
     initAccessorAndDataStoreWithInterestPolicy(accessor, datastore1, datastore2, 1);
     SerializableCallable registerListener = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         custRegion.getAttributesMutator().addCacheListener(new ListenerInvocationCounter());
@@ -3948,6 +4112,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     datastore2.invoke(registerListener);
 
     datastore1.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         getCache().getCacheTransactionManager().begin();
@@ -3960,6 +4125,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     });
 
     SerializableCallable getListenerCount = new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> custRegion = getCache().getRegion(CUSTOMER);
         ListenerInvocationCounter l =
@@ -3994,6 +4160,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     final CustId custId = new CustId(19);
 
     datastore1.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
         assertNull(refRegion.get(custId));
@@ -4003,6 +4170,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore2.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         Region<CustId, Customer> refRegion = getCache().getRegion(D_REFERENCE);
         assertNull(refRegion.get(custId));
@@ -4011,6 +4179,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       }
     });
     datastore1.invoke(new SerializableCallable() {
+      @Override
       public Object call() throws Exception {
         try {
           getCache().getCacheTransactionManager().commit();
@@ -4082,6 +4251,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
       assertEquals(Status.STATUS_ACTIVE, tx.getStatus());
       final CountDownLatch latch = new CountDownLatch(1);
       Thread t = new Thread(new Runnable() {
+        @Override
         public void run() {
           Context ctx = getCache().getJNDIContext();
           try {
@@ -4263,7 +4433,7 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
           r.put("nonTXkey", "nonTXvalue");
           getCache().getCacheTransactionManager().begin();
           r.put("key", "newvalue");
-          TXExpiryJUnitTest.waitForEntryExpiration(lr, "key");
+          waitForEntryExpiration(lr, "key");
         } finally {
           ExpiryTask.permitExpiration();
         }
@@ -4496,4 +4666,18 @@ public class RemoteTransactionDUnitTest extends JUnit4CacheTestCase {
     vm1.invoke(verifyAssert);
 
   }
+
+  private void waitForEntryExpiration(LocalRegion lr, String key) { // waits on key's expiry task
+    try {
+      ExpirationDetector detector;
+      do { // each pass: install a new detector, permit expiration, then wait for the task to run
+        detector = new ExpirationDetector(lr.getEntryExpiryTask(key)); // re-read the task per pass
+        ExpiryTask.expiryTaskListener = detector; // static hook; reset to null in finally
+        ExpiryTask.permitExpiration();
+        detector.awaitExecuted(3000, MILLISECONDS); // presumably a 3000 ms timeout - TODO confirm
+      } while (!detector.hasExpired() && detector.wasRescheduled()); // retry only if rescheduled
+    } finally {
+      ExpiryTask.expiryTaskListener = null; // always uninstall the listener, even on failure
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.