You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/07/07 18:13:22 UTC

[1/7] incubator-geode git commit: GEODE-1640 Used static instance of logger while logging

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1571 e4d79df78 -> bab4e6268


GEODE-1640 Used static instance of logger while logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4612a193
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4612a193
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4612a193

Branch: refs/heads/feature/GEODE-1571
Commit: 4612a193f9687fe9b36d1060cdae8b333050ca97
Parents: 2ab9d11
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Wed Jul 6 14:28:01 2016 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Thu Jul 7 09:18:40 2016 -0700

----------------------------------------------------------------------
 .../com/gemstone/gemfire/internal/cache/PartitionedRegion.java   | 4 ++++
 .../java/com/gemstone/gemfire/internal/cache/TXEntryState.java   | 4 ++--
 .../cache/partitioned/DestroyRegionOnDataStoreMessage.java       | 3 ++-
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4612a193/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 26c91e0..39230c4 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -10620,5 +10620,9 @@ public class PartitionedRegion extends LocalRegion implements
     }
     return br.getEntryExpiryTask(key);
   }
+  
+  public Logger getLogger() {
+	return logger;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4612a193/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
index c6caefa..9ae0ac3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
@@ -1759,8 +1759,8 @@ public class TXEntryState implements Releasable
   
   void applyChanges(LocalRegion r, Object key, TXState txState)
   {
-    if (LogService.getLogger().isDebugEnabled()) {
-      LogService.getLogger().debug(
+    if (logger.isDebugEnabled()) {
+      logger.debug(
           "applyChanges txState=" + txState + " ,key=" + key + " ,r="
               + r.getDisplayName() + " ,op=" + this.op + " ,isDirty="
               + isDirty());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4612a193/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
index 225d1af..4454f81 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/DestroyRegionOnDataStoreMessage.java
@@ -84,7 +84,8 @@ public final class DestroyRegionOnDataStoreMessage extends PartitionMessage
     	return true;
     }
     
-    org.apache.logging.log4j.Logger logger = LogService.getLogger();
+    
+    org.apache.logging.log4j.Logger logger = pr.getLogger();
     if (logger.isTraceEnabled(LogMarker.DM)) {
       logger.trace("DestroyRegionOnDataStore operateOnRegion: " + pr.getFullPath());
     }


[3/7] incubator-geode git commit: GEODE-1613 CI failure: ConnectionPoolDUnitTest.test021ClientGetOfInvalidServerEntry

Posted by ji...@apache.org.
GEODE-1613 CI failure: ConnectionPoolDUnitTest.test021ClientGetOfInvalidServerEntry

Added a try/catch around scheduling the task to handle RejectedExecutionException,
and fixed up the handling of shutdown conditions so that the run() method no longer
throws cancellation exceptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/860c9020
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/860c9020
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/860c9020

Branch: refs/heads/feature/GEODE-1571
Commit: 860c902048d63cc7ae0fe3bf3c2cd329486fa73b
Parents: 7b28a8d
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Jul 7 10:25:49 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Jul 7 10:28:06 2016 -0700

----------------------------------------------------------------------
 .../DataSerializerRecoveryListener.java         | 33 ++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/860c9020/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
index 0c3f692..ef471c8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
@@ -89,17 +89,20 @@ public class DataSerializerRecoveryListener extends EndpointManager.EndpointList
 
     @Override
     public void run2() {
-      if(pool.getCancelCriterion().cancelInProgress() != null) {
+      if (pool.getCancelCriterion().cancelInProgress() != null) {
         return;
       }
+      
       synchronized(recoveryScheduledLock) {
         recoveryScheduled = false;
       }
+      
       logger.debug("DataSerializerRecoveryTask - Attempting to recover dataSerializers");
       SerializerAttributesHolder[] holders= InternalDataSerializer.getSerializersForDistribution();
       if(holders.length == 0) {
         return;
       }
+      
       EventID eventId = InternalDataSerializer.generateEventId();
       //Fix for bug:40930
       if (eventId == null) {
@@ -108,8 +111,9 @@ public class DataSerializerRecoveryListener extends EndpointManager.EndpointList
               TimeUnit.MILLISECONDS);
           recoveryScheduled = true;
         } catch (RejectedExecutionException e) {
-          pool.getCancelCriterion().checkCancelInProgress(e);
-          throw e;
+          if (pool.getCancelCriterion().cancelInProgress() == null) {
+            throw e;
+          }
         }
       }
       else {
@@ -117,15 +121,18 @@ public class DataSerializerRecoveryListener extends EndpointManager.EndpointList
           RegisterDataSerializersOp.execute(pool, holders, eventId);
         } 
         catch (CancelException e) {
-          throw e;
+          return;
         }
         catch (RejectedExecutionException e) {
           // This is probably because we've started to shut down.
-          pool.getCancelCriterion().checkCancelInProgress(e);
-          throw e; // weird
+          if (pool.getCancelCriterion().cancelInProgress() == null) {
+            throw e; // weird
+          }
         }
         catch(Exception e) {
-          pool.getCancelCriterion().checkCancelInProgress(e);
+          if (pool.getCancelCriterion().cancelInProgress() != null) {
+            return;
+          }
           
           // If ClassNotFoundException occurred on server, don't retry
           Throwable cause = e.getCause();
@@ -141,8 +148,16 @@ public class DataSerializerRecoveryListener extends EndpointManager.EndpointList
             logger.warn(LocalizedMessage.create(
               LocalizedStrings.DataSerializerRecoveryListener_ERROR_RECOVERING_DATASERIALIZERS),
               e);
-            background.schedule(new RecoveryTask(), pingInterval, TimeUnit.MILLISECONDS);
-            recoveryScheduled = true;
+            try {
+              background.schedule(new RecoveryTask(), pingInterval, TimeUnit.MILLISECONDS);
+              recoveryScheduled = true;
+            } catch (RejectedExecutionException ex) { // GEODE-1613 - suspect string while shutting down
+              if (!background.isTerminated()
+                  && pool.getCancelCriterion().cancelInProgress() == null) {
+                throw ex;
+              }
+            }
+
           }
         } finally {
           pool.releaseThreadLocalConnection();


[4/7] incubator-geode git commit: GEODE-1420: fix intermittent TombstoneService failures

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index a8a512e..9ad8e0e 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8542,30 +8542,39 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
     vm3.invoke(() -> this.assertNoClearTimeouts());
   }
   
+  private void checkCCRegionTombstoneCount(String msg, int expected) {
+    int actual = CCRegion.getTombstoneCount();
+    if (expected != actual) {
+      assertEquals(msg + " region tombstone count was " + actual + " expected=" + expected + " TombstoneService=" + CCRegion.getCache().getTombstoneService(),
+          expected, actual);
+    }
+  }
   public void versionTestTombstones() {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
-    final int numEntries = 1000;
+    final int numEntries = 100;
     
     // create replicated regions in VM 0 and 1, then perform concurrent ops
     // on the same key while creating the region in VM2.  Afterward make
     // sure that all three regions are consistent
-    final long oldServerTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-    final long oldClientTimeout = TombstoneService.CLIENT_TOMBSTONE_TIMEOUT;
-    final long oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long oldServerTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
+    final long oldClientTimeout = TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT;
+    final int oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final boolean oldIdleExpiration = TombstoneService.IDLE_EXPIRATION;
     final double oldLimit = TombstoneService.GC_MEMORY_THRESHOLD;
+    final long oldMaxSleepTime = TombstoneService.MAX_SLEEP_TIME;
     try {
       SerializableRunnable setTimeout = new SerializableRunnable() {
         @Override
         public void run() {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 20000;
-          TombstoneService.CLIENT_TOMBSTONE_TIMEOUT = 19000;
-          TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 1000;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 1000;
+          TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT = 900;
+          TombstoneService.EXPIRED_TOMBSTONE_LIMIT = numEntries;
           TombstoneService.IDLE_EXPIRATION = true;
           TombstoneService.GC_MEMORY_THRESHOLD = 0;  // turn this off so heap profile won't cause test to fail
+          TombstoneService.MAX_SLEEP_TIME = 500;
         }
       };
       vm0.invoke(setTimeout);
@@ -8601,8 +8610,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
               assertTrue("entry should not exist", !CCRegion.containsKey("cckey"+i));
               assertTrue("entry should not contain a value", !CCRegion.containsValueForKey("cckey"+i));
             }
-            long count = CCRegion.getTombstoneCount();
-            assertEquals("expected "+numEntries+" tombstones", numEntries, count);
+            checkCCRegionTombstoneCount("after destroys in this vm ", numEntries);
             assertTrue("region should not contain a tombstone", !CCRegion.containsValue(Token.TOMBSTONE));
             if (CCRegion.getScope().isDistributedNoAck()) {
               sendSerialMessageToAll(); // flush the ops
@@ -8616,25 +8624,20 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
       vm1.invoke(new SerializableRunnable("check tombstone count(2)") {
         @Override
         public void run() {
-          final long count = CCRegion.getTombstoneCount();
-          assertEquals("expected "+numEntries+" tombstones", numEntries, count);
-          // ensure that some GC is performed - due to timing it may not
-          // be the whole batch, but some amount should be done
+          checkCCRegionTombstoneCount("after destroys in other vm ", numEntries);
           WaitCriterion waitForExpiration = new WaitCriterion() {
             @Override
             public boolean done() {
-              // TODO: in GEODE-561 this was changed to no longer wait for it
-              // to go to zero. But I think it should.
-              return CCRegion.getTombstoneCount() < numEntries;
+              return CCRegion.getTombstoneCount() == 0;
             }
             @Override
             public String description() {
-              return "Waiting for some tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-                + " tombstones left out of " + count + " initial tombstones";
+              return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
+              + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
             }
           };
           try {
-            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT+10000, 1000, true);
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT+(TombstoneService.MAX_SLEEP_TIME*9), 100, true);
           } catch (AssertionError e) {
             CCRegion.dumpBackingMap();
             com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("tombstone service state: " + CCRegion.getCache().getTombstoneService());
@@ -8643,16 +8646,10 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
         }
       });
 
-      // Now check to see if tombstones are resurrected by a put/create.
-      // The entries should be created okay and the callback should be afterCreate.
-      // The tombstone count won't go down until the entries are swept, but then
-      // the count should fall to zero.
-
       vm0.invoke(new SerializableRunnable("create/destroy entries and check tombstone count") {
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          final long origCount = count;
+          final int origCount = CCRegion.getTombstoneCount();
           try {
             WaitCriterion waitForExpiration = new WaitCriterion() {
               @Override
@@ -8662,18 +8659,17 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
               @Override
               public String description() {
                 return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-                  + " tombstones left out of " + origCount + " initial tombstones";
+                  + " tombstones left out of " + origCount + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
               }
             };
-            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT+10000, 1000, true);
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT+(TombstoneService.MAX_SLEEP_TIME*9), 100, true);
             logger.debug("creating tombstones.  current count={}", CCRegion.getTombstoneCount());
             for (int i=0; i<numEntries; i++) {
               CCRegion.create("cckey" + i, i);
               CCRegion.destroy("cckey" + i);
             }
             logger.debug("done creating tombstones.  current count={}", CCRegion.getTombstoneCount());
-            count = CCRegion.getTombstoneCount();
-            assertEquals("expected "+numEntries+" tombstones", numEntries, count);
+            checkCCRegionTombstoneCount("after create+destroy in this vm ", numEntries);
             assertEquals(0, CCRegion.size());
             afterCreates = 0;
             AttributesMutator m = CCRegion.getAttributesMutator();
@@ -8699,8 +8695,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
       vm1.invoke(new SerializableRunnable("check tombstone count and install listener") {
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected ten tombstones", numEntries, count);
+          checkCCRegionTombstoneCount("after create+destroy in other vm ", numEntries);
           afterCreates = 0;
           AttributesMutator m = CCRegion.getAttributesMutator();
           m.addCacheListener(new CacheListenerAdapter() {
@@ -8711,6 +8706,11 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
           });
         }});
 
+      // Now check to see if tombstones are resurrected by a create.
+      // The entries should be created okay and the callback should be afterCreate.
+      // The tombstone count won't go down until the entries are swept, but then
+      // the count should fall to zero.
+
       vm0.invoke(new SerializableRunnable("create entries and check afterCreate and tombstone count") {
         @Override
         public void run() {
@@ -8718,13 +8718,24 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
             for (int i=0; i<numEntries; i++) {
               CCRegion.create("cckey" + i, i);
             }
-            long count = CCRegion.getTombstoneCount();
-            assertEquals("expected zero tombstones", 0, count);
+            checkCCRegionTombstoneCount("after create in this vm", 0);
             assertEquals("expected "+numEntries+" afterCreates", numEntries, afterCreates);
             assertEquals(numEntries, CCRegion.size());
             if (CCRegion.getScope().isDistributedNoAck()) {
               sendSerialMessageToAll(); // flush the ops
             }
+            WaitCriterion waitForExpiration = new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount() ==  0;
+              }
+              @Override
+              public String description() {
+                return "Waiting for all scheduled tombstones to be removed.  There are now " + CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
+                  + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
+              }
+            };
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT*5, 100, true);
           } catch (CacheException e) {
             fail("while performing create operations", e);
           }
@@ -8734,30 +8745,34 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
       vm1.invoke(new SerializableRunnable("check afterCreate and tombstone count") {
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected zero tombstones", 0, count);
+          checkCCRegionTombstoneCount("after create in other vm", 0);
           assertEquals("expected "+numEntries+" afterCreates", numEntries, afterCreates);
           assertEquals(numEntries, CCRegion.size());
+          WaitCriterion waitForExpiration = new WaitCriterion() {
+            @Override
+            public boolean done() {
+              return CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount() ==  0;
+            }
+            @Override
+            public String description() {
+              return "Waiting for all scheduled tombstones to be removed.  There are now " + CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
+                + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
+            }
+          };
+          Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT*5, 100, true);
         }
       });
 
-      vm0.invoke(new SerializableRunnable("check region size and tombstone count") {
-        @Override
-        public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected all tombstones to be expired", 0, count);
-          assertEquals(numEntries, CCRegion.size());
-        }
-      });
     } finally {
       SerializableRunnable resetTimeout = new SerializableRunnable() {
         @Override
         public void run() {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = oldServerTimeout;
-          TombstoneService.CLIENT_TOMBSTONE_TIMEOUT = oldClientTimeout;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = oldServerTimeout;
+          TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT = oldClientTimeout;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = oldExpiredTombstoneLimit;
           TombstoneService.IDLE_EXPIRATION = oldIdleExpiration;
           TombstoneService.GC_MEMORY_THRESHOLD = oldLimit;
+          TombstoneService.MAX_SLEEP_TIME = oldMaxSleepTime;
         }
       };
       vm0.invoke(resetTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
index c92a436..32cedd1 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
@@ -352,7 +352,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
    * create some exception list.
    * Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0   
    * vm1 becomes offline then restarts.
-   * The deltaGII should send delta which only contains unfinished opeation R4,R5  
+   * The deltaGII should send delta which only contains unfinished operation R4,R5  
    */
   @Test
   public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable {
@@ -2086,7 +2086,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
   protected void changeTombstoneTimout(VM vm, final long value) {
     SerializableRunnable change = new SerializableRunnable() {
       public void run() {
-        TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = value;
+        TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = value;
       }
     };
     vm.invoke(change);
@@ -2493,13 +2493,13 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
           assertTrue(entry != null && entry.getRegionEntry().isTombstone());
         }
         
-        System.out.println("GGG:new timeout="+TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT);
+        System.out.println("GGG:new timeout="+TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT);
         if (entry == null || !entry.getRegionEntry().isTombstone()) {
           return (false == expectExist);
         } else {
           long ts = entry.getRegionEntry().getVersionStamp().getVersionTimeStamp();
           if (expectExpired) {
-            return (ts + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl)cache).cacheTimeMillis()); // use MAX_WAIT as timeout
+            return (ts + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl)cache).cacheTimeMillis()); // use MAX_WAIT as timeout
           } else {
             return (true == expectExist);
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
index bdadd8a..1870a19 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
@@ -191,6 +191,8 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
     //create the region in a third VM, which won't have any buckets
     vm2.invoke(createPrRegions);
 
+    final long begin = System.currentTimeMillis();
+
     //close 1 cache, which should make the bucket drop below
     //the expected redundancy level.
     vm1.invoke(new SerializableRunnable("close cache") {
@@ -200,7 +202,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
       }
     });
     
-    long elapsed = waitForBucketRecovery(vm2, 1);
+    long elapsed = waitForBucketRecovery(vm2, 1, begin);
     assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed, elapsed >= 5000);
   }
 
@@ -262,7 +264,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
             + elapsed, elapsed < 5000);
     
     //wait for the bucket to be copied
-    elapsed = waitForBucketRecovery(vm2, 4);
+    elapsed = waitForBucketRecovery(vm2, 4, begin);
     assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed, elapsed >= 5000);
     
     vm2.invoke(new SerializableCallable("wait for primary move") {
@@ -280,8 +282,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
     });
   }
   
-  private long waitForBucketRecovery(VM vm2, final int numBuckets) {
-    final long begin = System.currentTimeMillis();
+  private long waitForBucketRecovery(VM vm2, final int numBuckets, final long begin) {
     //wait for the bucket to be copied
     Long elapsed = (Long) vm2.invoke(new SerializableCallable("putData") {
       public Object call() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
index 3eeddf5..1851ae3 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
@@ -175,7 +175,7 @@ public class TombstoneCreationJUnitTest {
     // the entry
     String key = "destroyedKey1";
     VersionedThinRegionEntryHeap entry = new VersionedThinRegionEntryHeapObjectKey(region, key, Token.REMOVED_PHASE1);
-    entry.setLastModified(System.currentTimeMillis() - (2 * TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT));
+    entry.setLastModified(System.currentTimeMillis() - (2 * TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT));
     ((AbstractRegionMap)region.getRegionMap()).putEntryIfAbsentForTest(entry);
     cache.getLogger().info("entry inserted into cache: " + entry);
 
@@ -185,7 +185,7 @@ public class TombstoneCreationJUnitTest {
     tag.setIsRemoteForTesting();
     tag.setEntryVersion(3);
     tag.setRegionVersion(12345);
-    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT);
+    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT);
     tag.setDistributedSystemId(1);
     ev.setVersionTag(tag);
     cache.getLogger().info("trying to destroy the entry: " + region.getRegionEntry(key));
@@ -209,7 +209,7 @@ public class TombstoneCreationJUnitTest {
     tag.setIsRemoteForTesting();
     tag.setEntryVersion(1);
     tag.setRegionVersion(12340);
-    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 10000);
+    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 10000);
     tag.setDistributedSystemId(1);
     ev.setVersionTag(tag);
     cache.getLogger().info("trying to update the entry with an older event: " + region.getRegionEntry(key));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
index f7c011d..a7b15fd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
@@ -266,9 +266,8 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
       
       @Override
       public void run2() throws CacheException {
-        // TODO Auto-generated method stub
-        long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-        long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+        long replicatedTombstoneTomeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
+        int expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
         
         try {
           LocalRegion region = createRegion(vm0);
@@ -303,7 +302,7 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
           // right away when they are gIId based on their original timestamp.
           Wait.pause((int) TEST_REPLICATED_TOMBSTONE_TIMEOUT);
 
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = TEST_REPLICATED_TOMBSTONE_TIMEOUT;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = TEST_REPLICATED_TOMBSTONE_TIMEOUT;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = entryCount;
           // Do region GII
           region = createRegion(vm0);
@@ -335,7 +334,7 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
 
           cache.close();
         } finally {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = replicatedTombstoneTomeout;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = replicatedTombstoneTomeout;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = expiriredTombstoneLimit;
         }    
       }


[5/7] incubator-geode git commit: GEODE-1420: fix intermittent TombstoneService failures

Posted by ji...@apache.org.
GEODE-1420: fix intermittent TombstoneService failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b9da9e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b9da9e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b9da9e66

Branch: refs/heads/feature/GEODE-1571
Commit: b9da9e6619f4c33696f8303d24741487d3c5e57a
Parents: 860c902
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jun 21 16:30:04 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Thu Jul 7 10:43:50 2016 -0700

----------------------------------------------------------------------
 .../gemfire/distributed/internal/CacheTime.java |   29 +
 .../gemfire/distributed/internal/DSClock.java   |    7 +-
 .../internal/cache/AbstractRegionEntry.java     |    2 +-
 .../internal/cache/AbstractRegionMap.java       |   46 +-
 .../gemfire/internal/cache/BucketRegion.java    |   18 +-
 .../internal/cache/GemFireCacheImpl.java        |    3 +-
 .../internal/cache/InitialImageOperation.java   |    4 +-
 .../gemfire/internal/cache/LocalRegion.java     |   25 +-
 .../gemfire/internal/cache/ProxyRegionMap.java  |    7 -
 .../gemfire/internal/cache/RegionMap.java       |    5 -
 .../internal/cache/TombstoneService.java        | 1211 +++++++++---------
 .../cache/tier/sockets/CacheClientProxy.java    |    3 +
 .../DistributedAckRegionCCEDUnitTest.java       |   10 +-
 .../cache30/GlobalRegionCCEDUnitTest.java       |    2 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  107 +-
 .../internal/cache/GIIDeltaDUnitTest.java       |    8 +-
 ...rtitionedRegionDelayedRecoveryDUnitTest.java |    9 +-
 .../cache/TombstoneCreationJUnitTest.java       |    6 +-
 .../PersistentRVVRecoveryDUnitTest.java         |    9 +-
 19 files changed, 772 insertions(+), 739 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
new file mode 100644
index 0000000..08c1400
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/CacheTime.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+/**
+ * Provides a method to get the system millisecond clock time
+ * adjusted for the distributed cache.
+ */
+public interface CacheTime {
+  /**
+   * Returns the system millisecond clock time with adjustments from the distributed cache
+   * @return the current time
+   */
+  public long cacheTimeMillis();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
index d13610a..d96e7c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DSClock.java
@@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * 
  */
 
-public class DSClock {
+public class DSClock implements CacheTime {
 
   private static final Logger logger = LogService.getLogger();
   
@@ -76,10 +76,7 @@ public class DSClock {
     this.isLoner = lonerDS;
   }
   
-  /**
-   * Returns the system millisecond clock time with adjustments from the distributed system
-   * @return the current time
-   */
+  @Override
   public long cacheTimeMillis() {
     long result;
     final long offset = getCacheTimeOffset();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 6ee4c17..15a5bed 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -1881,7 +1881,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   }
 
   private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) {
-    return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
+    return isTombstone && (timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
   }
   
   private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index bc919fc..f3cb3d6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -3623,9 +3623,6 @@ public abstract class AbstractRegionMap implements RegionMap {
     }
   }
 
-  public final void unscheduleTombstone(RegionEntry re) {
-  }
-  
   /**
    * for testing race conditions between threads trying to apply ops to the
    * same entry
@@ -3637,21 +3634,31 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
     // no need for synchronization - stale values are okay here
-    RegionEntry actualRe = getEntry(re.getKey());
     // TODO this looks like a problem for regionEntry pooling
-    if (actualRe != re) {  // null actualRe is okay here
-      return true; // tombstone was evicted at some point
+    if ( getEntry(re.getKey()) != re) {
+      // region entry was either removed (null)
+      // or changed to a different region entry.
+      // In either case the old tombstone is no longer needed.
+      return true;
+    }
+    if (!re.isTombstone()) {
+      // if the region entry no longer contains a tombstone
+      // then the old tombstone is no longer needed
+      return true;
     }
-    VersionStamp vs = re.getVersionStamp();
+    VersionStamp<?> vs = re.getVersionStamp();
     if (vs == null) {
       // if we have no VersionStamp why were we even added as a tombstone?
       // We used to see an NPE here. See bug 52092.
       logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
       return true;
     }
-    int entryVersion = vs.getEntryVersion();
-    boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
-    return !isSameTombstone;
+    if (vs.getEntryVersion() != destroyedVersion) {
+      // the version changed so old tombstone no longer needed
+      return true;
+    }
+    // region entry still has the same tombstone so we need to keep it.
+    return false;
   }
 
   /** removes a tombstone that has expired locally */
@@ -3662,12 +3669,15 @@ public abstract class AbstractRegionMap implements RegionMap {
     synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
         synchronized (re) {
           int entryVersion = re.getVersionStamp().getEntryVersion();
-          boolean isTombstone = re.isTombstone();
-          boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
-          if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
+          if (!re.isTombstone() || entryVersion > destroyedVersion) {
+            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
+              logger.trace(LogMarker.TOMBSTONE_COUNT,
+                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
+                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
+            }
+          } else {
             if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              // logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
-              if (isSameTombstone) {
+              if (entryVersion == destroyedVersion) {
                 // logging this can put tremendous pressure on the log writer in tests
                 // that "wait for silence"
                 logger.trace(LogMarker.TOMBSTONE_COUNT,
@@ -3702,12 +3712,6 @@ public abstract class AbstractRegionMap implements RegionMap {
               //if the region has been destroyed, the tombstone is already
               //gone. Catch an exception to avoid an error from the GC thread.
             }
-          } else {
-            if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-              logger.trace(LogMarker.TOMBSTONE_COUNT,
-                  "tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
-                  re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
-            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e0f6fa2..b32927e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -320,6 +320,22 @@ implements Bucket
   }
 
   @Override
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    if (eventID == null) {
+      return false;
+    }
+    if (CacheClientNotifier.getInstance() == null) {
+      return false;
+    }
+    if (clientRouting != null) {
+      return true;
+    }
+    if (getFilterProfile() != null) {
+      return true;
+    }
+    return false;
+  }
+  @Override
   protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
     if (CacheClientNotifier.getInstance() != null) {
       // Only route the event to clients interested in the partitioned region.
@@ -327,7 +343,7 @@ implements Bucket
       // have the filter profile ferret out all of the clients that have interest
       // in this region
       FilterProfile fp = getFilterProfile();
-      if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+      if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in clients
           && (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
         RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId()); 
         FilterInfo clientRouting = routing;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 13e0602..98d4fa9 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -176,7 +176,7 @@ import com.sun.jna.Platform;
  *
  */
 @SuppressWarnings("deprecation")
-public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee {
+public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, CacheTime {
   private static final Logger logger = LogService.getLogger();
   
   // moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -2792,6 +2792,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * 
    * @return distributed cache time.
    */
+  @Override
   public long cacheTimeMillis() {
     if (this.system != null) {
       return this.system.getClock().cacheTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 55bdde4..7ee5c74 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -504,7 +504,7 @@ public class InitialImageOperation  {
           //Make sure we have applied the tombstone GC as seen on the GII
           //source
           if(this.gcVersions != null) {
-            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions);
+            region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions, false);
           }
           
           if (this.gotImage) {
@@ -1637,7 +1637,7 @@ public class InitialImageOperation  {
               }
             }
             if (this.checkTombstoneVersions && this.versionVector != null && rgn.concurrencyChecksEnabled) {
-              synchronized(rgn.getCache().getTombstoneService().blockGCLock) {
+              synchronized(rgn.getCache().getTombstoneService().getBlockGCLock()) {
               if (goWithFullGII(rgn, this.versionVector)) {
                 if (isGiiDebugEnabled) {
                   logger.trace(LogMarker.GII, "have to do fullGII");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 205f38f..7da2b45 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3285,15 +3285,20 @@ public class LocalRegion extends AbstractRegion
   public int getTombstoneCount() {
     return this.tombstoneCount.get();
   }
-  
   public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion) {
+    scheduleTombstone(entry, destroyedVersion, false);
+  }
+  
+  public void scheduleTombstone(RegionEntry entry, VersionTag destroyedVersion, boolean reschedule) {
     if (destroyedVersion == null) {
       throw new NullPointerException("destroyed version tag cannot be null");
     }
 //    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
 //    lastUnscheduled.set(null);
 //    synchronized(sync) {
+    if (!reschedule) {
       incTombstoneCount(1);
+    }
 //      if (entry instanceof AbstractRegionEntry) {
 //        AbstractRegionEntry are = (AbstractRegionEntry)entry;
 //        if (are.isTombstoneScheduled()) {
@@ -3303,7 +3308,7 @@ public class LocalRegion extends AbstractRegion
 //        are.setTombstoneScheduled(true);
 //      }
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
-        logger.trace(LogMarker.TOMBSTONE_COUNT, "scheduling tombstone for {} version={} count is {} entryMap size is {}",
+        logger.trace(LogMarker.TOMBSTONE_COUNT, "{} tombstone for {} version={} count is {} entryMap size is {}", reschedule ? "rescheduling" : "scheduling",
             entry.getKey(), entry.getVersionStamp().asVersionTag(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
         // this can be useful for debugging tombstone count problems if there aren't a lot of concurrent threads
 //        if (TombstoneService.DEBUG_TOMBSTONE_COUNT && this.entries instanceof AbstractRegionMap) {
@@ -3319,12 +3324,7 @@ public class LocalRegion extends AbstractRegion
 //  ThreadLocal<Exception> lastUnscheduledPlace = new ThreadLocal<Exception>();
   
   public void rescheduleTombstone(RegionEntry entry, VersionTag version) {
-    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
-    synchronized(sync) {
-      unscheduleTombstone(entry, false); // count is off by one, so don't allow validation to take place
-      scheduleTombstone(entry, version);
-    }
-
+    scheduleTombstone(entry, version, true);
   }
   
   public void unscheduleTombstone(RegionEntry entry) {
@@ -3337,7 +3337,6 @@ public class LocalRegion extends AbstractRegion
       logger.trace(LogMarker.TOMBSTONE, "unscheduling tombstone for {} count is {} entryMap size is {}",
           entry.getKey(), this.tombstoneCount.get(), this.entries.size()/*, new Exception("stack trace")*/);
     }
-    getRegionMap().unscheduleTombstone(entry);
     if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT) && validate) {
       if (this.entries instanceof AbstractRegionMap) {
         ((AbstractRegionMap) this.entries).verifyTombstoneCount(this.tombstoneCount);
@@ -3359,7 +3358,7 @@ public class LocalRegion extends AbstractRegion
       return;
     }
     if (!this.versionVector.containsTombstoneGCVersions(regionGCVersions)) {
-      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions);
+      keys = this.cache.getTombstoneService().gcTombstones(this, regionGCVersions, needsTombstoneGCKeysForClients(eventID, clientRouting));
       if (keys == null) {
         // deltaGII prevented tombstone GC
         return;
@@ -3377,6 +3376,9 @@ public class LocalRegion extends AbstractRegion
   }
   
 
+  protected boolean needsTombstoneGCKeysForClients(EventID eventID, FilterInfo clientRouting) {
+    return false;
+  }
   /** pass tombstone garbage-collection info to clients 
    * @param eventID the ID of the event (see bug #50683)
    * @param routing routing info (routing is computed if this is null)
@@ -11914,9 +11916,7 @@ public class LocalRegion extends AbstractRegion
   
   /** test hook - dump the backing map for this region */
   public void dumpBackingMap() {
-    Object sync = TombstoneService.DEBUG_TOMBSTONE_COUNT? TombstoneService.debugSync : new Object();
     synchronized(this.entries) {
-      synchronized(sync) {
         if (this.entries instanceof AbstractRegionMap) {
           ((AbstractRegionMap)(this.entries)).verifyTombstoneCount(this.tombstoneCount);
         }
@@ -11924,7 +11924,6 @@ public class LocalRegion extends AbstractRegion
         if (this.entries instanceof AbstractRegionMap) {
           ((AbstractRegionMap)this.entries).dumpMap();
         }
-      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 55d11fc..3ad2cc1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -718,13 +718,6 @@ final class ProxyRegionMap implements RegionMap {
     throw new IllegalStateException("removeTombstone should never be called on a proxy");
   }
 
-
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.RegionMap#unscheduleTombstone(com.gemstone.gemfire.internal.cache.RegionEntry)
-   */
-  public void unscheduleTombstone(RegionEntry re) {
-  }
-
   public void setEntryFactory(RegionEntryFactory f) {
     throw new IllegalStateException("Should not be called on a ProxyRegionMap");
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
index 57f8853..14a2d2f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
@@ -381,11 +381,6 @@ public interface RegionMap extends LRUMapCallbacks {
    */
   public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion);
   
-  /**
-   * a tombstone has been unscheduled - update LRU stats if necessary
-   */
-  public void unscheduleTombstone(RegionEntry re);
-
   public void updateEntryVersion(EntryEventImpl event);
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..dca792f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -16,12 +16,12 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
+import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
+import com.gemstone.gemfire.distributed.internal.CacheTime;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
-import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -37,8 +37,10 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
 
 /**
  * Tombstones are region entries that have been destroyed but are held
@@ -50,23 +52,23 @@ import java.util.concurrent.atomic.AtomicLong;
  * and timing out tombstones.
  * 
  */
-public class TombstoneService  implements ResourceListener<MemoryEvent> {
+public class TombstoneService {
   private static final Logger logger = LogService.getLogger();
   
   /**
-   * The default tombstone expiration period, in milliseconds for replicated
-   * regions.<p>  This is the period over which the destroy operation may
+   * The default tombstone expiration period, in milliseconds for replicates and partitions.
+   * <p>This is the period over which the destroy operation may
    * conflict with another operation.  After this timeout elapses the tombstone
    * is put into a GC set for removal.  Removal is typically triggered by
    * the size of the GC set, but could be influenced by resource managers.
    * 
    * The default is 600,000 milliseconds (10 minutes).
    */
-  public static long REPLICATED_TOMBSTONE_TIMEOUT = Long.getLong(
+  public static long REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
       DistributionConfig.GEMFIRE_PREFIX + "tombstone-timeout", 600000L).longValue();
   
   /**
-   * The default tombstone expiration period in millis for non-replicated
+   * The default tombstone expiration period in millis for non-replicate/partition
    * regions.  This tombstone timeout should be shorter than the one for
    * replicated regions and need not be excessively long.  Making it longer
    * than the replicated timeout can cause non-replicated regions to issue
@@ -74,7 +76,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * by others that no longer have the tombstone.<p>
    * The default is 480,000 milliseconds (8 minutes)
    */
-  public static long CLIENT_TOMBSTONE_TIMEOUT = Long.getLong(
+  public static long NON_REPLICATE_TOMBSTONE_TIMEOUT = Long.getLong(
       DistributionConfig.GEMFIRE_PREFIX + "non-replicated-tombstone-timeout", 480000);
   
   /**
@@ -82,7 +84,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * all replicated regions, including PR buckets.  The default is
    * 100,000 expired tombstones.
    */
-  public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
+  public static int EXPIRED_TOMBSTONE_LIMIT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "tombstone-gc-threshold", 100000);
   
   /**
    * The interval to scan for expired tombstones in the queues
@@ -99,35 +101,18 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   
   /** this is a test hook for causing the tombstone service to act as though free memory is low */
   public static boolean FORCE_GC_MEMORY_EVENTS = false;
-
-  public final static Object debugSync = new Object();
-  public final static boolean DEBUG_TOMBSTONE_COUNT = Boolean
-      .getBoolean(DistributionConfig.GEMFIRE_PREFIX + "TombstoneService.DEBUG_TOMBSTONE_COUNT"); // TODO:LOG:replace TombstoneService.DEBUG_TOMBSTONE_COUNT
+  /** maximum time a sweeper will sleep, in milliseconds. */
+  public static long MAX_SLEEP_TIME = 10000;
 
   public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration
   
   /**
-   * tasks for cleaning up tombstones
-   */
-  private TombstoneSweeper replicatedTombstoneSweeper;
-  private TombstoneSweeper nonReplicatedTombstoneSweeper;
-
-  /** a tombstone service is tied to a cache */
-  private GemFireCacheImpl cache;
-
-  /**
-   * two queues, one for replicated regions (including PR buckets) and one for
+   * two sweepers, one for replicated regions (including PR buckets) and one for
    * other regions.  They have different timeout intervals.
    */
-  private Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
-  private Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
+  private final ReplicateTombstoneSweeper replicatedTombstoneSweeper;
+  private final NonReplicateTombstoneSweeper nonReplicatedTombstoneSweeper;
 
-  private AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
-  private AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();
-  
-  public Object blockGCLock = new Object();
-  private int progressingDeltaGIICount; 
-  
   public static TombstoneService initialize(GemFireCacheImpl cache) {
     TombstoneService instance = new TombstoneService(cache);
 //    cache.getResourceManager().addResourceListener(instance);  experimental
@@ -135,58 +120,21 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   }
   
   private TombstoneService(GemFireCacheImpl cache) {
-    this.cache = cache;
-    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
-        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
-    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
-        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
-    startSweeper(this.replicatedTombstoneSweeper);
-    startSweeper(this.nonReplicatedTombstoneSweeper);
+    this.replicatedTombstoneSweeper = new ReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion(), cache.getDistributionManager().getWaitingThreadPool());
+    this.nonReplicatedTombstoneSweeper = new NonReplicateTombstoneSweeper(cache, cache.getCachePerfStats(), cache.getCancelCriterion());
+    this.replicatedTombstoneSweeper.start();
+    this.nonReplicatedTombstoneSweeper.start();
   }
 
-  private void startSweeper(TombstoneSweeper tombstoneSweeper) {
-    synchronized(tombstoneSweeper) {
-      if (tombstoneSweeper.sweeperThread == null) {
-        tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
-            logger), tombstoneSweeper);
-        tombstoneSweeper.sweeperThread.setDaemon(true);
-        String product = "GemFire";
-        if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
-        } else {
-          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
-        }
-        tombstoneSweeper.sweeperThread.start();
-      }
-    }
-  }
-  
   /**
    * this ensures that the background sweeper thread is stopped
    */
   public void stop() {
-    stopSweeper(this.replicatedTombstoneSweeper);
-    stopSweeper(this.nonReplicatedTombstoneSweeper);
-  }
-  
-  private void stopSweeper(TombstoneSweeper t) {
-    Thread sweeperThread;
-    synchronized(t) {
-      sweeperThread = t.sweeperThread;
-      t.isStopped = true;
-      if (sweeperThread != null) {
-        t.notifyAll();
-      }
-    }
-    try {
-      sweeperThread.join(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    t.tombstones.clear();
+    this.replicatedTombstoneSweeper.stop();
+    this.nonReplicatedTombstoneSweeper.stop();
   }
   
-  /**
+ /**
    * Tombstones are markers placed in destroyed entries in order to keep the
    * entry around for a while so that it's available for concurrent modification
    * detection.
@@ -200,20 +148,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       logger.warn("Detected an attempt to schedule a tombstone for an entry that is not versioned in region " + r.getFullPath(), new Exception("stack trace"));
       return;
     }
-    boolean useReplicated = useReplicatedQueue(r);
     Tombstone ts = new Tombstone(entry, r, destroyedVersion);
-    if (useReplicated) {
-      this.replicatedTombstones.add(ts);
-      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    } else {
-      this.nonReplicatedTombstones.add(ts);
-      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
-    }
+    this.getSweeper(r).scheduleTombstone(ts);
   }
   
   
-  private boolean useReplicatedQueue(LocalRegion r) {
-    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
+  private TombstoneSweeper getSweeper(LocalRegion r)  {
+    if (r.getScope().isDistributed() && r.getServerProxy() == null && r.dataPolicy.withReplication()) {
+      return this.replicatedTombstoneSweeper;
+    } else {
+      return this.nonReplicatedTombstoneSweeper;
+    }
   }
   
   
@@ -223,47 +168,35 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r
    */
   public void unscheduleTombstones(LocalRegion r) {
-    Queue<Tombstone> queue =
-      r.getAttributes().getDataPolicy().withReplication() ? replicatedTombstones : nonReplicatedTombstones;
-    long removalSize = 0;
-    for (Iterator<Tombstone> it=queue.iterator(); it.hasNext(); ) {
-      Tombstone t = it.next();
-      if (t.region == r) {
-        it.remove();
-        removalSize += t.getSize();
-      }
-    }
-    if (queue == replicatedTombstones) {
-      replicatedTombstoneQueueSize.addAndGet(-removalSize);
-    } else {
-      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-    }
+    getSweeper(r).unscheduleTombstones(r);
   }
   
   public int getGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.getGCBlockCount();
   }
    
   public int incrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return ++this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.incrementGCBlockCount();
   }
   
   public int decrementGCBlockCount() {
-    synchronized(this.blockGCLock) {
-      return --this.progressingDeltaGIICount;
-    }
+    return replicatedTombstoneSweeper.decrementGCBlockCount();
+  }
+  
+  public long getScheduledTombstoneCount() {
+    long result = 0;
+    result += replicatedTombstoneSweeper.getScheduledTombstoneCount();
+    result += nonReplicatedTombstoneSweeper.getScheduledTombstoneCount();
+    return result;
   }
   
   /**
    * remove tombstones from the given region that have region-versions <= those in the given removal map
    * @return a collection of keys removed (only if the region is a bucket - empty otherwise)
    */
-  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
-    synchronized(this.blockGCLock) {
+  @SuppressWarnings("rawtypes")
+  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions, boolean needsKeys) {
+    synchronized(getBlockGCLock()) {
       int count = getGCBlockCount(); 
       if (count > 0) {
         // if any delta GII is on going as provider at this member, not to do tombstone GC
@@ -272,69 +205,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         }
         return null;
       }
-    Queue<Tombstone> queue;
-    boolean replicated = false;
-    long removalSize = 0;
-    Tombstone currentTombstone;
-    StoppableReentrantLock lock = null;
-    boolean locked = false;
     if (logger.isDebugEnabled()) {
       logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
     }
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    VersionSource myId = r.getVersionMember();
-    boolean isBucket = r.isUsedForPartitionedRegionBucket();
-    try {
-      locked = false;
-      if (r.getServerProxy() != null) {
-        queue = this.nonReplicatedTombstones;
-        lock = this.nonReplicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      } else {
-        queue = this.replicatedTombstones;
-        replicated = true;
-        lock = this.replicatedTombstoneSweeper.currentTombstoneLock;
-        lock.lock();
-        locked = true;
-        currentTombstone = this.replicatedTombstoneSweeper.currentTombstone;
-      }
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
+    final VersionSource myId = r.getVersionMember();
+    final TombstoneSweeper sweeper = getSweeper(r);
+    final List<Tombstone> removals = new ArrayList<Tombstone>();
+    sweeper.removeUnexpiredIf(t -> {
+      if (t.region == r) {
+        VersionSource destroyingMember = t.getMemberID();
         if (destroyingMember == null) {
           destroyingMember = myId;
         }
         Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-        if (maxReclaimedRV != null && currentTombstone.getRegionVersion() <= maxReclaimedRV.longValue()) {
-          removals.add(currentTombstone);
-        }
-      }
-      for (Tombstone t: queue) {
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
-          if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+        if (maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue()) {
+          removals.add(t);
+          return true;
         }
       }
-      
-      queue.removeAll(removals);
-      if (replicated) {
-        this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
-      } else {
-        this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
-      }
-    } finally {
-      if (locked) {
-        lock.unlock();
-      }
-    }
+      return false;
+    });
     
     //Record the GC versions now, so that we can persist them
     for(Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
@@ -353,9 +243,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       r.getDiskRegion().writeRVVGC(r);
     }
     
-    Set<Object> removedKeys = new HashSet();
+    Set<Object> removedKeys = needsKeys ? new HashSet<Object>() : Collections.emptySet();
     for (Tombstone t: removals) {
-      if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
+      boolean tombstoneWasStillInRegionMap = t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
+      if (needsKeys && tombstoneWasStillInRegionMap) {
         removedKeys.add(t.entry.getKey());
       }
     }
@@ -373,45 +264,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @param r the region affected
    * @param tombstoneKeys the keys removed on the server
    */
-  public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
-    Queue<Tombstone> queue = this.nonReplicatedTombstones;
-    Set<Tombstone> removals = new HashSet<Tombstone>();
-    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
-    try {
-      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
-      long removalSize = 0;
-      VersionSource myId = r.getVersionMember();
-      if (logger.isDebugEnabled()) {
-        logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
-      }
-      if (currentTombstone != null && currentTombstone.region == r) {
-        VersionSource destroyingMember = currentTombstone.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (tombstoneKeys.contains(currentTombstone.entry.getKey())) {
-          removals.add(currentTombstone);
-        }
-      }
-      for (Tombstone t: queue) {
-        if (t.region == r) {
-          VersionSource destroyingMember = t.getMemberID();
-          if (destroyingMember == null) {
-            destroyingMember = myId;
-          }
-          if (tombstoneKeys.contains(t.entry.getKey())) {
-            removals.add(t);
-            removalSize += t.getSize();
-          }
+  public void gcTombstoneKeys(final LocalRegion r, final Set<Object> tombstoneKeys) {
+    if (r.getServerProxy() == null) {
+      // if the region does not have a server proxy
+      // then it will not have any tombstones to gc for the server.
+      return;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("gcTombstoneKeys invoked for region {} and keys {}", r, tombstoneKeys);
+    }
+    final TombstoneSweeper sweeper = this.getSweeper(r);
+    final List<Tombstone> removals = new ArrayList<Tombstone>(tombstoneKeys.size());
+    sweeper.removeUnexpiredIf(t -> {
+      if (t.region == r) {
+        if (tombstoneKeys.contains(t.entry.getKey())) {
+          removals.add(t);
+          return true;
         }
       }
-      
-      queue.removeAll(removals);
-      nonReplicatedTombstoneQueueSize.addAndGet(removalSize);
-      
-    } finally {
-      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
-    }
+      return false;
+    });
     
     for (Tombstone t: removals) {
       //TODO - RVV - to support persistent client regions
@@ -428,61 +300,17 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
    * @return true if the expiration occurred 
    */
   public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
-    this.replicatedTombstoneSweeper.testHook_batchExpired = new CountDownLatch(1);
-    try {
-      synchronized(this.replicatedTombstoneSweeper) {
-        this.replicatedTombstoneSweeper.forceExpirationCount+= count;
-        this.replicatedTombstoneSweeper.notifyAll();
-      }
-
-      //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
-      //something goes wrong.
-      return this.replicatedTombstoneSweeper.testHook_batchExpired.await(30, TimeUnit.SECONDS);
-    } finally {
-      this.replicatedTombstoneSweeper.testHook_batchExpired=null;
-    }
-  }
-
-  /**
-   * Test Hook - slow operation
-   * verify whether a tombstone is scheduled for expiration
-   */
-  public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
-    Queue<Tombstone> queue;
-    if (r.getDataPolicy().withReplication()) {
-      queue = this.replicatedTombstones;
-    } else {
-      queue = this.nonReplicatedTombstones;
-    }
-    VersionSource myId = r.getVersionMember();
-    VersionTag entryTag = re.getVersionStamp().asVersionTag();
-    int entryVersion = entryTag.getEntryVersion();
-    for (Tombstone t: queue) {
-      if (t.region == r) {
-        VersionSource destroyingMember = t.getMemberID();
-        if (destroyingMember == null) {
-          destroyingMember = myId;
-        }
-        if (t.region == r
-            && t.entry.getKey().equals(re.getKey())
-            && t.getEntryVersion() == entryVersion) {
-          return true;
-        }
-      }
-    }
-    if (this.replicatedTombstoneSweeper != null) {
-      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
-    }
-    return false;
+    return this.replicatedTombstoneSweeper.testHook_forceExpiredTombstoneGC(count);
   }
 
   @Override
   public String toString() {
-    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
-    + " Non-replicate Queue=" + this.nonReplicatedTombstones
-    + (this.replicatedTombstoneSweeper.expiredTombstones != null?
-        " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
+    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstoneSweeper
+    + " Non-replicate Queue=" + this.nonReplicatedTombstoneSweeper;
   }  
+  public Object getBlockGCLock() {
+    return this.replicatedTombstoneSweeper.getBlockGCLock();
+  }
   private static class Tombstone extends CompactVersionHolder {
     // tombstone overhead size
     public static int PER_TOMBSTONE_OVERHEAD = ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
@@ -515,55 +343,53 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       return sb.toString();
     }
   }
-  
-  private static class TombstoneSweeper implements Runnable {
-    /**
-     * the expiration time for tombstones in this sweeper
-     */
-    private final long expiryTime;
-    /**
-     * the current tombstones.  These are queued for expiration.  When tombstones
-     * are resurrected they are left in this queue and the sweeper thread
-     * figures out that they are no longer valid tombstones.
-     */
-    Queue<Tombstone> tombstones;
-    /**
-     * The size, in bytes, of the queue
-     */
-    AtomicLong queueSize = new AtomicLong();
-    /**
-     * the thread that handles tombstone expiration.  It reads from the
-     * tombstone queue.
-     */
-    Thread sweeperThread;
-    /**
-     * whether this sweeper accumulates expired tombstones for batch removal
-     */
-    boolean batchMode;
-    /**
-     * this suspends batch expiration.  It is intended for administrative use
-     * so an operator can suspend the garbage-collection of tombstones for
-     * replicated/partitioned regions if a persistent member goes off line
-     */
-    volatile boolean batchExpirationSuspended;
-    /**
-     * The sweeper thread's current tombstone
-     */
-    Tombstone currentTombstone;
+  private static class NonReplicateTombstoneSweeper extends TombstoneSweeper {
+    NonReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion) {
+      super(cacheTime, stats, cancelCriterion, NON_REPLICATE_TOMBSTONE_TIMEOUT, "Non-replicate Region Garbage Collector");
+    }
+
+    @Override
+    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+      return false;
+    }
+    @Override protected void updateStatistics() {
+      stats.setNonReplicatedTombstonesSize(getMemoryEstimate());
+    }
+    @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+      return msTillHeadTombstoneExpires <= 0;
+    }
+    @Override protected void expireTombstone(Tombstone tombstone) {
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", tombstone);
+      }
+      updateMemoryEstimate(-tombstone.getSize());
+      tombstone.region.getRegionMap().removeTombstone(tombstone.entry, tombstone, false, true);
+    }
+    @Override
+    protected void checkExpiredTombstoneGC() {
+    }
+    @Override
+    protected void handleNoUnexpiredTombstones() {
+    }
+    @Override
+    boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+      return true;
+    }
+    @Override
+    protected void beforeSleepChecks() {
+    }
+  }
+
+  private static class ReplicateTombstoneSweeper extends TombstoneSweeper {
     /**
-     * a lock protecting the value of currentTombstone from changing
+     * Used to execute batch gc message execution in the background.
      */
-    final StoppableReentrantLock currentTombstoneLock;
+    private final ExecutorService executor;
     /**
      * tombstones that have expired and are awaiting batch removal.  This
      * variable is only accessed by the sweeper thread and so is not guarded
      */
-    Set<Tombstone> expiredTombstones;
-    
-    /**
-     * count of entries to forcibly expire due to memory events
-     */
-    private long forceExpirationCount = 0;
+    private final List<Tombstone> expiredTombstones;
     
     /**
      * Force batch expiration
@@ -572,92 +398,75 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
     
     /**
      * Is a batch expiration in progress?
+     * Part of expireBatch is done in a background thread
+     * and until that completes batch expiration is in progress.
      */
     private volatile boolean batchExpirationInProgress;
     
+    private final Object blockGCLock = new Object();
+    private int progressingDeltaGIICount; 
+    
     /**
-     * A test hook to force expiration of tombstones.
+     * A test hook to force a call to expireBatch.
+     * The call will only happen after testHook_forceExpirationCount
+     * goes to zero.
+     * This latch is counted down at the end of expireBatch.
      * See @{link {@link TombstoneService#forceBatchExpirationForTests(int)}
      */
-    private CountDownLatch testHook_batchExpired;
-
+    private CountDownLatch testHook_forceBatchExpireCall;
     /**
-     * the cache that owns all of the tombstones in this sweeper
+     * count of tombstones to forcibly expire
      */
-    private GemFireCacheImpl cache;
-    
-    private volatile boolean isStopped;
-    
-    TombstoneSweeper(GemFireCacheImpl cache,
-        Queue<Tombstone> tombstones,
-        long expiryTime,
-        boolean batchMode,
-        AtomicLong queueSize) {
-      this.cache = cache;
-      this.expiryTime = expiryTime;
-      this.tombstones = tombstones;
-      this.queueSize = queueSize;
-      if (batchMode) {
-        this.batchMode = true;
-        this.expiredTombstones = new HashSet<Tombstone>();
-      }
-      this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
-    }
-    
-    /** stop tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void suspendBatchExpiration() {
-      this.batchExpirationSuspended = true;
+    private int testHook_forceExpirationCount = 0;
+
+    ReplicateTombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, ExecutorService executor) {
+      super(cacheTime, stats, cancelCriterion, REPLICATE_TOMBSTONE_TIMEOUT, "Replicate/Partition Region Garbage Collector");
+      this.expiredTombstones = new ArrayList<Tombstone>();
+      this.executor = executor;
     }
     
-    
-    /** enables tombstone removal for sweepers that have batchMode==true */
-    @SuppressWarnings("unused")
-    void resumeBatchExpiration () {
-      if (this.batchExpirationSuspended) {
-        this.batchExpirationSuspended = false; // volatile write
+    public int decrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return --progressingDeltaGIICount;
       }
     }
-    
-    /** force a batch GC */
-    void forceBatchExpiration() {
-      this.forceBatchExpiration = true;
-      //this.forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size() + 1;
+
+    public int incrementGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return ++progressingDeltaGIICount;
+      }
     }
-    
-    /** if we should GC the batched tombstones, this method will initiate the operation */
-    private void processBatch() {
-      if ((!batchExpirationSuspended &&
-          (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
-        || testHook_batchExpired != null) {
-        this.forceBatchExpiration = false;
-        expireBatch();
+
+    public int getGCBlockCount() {
+      synchronized(getBlockGCLock()) {
+        return progressingDeltaGIICount;
       }
     }
-    
-    /** test hook - unsafe since not synchronized */
-    boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
-      int entryVersion = tag.getEntryVersion();
-      boolean retry;
-      do {
-        retry = false;
-        try {
-          for (Tombstone t: this.expiredTombstones) {
-            if (t.region == r
-                && t.entry.getKey().equals(re.getKey())
-                && t.getEntryVersion() == entryVersion) {
-              return true;
-            }
+
+    public Object getBlockGCLock() {
+      return blockGCLock;
+    }
+
+    @Override
+    protected boolean removeExpiredIf(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      long removalSize = 0;
+      synchronized(getBlockGCLock()) {
+        // Iterate in reverse order to optimize lots of removes.
+        // Since expiredTombstones is an ArrayList removing from
+        // low indexes requires moving everything at a higher index down.
+        for (int idx=expiredTombstones.size()-1; idx >= 0; idx--) {
+          Tombstone t = expiredTombstones.get(idx);
+          if (predicate.test(t)) {
+            removalSize += t.getSize();
+            expiredTombstones.remove(idx);
+            result = true;
           }
-        } catch (ConcurrentModificationException e) {
-          retry = true;
         }
-      } while (retry);
-      return false;
+      }
+      updateMemoryEstimate(-removalSize);
+      return result;
     }
-    
-    
-    
     /** expire a batch of tombstones */
     private void expireBatch() {
       // fix for bug #46087 - OOME due to too many GC threads
@@ -666,8 +475,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         // because the sweeper thread will just try again after its next sleep (max sleep is 10 seconds)
         return;
       }
-      synchronized(cache.getTombstoneService().blockGCLock) {
-        int count = cache.getTombstoneService().getGCBlockCount();
+      synchronized(getBlockGCLock()) {
+        int count = getGCBlockCount();
         if (count > 0) {
           // if any delta GII is on going as provider at this member, not to do tombstone GC
           if (logger.isDebugEnabled()) {
@@ -679,23 +488,26 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
-        Set<Tombstone> expired = expiredTombstones;
-        long removalSize = 0;
-        expiredTombstones = new HashSet<Tombstone>();
-        if (expired.size() == 0) {
-          return;
-        }
 
+        // TODO seems like no need for the value of this map to be a Set.
+        // It could instead be a List, which would be nice because the per entry
+        // memory overhead for a set is much higher than an ArrayList
+        // BUT we send it to clients and the old
+        // version of them expects it to be a Set.
+        final Map<DistributedRegion, Set<Object>> reapedKeys = new HashMap<>();
+        
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
         //we start removing entries from the map.
-        for (Tombstone t: expired) {
-          t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
-          regionsAffected.add((DistributedRegion)t.region);
+        for (Tombstone t: expiredTombstones) {
+          DistributedRegion tr = (DistributedRegion)t.region;
+          tr.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
+          if (!reapedKeys.containsKey(tr)) {
+            reapedKeys.put(tr, Collections.emptySet());
+          }
         }
-        
-        for (DistributedRegion r: regionsAffected) {
+
+        for (DistributedRegion r: reapedKeys.keySet()) {
           //Remove any exceptions from the RVV that are older than the GC version
           r.getVersionVector().pruneOldExceptions();
 
@@ -708,32 +520,33 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           }
         }
 
-        final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();
-        
         //Remove the tombstones from the in memory region map.
-        for (Tombstone t: expired) {
+        removeExpiredIf(t -> {
           // for PR buckets we have to keep track of the keys removed because clients have
           // them all lumped in a single non-PR region
-          if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
-            Set<Object> keys = reapedKeys.get(t.region);
-            if (keys == null) {
+          DistributedRegion tr = (DistributedRegion) t.region;
+          boolean tombstoneWasStillInRegionMap = tr.getRegionMap().removeTombstone(t.entry, t, false, true);
+          if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket()) {
+            Set<Object> keys = reapedKeys.get(tr);
+            if (keys.isEmpty()) {
               keys = new HashSet<Object>();
-              reapedKeys.put(t.region, keys);
+              reapedKeys.put(tr, keys);
             }
             keys.add(t.entry.getKey());
           }
-          removalSize += t.getSize();
-        }
+          return true;
+        });
 
-        this.queueSize.addAndGet(-removalSize);
         // do messaging in a pool so this thread is not stuck trying to
         // communicate with other members
-        cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+        executor.execute(new Runnable() {
           public void run() {
             try {
               // this thread should not reference other sweeper state, which is not synchronized
-              for (DistributedRegion r: regionsAffected) {
-                r.distributeTombstoneGC(reapedKeys.get(r));
+              for (Map.Entry<DistributedRegion, Set<Object>> mapEntry: reapedKeys.entrySet()) {
+                DistributedRegion r = mapEntry.getKey();
+                Set<Object> rKeysReaped = mapEntry.getValue();
+                r.distributeTombstoneGC(rKeysReaped);
               }
             } finally {
               batchExpirationInProgress = false;
@@ -742,8 +555,8 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
         });
         batchScheduled = true;
       } finally {
-        if(testHook_batchExpired != null) {
-          testHook_batchExpired.countDown();
+        if(testHook_forceBatchExpireCall != null) {
+          testHook_forceBatchExpireCall.countDown();
         }
         if (!batchScheduled) {
           batchExpirationInProgress = false;
@@ -751,219 +564,279 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
       }
       } // sync on deltaGIILock
     }
+    @Override
+    protected void checkExpiredTombstoneGC() {
+      if (shouldCallExpireBatch()) {
+        this.forceBatchExpiration = false;
+        expireBatch();
+      }
+      checkIfBatchExpirationShouldBeForced();
+    }
+    private boolean shouldCallExpireBatch() {
+      if (testHook_forceExpirationCount > 0) {
+        return false;
+      }
+      if (forceBatchExpiration) {
+        return true;
+      }
+      if (testHook_forceBatchExpireCall != null) {
+        return true;
+      }
+      if (expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT) {
+        return true;
+      }
+      return false;
+    }
+    private void testHookIfIdleExpireBatch() {
+      if (IDLE_EXPIRATION && sleepTime >= EXPIRY_TIME && !this.expiredTombstones.isEmpty()) {
+        expireBatch();
+      }
+    }
+    @Override protected void updateStatistics() {
+      stats.setReplicatedTombstonesSize(getMemoryEstimate());
+    }
+    private void checkIfBatchExpirationShouldBeForced() {
+      if (testHook_forceExpirationCount > 0) {
+        return;
+      }
+      if (GC_MEMORY_THRESHOLD <= 0.0) {
+        return;
+      }
+      if (this.batchExpirationInProgress) {
+        return;
+      }
+      if (this.expiredTombstones.size() <= (EXPIRED_TOMBSTONE_LIMIT / 4)) {
+        return;
+      }
+      if (FORCE_GC_MEMORY_EVENTS || isFreeMemoryLow()) {
+        forceBatchExpiration = true;
+        if (logger.isDebugEnabled()) {
+          logger.debug("forcing batch expiration due to low memory conditions");
+        }
+      }
+    }
+    private boolean isFreeMemoryLow() {
+      Runtime rt = Runtime.getRuntime();
+      long unusedMemory = rt.freeMemory(); // "free" is how much space we have allocated that is currently not used
+      long totalMemory = rt.totalMemory(); // "total" is how much space we have allocated
+      long maxMemory = rt.maxMemory(); // "max" is how much space we can allocate
+      unusedMemory += (maxMemory-totalMemory); // "max-total" is how much space we have that has not yet been allocated
+      return unusedMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD;
+    }
+    @Override protected boolean hasExpired(long msTillHeadTombstoneExpires) {
+      if (testHook_forceExpirationCount > 0) {
+        testHook_forceExpirationCount--;
+        return true;
+      }
+      return msTillHeadTombstoneExpires <= 0;
+    }
+    @Override protected void expireTombstone(Tombstone tombstone) {
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "adding expired tombstone {} to batch", tombstone);
+      }
+      expiredTombstones.add(tombstone);
+    }
+    @Override protected void handleNoUnexpiredTombstones() {
+      testHook_forceExpirationCount = 0;
+    }
+    @Override
+    public String toString() {
+      return super.toString() + " batchedExpiredTombstones[" + expiredTombstones.size() + "] = " + expiredTombstones.toString();
+    }
+
+    @Override
+    boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException {
+      // sync on blockGCLock since expireBatch syncs on it
+      synchronized(getBlockGCLock()) {
+        testHook_forceBatchExpireCall = new CountDownLatch(1);
+      }
+      try {
+        synchronized(this) {
+          testHook_forceExpirationCount += count;
+          notifyAll();
+        }
+        //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
+        //something goes wrong.
+        return testHook_forceBatchExpireCall.await(30, TimeUnit.SECONDS);
+      } finally {
+        testHook_forceBatchExpireCall=null;
+      }
+    }
+
+    @Override
+    protected void beforeSleepChecks() {
+      testHookIfIdleExpireBatch();
+    }
+    @Override
+    public long getScheduledTombstoneCount() {
+      return super.getScheduledTombstoneCount() + this.expiredTombstones.size();
+    }
+  }
+  
+  private static abstract class TombstoneSweeper implements Runnable {
+    /**
+     * the expiration time for tombstones in this sweeper
+     */
+    protected final long EXPIRY_TIME;
+    /**
+     * The minimum amount of elapsed time, in millis, between purges.
+     */
+    private final long PURGE_INTERVAL;
+    /**
+     * How long the sweeper should sleep.
+     */
+    protected long sleepTime;
+    /**
+     * Estimate of how long, in millis, it will take to do a purge of obsolete tombstones.
+     */
+    private long minimumPurgeTime = 1;
+    /**
+     * Timestamp of when the last purge was done.
+     */
+    private long lastPurgeTimestamp;
+    /**
+     * the current tombstones.  These are queued for expiration.  When tombstones
+     * are resurrected they are left in this queue and the sweeper thread
+     * figures out that they are no longer valid tombstones.
+     */
+    private final Queue<Tombstone> tombstones;
+    /**
+     * Estimate of the amount of memory used by this sweeper
+     */
+    private final AtomicLong memoryUsedEstimate;
+    /**
+     * the thread that handles tombstone expiration.
+     */
+    private final Thread sweeperThread;
+    /**
+     * A lock protecting the head of the tombstones queue.
+     * Operations that may remove the head need to hold this lock.
+     */
+    private final StoppableReentrantLock queueHeadLock;
+    
+
+    protected final CacheTime cacheTime;
+    protected final CachePerfStats stats;
+    private final CancelCriterion cancelCriterion;
+    
+    private volatile boolean isStopped;
     
+    TombstoneSweeper(CacheTime cacheTime, CachePerfStats stats, CancelCriterion cancelCriterion, 
+        long expiryTime,
+        String threadName) {
+      this.cacheTime = cacheTime;
+      this.stats = stats;
+      this.cancelCriterion = cancelCriterion;
+      this.EXPIRY_TIME = expiryTime;
+      this.PURGE_INTERVAL = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
+      this.tombstones = new ConcurrentLinkedQueue<Tombstone>();
+      this.memoryUsedEstimate = new AtomicLong();
+      this.queueHeadLock = new StoppableReentrantLock(cancelCriterion);
+      this.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors", logger), this);
+      this.sweeperThread.setDaemon(true);
+      this.sweeperThread.setName(threadName);
+      this.lastPurgeTimestamp = getNow();
+    }
+
+    public void unscheduleTombstones(final LocalRegion r) {
+      this.removeIf(t -> {
+        if (t.region == r) {
+          return true;
+        }
+        return false;
+      });
+    }
+
     /**
-     * The run loop picks a tombstone off of the expiration queue and waits
-     * for it to expire.  It also periodically scans for resurrected tombstones
-     * and handles batch expiration.  Batch expiration works by tossing the
-     * expired tombstones into a set and delaying the removal of those tombstones
-     * from the Region until scheduled points in the calendar.  
+     * For each unexpired tombstone this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate.
+     * @return true if predicate ever returned true
      */
+    private boolean removeUnexpiredIf(Predicate<Tombstone> predicate) {
+      boolean result = false;
+      long removalSize = 0;
+      lockQueueHead();
+      try {
+        for (Iterator<Tombstone> it=getQueue().iterator(); it.hasNext(); ) {
+          Tombstone t = it.next();
+          if (predicate.test(t)) {
+            removalSize += t.getSize();
+            it.remove();
+            result = true;
+          }
+        }
+      } finally {
+        unlockQueueHead();
+      }
+      updateMemoryEstimate(-removalSize);
+      return result;
+    }
+    
+    /**
+     * For all tombstone this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate.
+     * @return true if predicate ever returned true
+     */
+    private boolean removeIf(Predicate<Tombstone> predicate) {
+      return removeUnexpiredIf(predicate) || removeExpiredIf(predicate);
+    }
+
+    synchronized void start() {
+      this.sweeperThread.start();
+    }
+
+    synchronized void stop() {
+      this.isStopped = true;
+      if (this.sweeperThread != null) {
+        notifyAll();
+      }
+      try {
+        this.sweeperThread.join(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      getQueue().clear();
+    }
+
+    private void lockQueueHead() {
+      this.queueHeadLock.lock();
+    }
+    private void unlockQueueHead() {
+      this.queueHeadLock.unlock();
+    }
+    
+    public long getMemoryEstimate() {
+      return this.memoryUsedEstimate.get();
+    }
+
+    public void updateMemoryEstimate(long delta) {
+      this.memoryUsedEstimate.addAndGet(delta);
+    }
+
+    protected Queue<Tombstone> getQueue() {
+      return this.tombstones;
+    }
+
+    void scheduleTombstone(Tombstone ts) {
+      this.tombstones.add(ts);
+      updateMemoryEstimate(ts.getSize());
+    }
+    
     public void run() {
-      long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
-      long maximumSleepTime = 10000;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-        logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
-      }
-      currentTombstone = null;
-      // millis we need to run a scan of queue and batch set for resurrected tombstones
-      long minimumScanTime = 100;
-      // how often to perform the scan
-      long scanInterval = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
-      long lastScanTime = this.cache.cacheTimeMillis();
-      
-      while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
-        Throwable problem = null;
+        logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with sleep interval of {} milliseconds", EXPIRY_TIME);
+      }
+      while (!isStopped && cancelCriterion.cancelInProgress() == null) {
         try {
-          if (this.batchMode) {
-            cache.getCachePerfStats().setReplicatedTombstonesSize(queueSize.get());
-          } else {
-            cache.getCachePerfStats().setNonReplicatedTombstonesSize(queueSize.get());
-          }
+          updateStatistics();
           SystemFailure.checkFailure();
-          long now = this.cache.cacheTimeMillis();
-          if (forceExpirationCount <= 0) {
-            if (this.batchMode) {
-              processBatch();
-            }
-            // if we're running out of memory we get a little more aggressive about
-            // the size of the batch we'll expire
-            if (GC_MEMORY_THRESHOLD > 0 && this.batchMode) {
-              // check to see how we're doing on memory
-              Runtime rt = Runtime.getRuntime();
-              long freeMemory = rt.freeMemory();
-              long totalMemory = rt.totalMemory();
-              long maxMemory = rt.maxMemory();
-              freeMemory += (maxMemory-totalMemory);
-              if (FORCE_GC_MEMORY_EVENTS ||
-                  freeMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD) {
-                forceBatchExpiration = !this.batchExpirationInProgress &&
-                       this.expiredTombstones.size() > (EXPIRED_TOMBSTONE_LIMIT / 4);
-                if (forceBatchExpiration) {
-                  if (logger.isDebugEnabled()) {
-                    logger.debug("forcing batch expiration due to low memory conditions");
-                  }
-                }
-                // forcing expiration of tombstones that have not timed out can cause inconsistencies
-                // too easily
-  //              if (this.batchMode) {
-  //                forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT - this.expiredTombstones.size();
-  //              } else {
-  //                forceExpirationCount = EXPIRED_TOMBSTONE_LIMIT;
-  //              }
-  //              maximumSleepTime = 1000;
-              }
-            }
-          }
-          if (currentTombstone == null) {
-            try {
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = tombstones.remove();
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
-              }
-            } catch (NoSuchElementException e) {
-              // expected
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-              }
-              forceExpirationCount = 0;
-            }
-          }
-          long sleepTime;
-          if (currentTombstone == null) {
-            sleepTime = expiryTime;
-          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now && (forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
-            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
-          } else {
-            if (forceExpirationCount > 0) {
-              forceExpirationCount--;
-            }
-            sleepTime = 0;
-            try {
-              if (batchMode) {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
-                }
-                expiredTombstones.add(currentTombstone);
-              } else {
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
-                }
-                queueSize.addAndGet(-currentTombstone.getSize());
-                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
-              }
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-            } catch (CancelException e) {
-              return;
-            } catch (Exception e) {
-              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-            }
-          }
-          if (sleepTime > 0) {
-            // initial sleeps could be very long, so we reduce the interval to allow
-            // this thread to periodically sweep up tombstones for resurrected entries
-            sleepTime = Math.min(sleepTime, scanInterval);
-            if (sleepTime > minimumScanTime  &&  (now - lastScanTime) > scanInterval) {
-              lastScanTime = now;
-              long start = now;
-              // see if any have been superseded
-              for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
-                Tombstone test = it.next();
-                if (it.hasNext()) {
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
-                      sleepTime = 0;
-                    }
-                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime) <= now) {
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
-                    }
-                    expiredTombstones.add(test);
-                    sleepTime = 0;
-                  }
-                }
-              }
-              // now check the batch of timed-out tombstones, if there is one
-              if (batchMode) {
-                for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
-                  Tombstone test = it.next();
-                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
-                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
-                    }
-                    it.remove();
-                    this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
-                      sleepTime = 0;
-                    }
-                  }
-                }
-              }
-              if (sleepTime > 0) {
-                long elapsed = this.cache.cacheTimeMillis() - start;
-                sleepTime = sleepTime - elapsed;
-                if (sleepTime <= 0) {
-                  minimumScanTime = elapsed;
-                  continue;
-                }
-              }
-            }
-            // test hook:  if there are expired tombstones and nothing else is expiring soon,
-            // perform distributed tombstone GC
-            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
-              if (this.expiredTombstones.size() > 0) {
-                expireBatch();
-              }
-            }
-            if (sleepTime > 0) {
-              try {
-                sleepTime = Math.min(sleepTime, maximumSleepTime);
-                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
-                }
-                synchronized(this) {
-                  if(isStopped) {
-                    return;
-                  }
-                  this.wait(sleepTime);
-                }
-              } catch (InterruptedException e) {
-                return;
-              }
-            }
-          } // sleepTime > 0
+          final long now = getNow();
+          checkExpiredTombstoneGC();
+          checkOldestUnexpired(now);
+          purgeObsoleteTombstones(now);
+          doSleep();
         } catch (CancelException e) {
           break;
         } catch (VirtualMachineError err) { // GemStoneAddition
@@ -973,27 +846,135 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
           throw err;
         } catch (Throwable e) {
           SystemFailure.checkFailure();
-          problem = e;
-        }
-        if (problem != null) {
-          logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), problem);
+          logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), e);
         }
       } // while()
     } // run()
-    
-  } // class TombstoneSweeper
 
-  /* (non-Javadoc)
-   * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
-   */
-  @Override
-  public void onEvent(MemoryEvent event) {
-    if (event.isLocal()) {
-      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
-        this.replicatedTombstoneSweeper.forceBatchExpiration();
+    private long getNow() {
+      return cacheTime.cacheTimeMillis();
+    }
+
+    private void doSleep() {
+      if (sleepTime <= 0) {
+        return;
+      }
+      beforeSleepChecks();
+      sleepTime = Math.min(sleepTime, MAX_SLEEP_TIME);
+      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+        logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
+      }
+      synchronized(this) {
+        if (isStopped) {
+          return;
+        }
+        try {
+          this.wait(sleepTime);
+        } catch (InterruptedException e) {
+        }
       }
     }
-  }
 
+   private void purgeObsoleteTombstones(final long now) {
+      if (minimumPurgeTime > sleepTime) {
+        // the purge might take minimumScanTime
+        // and we have something to do sooner
+        // than that so return
+        return;
+      }
+      if ((now - lastPurgeTimestamp) < PURGE_INTERVAL) {
+        // the time since the last purge
+        // is less than the configured interval
+        // so return
+        return;
+      }
+      lastPurgeTimestamp = now;
+      long start = now;
+      // see if any have been superseded
+      boolean removedObsoleteTombstone = removeIf(tombstone -> {
+        if (tombstone.region.getRegionMap().isTombstoneNotNeeded(tombstone.entry, tombstone.getEntryVersion())) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", tombstone);
+          }
+          return true;
+        }
+        return false;
+      });
+      if (removedObsoleteTombstone) {
+        sleepTime = 0;
+      } else {
+        long elapsed = getNow() - start;
+        sleepTime -= elapsed;
+        if (sleepTime <= 0) {
+          minimumPurgeTime = elapsed;
+        }
+      }
+    }
+
+    /**
+     * See if the oldest unexpired tombstone should be expired.
+     */
+    private void checkOldestUnexpired(long now) {
+      sleepTime = 0;
+      lockQueueHead();
+      Tombstone oldest = tombstones.peek();
+      try {
+        if (oldest == null) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+          }
+          handleNoUnexpiredTombstones();
+          sleepTime = EXPIRY_TIME;
+        } else {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "oldest unexpired tombstone is {}", oldest);
+          }
+          long msTillHeadTombstoneExpires = oldest.getVersionTimeStamp() + EXPIRY_TIME - now;
+          if (hasExpired(msTillHeadTombstoneExpires)) {
+            try {
+              tombstones.remove();
+              expireTombstone(oldest);
+            } catch (CancelException e) {
+              // nothing needed
+            } catch (Exception e) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
+            }
+          } else {
+            sleepTime = msTillHeadTombstoneExpires;
+          }
+        }
+      } finally {
+        unlockQueueHead();
+      }
+    }
+    
+    public long getScheduledTombstoneCount() {
+      return getQueue().size();
+    }
+    
+    @Override
+    public String toString() {
+      return "[" + getQueue().size() + "] " + getQueue().toString();
+    }
 
+    /**
+     * For each expired tombstone this sweeper knows about call the predicate.
+     * If the predicate returns true then remove the tombstone from any storage
+     * and update the memory estimate.
+     * <p>Some sweepers batch up the expired tombstones to gc them later.
+     * @return true if predicate ever returned true
+     */
+    protected abstract boolean removeExpiredIf(Predicate<Tombstone> predicate);
+    /** see if the already expired tombstones should be processed */
+    protected abstract void checkExpiredTombstoneGC();
+    protected abstract void handleNoUnexpiredTombstones();
+    protected abstract boolean hasExpired(long msTillTombstoneExpires);
+    protected abstract void expireTombstone(Tombstone tombstone);
+    protected abstract void updateStatistics();
+    /**
+     * Do anything needed before the sweeper sleeps.
+     */
+    protected abstract void beforeSleepChecks();
+    abstract boolean testHook_forceExpiredTombstoneGC(int count) throws InterruptedException;
+  } // class TombstoneSweeper
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 4269e7f..427ebfe 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2881,6 +2881,9 @@ public class CacheClientProxy implements ClientSession {
       } finally {
         this.socketWriteLock.unlock();
       }
+      if (logger.isTraceEnabled()) {
+        logger.trace("{}: Sent {}", this, message);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
index 652bd6b..3816883 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -329,7 +329,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
         RegionEntry entry = CCRegion.getRegionEntry("cckey0");
         VersionTag tag = entry.getVersionStamp().asVersionTag();
         assertTrue(tag.getEntryVersion() > 1);
-        tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+        tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
         entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
         try {
           entry.makeTombstone(CCRegion, tag);
@@ -368,10 +368,10 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     final String name = this.getUniqueName() + "-CC";
 
 
-    final long saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
-    final long saveTombstoneTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
+    final int saveExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long saveTombstoneTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
     TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 50;
-    TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 500;
+    TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 500;
     try {
       // create some destroyed entries so the GC service is populated
       RegionFactory f = getCache().createRegionFactory(getRegionAttributes());
@@ -400,7 +400,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
     } finally {
       TombstoneService.EXPIRED_TOMBSTONE_LIMIT = saveExpiredTombstoneLimit;
       TombstoneService.FORCE_GC_MEMORY_EVENTS = false;
-      TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
+      TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = saveTombstoneTimeout;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
index 1458e4f..fa92c9a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/GlobalRegionCCEDUnitTest.java
@@ -200,7 +200,7 @@ public class GlobalRegionCCEDUnitTest extends GlobalRegionDUnitTest {
         VersionTag tag = entry.getVersionStamp().asVersionTag();
         assertTrue(tag.getEntryVersion() > 1);
         tag.setVersionTimeStamp(System.currentTimeMillis()
-            - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT - 1000);
+            - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT - 1000);
         entry.getVersionStamp().setVersionTimeStamp(tag.getVersionTimeStamp());
         try {
           entry.makeTombstone(CCRegion, tag);



[7/7] incubator-geode git commit: Merge branch 'develop' into feature/GEODE-1571

Posted by ji...@apache.org.
Merge branch 'develop' into feature/GEODE-1571


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bab4e626
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bab4e626
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bab4e626

Branch: refs/heads/feature/GEODE-1571
Commit: bab4e6268561450f52e5783eacf579d0583b4e93
Parents: db456f6 b9da9e6
Author: Jinmei Liao <ji...@pivotal.io>
Authored: Thu Jul 7 11:12:52 2016 -0700
Committer: Jinmei Liao <ji...@pivotal.io>
Committed: Thu Jul 7 11:12:52 2016 -0700

----------------------------------------------------------------------
 .../DataSerializerRecoveryListener.java         |   33 +-
 .../gemfire/distributed/internal/CacheTime.java |   29 +
 .../gemfire/distributed/internal/DSClock.java   |    7 +-
 .../internal/cache/AbstractRegionEntry.java     |    2 +-
 .../internal/cache/AbstractRegionMap.java       |   46 +-
 .../gemfire/internal/cache/BucketRegion.java    |   18 +-
 .../internal/cache/GemFireCacheImpl.java        |    3 +-
 .../internal/cache/InitialImageOperation.java   |    4 +-
 .../gemfire/internal/cache/LocalRegion.java     |   25 +-
 .../internal/cache/PartitionedRegion.java       |    4 +
 .../gemfire/internal/cache/ProxyRegionMap.java  |    7 -
 .../gemfire/internal/cache/RegionMap.java       |    5 -
 .../gemfire/internal/cache/TXEntryState.java    |    4 +-
 .../internal/cache/TombstoneService.java        | 1211 +++++++++---------
 .../DestroyRegionOnDataStoreMessage.java        |    3 +-
 .../cache/tier/sockets/CacheClientProxy.java    |    3 +
 .../parallel/ParallelGatewaySenderQueue.java    |   18 +-
 .../DistributedAckRegionCCEDUnitTest.java       |   10 +-
 .../cache30/GlobalRegionCCEDUnitTest.java       |    2 +-
 .../gemfire/cache30/MultiVMRegionTestCase.java  |  107 +-
 .../internal/cache/GIIDeltaDUnitTest.java       |    8 +-
 ...rtitionedRegionDelayedRecoveryDUnitTest.java |    9 +-
 .../cache/TombstoneCreationJUnitTest.java       |    6 +-
 .../PersistentRVVRecoveryDUnitTest.java         |    9 +-
 .../cache/wan/AsyncEventQueueTestBase.java      |   32 +-
 .../asyncqueue/AsyncEventListenerDUnitTest.java |   98 ++
 .../lucene/internal/IndexRepositoryFactory.java |   71 +
 .../lucene/internal/LuceneEventListener.java    |    8 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |    2 +-
 .../internal/PartitionedRepositoryManager.java  |   28 +-
 .../repository/IndexRepositoryImpl.java         |    8 +-
 .../gemfire/cache/lucene/LuceneQueriesBase.java |   14 +-
 .../cache/lucene/LuceneQueriesPRBase.java       |  216 +++-
 .../lucene/LuceneQueriesPeerPRDUnitTest.java    |    8 +-
 .../LuceneQueriesPeerPRRedundancyDUnitTest.java |   54 +-
 .../internal/LuceneEventListenerJUnitTest.java  |    6 +-
 36 files changed, 1305 insertions(+), 813 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bab4e626/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bab4e626/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------


[2/7] incubator-geode git commit: GEODE-1558, GEODE-1609: Fixing failures due to rebalancing buckets during lucene index updates

Posted by ji...@apache.org.
GEODE-1558, GEODE-1609: Fixing failures due to rebalancing buckets during lucene index updates

There are two failures we're fixing here:
1) Calling repository.create for create events was leaving duplicate events in the
index because when a bucket fails over, the event is dispatched again on the new primary.
Using the isPossibleDuplicate flag did not work because it was not consistently set to
true for duplicate events. Changed the code to call repository.update even for create events

2) The async event queue was repeatedly dispatching the same events even after
a bucket moved to another node. We changed the async event queue code to filter out
events for buckets that are no longer present on this dispatching member.

Cleaning up the rebalancing test and adding new tests to make these scenarios
more reproducible.

This closes #176


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7b28a8d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7b28a8d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7b28a8d4

Branch: refs/heads/feature/GEODE-1571
Commit: 7b28a8d4e9dc4359d4d11d286f895537864f3a03
Parents: 4612a19
Author: Dan Smith <ds...@pivotal.io>
Authored: Tue Jun 28 15:33:51 2016 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Thu Jul 7 10:03:04 2016 -0700

----------------------------------------------------------------------
 .../parallel/ParallelGatewaySenderQueue.java    |  18 +-
 .../cache/wan/AsyncEventQueueTestBase.java      |  32 ++-
 .../asyncqueue/AsyncEventListenerDUnitTest.java |  98 +++++++++
 .../lucene/internal/IndexRepositoryFactory.java |  71 ++++++
 .../lucene/internal/LuceneEventListener.java    |   8 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |   2 +-
 .../internal/PartitionedRepositoryManager.java  |  28 +--
 .../repository/IndexRepositoryImpl.java         |   8 +-
 .../gemfire/cache/lucene/LuceneQueriesBase.java |  14 +-
 .../cache/lucene/LuceneQueriesPRBase.java       | 216 +++++++++++++++++--
 .../lucene/LuceneQueriesPeerPRDUnitTest.java    |   8 +-
 .../LuceneQueriesPeerPRRedundancyDUnitTest.java |  54 ++++-
 .../internal/LuceneEventListenerJUnitTest.java  |   6 +-
 13 files changed, 501 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index e0f8b6f..453e7f0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1277,7 +1277,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     PartitionedRegion prQ = getRandomShadowPR();
-    List batch = new ArrayList();
+    List<GatewaySenderEventImpl> batch = new ArrayList<>();
     if (prQ == null || prQ.getLocalMaxMemory() == 0) {
       try {
         Thread.sleep(50);
@@ -1370,8 +1370,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return batch;
   }
 
-  private void addPeekedEvents(List batch, int batchSize) {
+  private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
     if (this.resetLastPeeked) {
+
+      //Remove all entries from peekedEvents for buckets that are not longer primary
+      //This will prevent repeatedly trying to dispatch non-primary events
+      for(Iterator<GatewaySenderEventImpl> iterator = peekedEvents.iterator(); iterator.hasNext(); ) {
+        GatewaySenderEventImpl event = iterator.next();
+        final int bucketId = event.getBucketId();
+        final PartitionedRegion region = (PartitionedRegion) event.getRegion();
+        if(!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
+          iterator.remove();
+        }
+      }
+
       if (this.peekedEventsProcessingInProgress) {
         // Peeked event processing is in progress. This means that the original peekedEvents
         // contained > batch size events due to a reduction in the batch size. Create a batch
@@ -1400,7 +1412,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  private void addPreviouslyPeekedEvents(List batch, int batchSize) {
+  private void addPreviouslyPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
     for (int i=0; i<batchSize; i++) {
       batch.add(this.peekedEventsProcessing.remove());
       if (this.peekedEventsProcessing.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index 596756f..d7739c5 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -258,13 +258,26 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void createAsyncEventQueue(String asyncChannelId,
-      boolean isParallel, Integer maxMemory, Integer batchSize,
-      boolean isConflation, boolean isPersistent, String diskStoreName,
-      boolean isDiskSynchronous) {
+                                           boolean isParallel, Integer maxMemory, Integer batchSize,
+                                           boolean isConflation, boolean isPersistent, String diskStoreName,
+                                           boolean isDiskSynchronous)
+  {
+    createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation, isPersistent, diskStoreName,
+      isDiskSynchronous, new MyAsyncEventListener());
+  }
+
+  public static void createAsyncEventQueue(
+    String asyncChannelId,
+    boolean isParallel,
+    Integer maxMemory,
+    Integer batchSize,
+    boolean isConflation,
+    boolean isPersistent,
+    String diskStoreName,
+    boolean isDiskSynchronous,
+    final AsyncEventListener asyncEventListener) {
     createDiskStore(asyncChannelId, diskStoreName);
 
-    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
     AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory, batchSize, isPersistent, diskStoreName);
     factory.setDiskSynchronous(isDiskSynchronous);
     factory.setBatchConflationEnabled(isConflation);
@@ -1387,15 +1400,20 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
     assertEquals(expectedToDataInvoations, filter.getNumToDataInvocations());
   }
 
-  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+  public static AsyncEventListener getAsyncEventListener(String asyncEventQueueId) {
     AsyncEventListener theListener = null;
 
     Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncQueue : asyncEventQueues) {
       if (asyncEventQueueId.equals(asyncQueue.getId())) {
-        theListener = asyncQueue.getAsyncEventListener();
+        return asyncQueue.getAsyncEventListener();
       }
     }
+    return null;
+  }
+
+  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+    AsyncEventListener theListener = getAsyncEventListener(asyncEventQueueId);
 
     final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
     assertNotNull(eventsMap);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index f96926f..f090402 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -18,26 +18,38 @@ package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
 
 import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
 import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
 import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.dunit.Wait;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 import com.gemstone.gemfire.test.junit.categories.FlakyTest;
@@ -1562,4 +1574,90 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     int vm3size = (Integer)vm3.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" ));
     assertEquals(vm3size, 1000);
   }
+
+  @Test
+  public void testParallelAsyncEventQueueMoveBucketAndMoveItBackDuringDispatching() {
+    Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+    final DistributedMember member1 = vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+    final DistributedMember member2 = vm2.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln",
+        true, 100, 10, false, false, null, false, new BucketMovingAsyncEventListener(member2)));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() ));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR",
+      113 ));
+
+    vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln",
+      true, 100, 10, false, false, null, false, new BucketMovingAsyncEventListener(member1)));
+
+    vm2.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+
+    Set<Object> allKeys = new HashSet<Object>();
+    allKeys.addAll(getKeysSeen(vm1, "ln"));
+    allKeys.addAll(getKeysSeen(vm2, "ln"));
+
+    final Set<Long> expectedKeys = LongStream.range(0, 113).mapToObj(Long::valueOf).collect(Collectors.toSet());
+    assertEquals(expectedKeys, allKeys);
+
+    assertTrue(getBucketMoved(vm1, "ln"));
+    assertTrue(getBucketMoved(vm2, "ln"));
+  }
+
+  private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
+    return vm.invoke(() -> {
+      final BucketMovingAsyncEventListener listener = (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      return listener.keysSeen;
+    });
+  }
+
+  private static boolean getBucketMoved(VM vm, String asyncEventQueueId) {
+    return vm.invoke(() -> {
+      final BucketMovingAsyncEventListener listener = (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      return listener.moved;
+    });
+  }
+
+  private static final class BucketMovingAsyncEventListener implements AsyncEventListener {
+    private final DistributedMember destination;
+    private boolean moved;
+    private Set<Object> keysSeen = new HashSet<Object>();
+
+    public BucketMovingAsyncEventListener(final DistributedMember destination) {
+      this.destination = destination;
+    }
+
+    @Override public boolean processEvents(final List<AsyncEvent> events) {
+      if(!moved) {
+
+        AsyncEvent event1 = events.get(0);
+        moveBucket(destination, event1.getKey());
+        moved = true;
+        return false;
+      }
+
+      events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
+      return true;
+    }
+
+    @Override public void close() {
+
+    }
+    private static void moveBucket(final DistributedMember destination, final Object key) {
+      Region<Object, Object> region = cache.getRegion(getTestMethodName() + "_PR");
+      DistributedMember source = cache.getDistributedSystem().getDistributedMember();
+      PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
new file mode 100644
index 0000000..12f12ad
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+
+public class IndexRepositoryFactory {
+
+  public IndexRepositoryFactory() {
+  }
+
+  public IndexRepository createIndexRepository(final Integer bucketId,
+                                        PartitionedRegion userRegion,
+                                        PartitionedRegion fileRegion,
+                                        PartitionedRegion chunkRegion,
+                                        LuceneSerializer serializer,
+                                        Analyzer analyzer,
+                                        LuceneIndexStats indexStats,
+                                        FileSystemStats fileSystemStats)
+    throws IOException
+  {
+    final IndexRepository repo;
+    BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
+    BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
+    if(fileBucket == null || chunkBucket == null) {
+      return null;
+    }
+    RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats);
+    IndexWriterConfig config = new IndexWriterConfig(analyzer);
+    IndexWriter writer = new IndexWriter(dir, config);
+    repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats);
+    return repo;
+  }
+
+  /**
+   * Return the local bucket of the given region for the given bucket id, forcing the bucket to be created first.
+   */
+  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) {
+    //Force the bucket to be created if it is not already
+    region.getOrCreateNodeForBucketWrite(bucketId, null);
+
+    return region.getDataStore().getLocalBucketById(bucketId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
index a7150c0..29fb159 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -36,6 +37,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
 import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.TestHook;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -63,7 +65,7 @@ public class LuceneEventListener implements AsyncEventListener {
     DefaultQuery.setPdxReadSerialized(true);
 
     Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
-    
+
     try {
       for (AsyncEvent event : events) {
         Region region = event.getRegion();
@@ -75,7 +77,7 @@ public class LuceneEventListener implements AsyncEventListener {
         Operation op = event.getOperation();
 
         if (op.isCreate()) {
-          repository.create(key, event.getDeserializedValue());
+          repository.update(key, event.getDeserializedValue());
         } else if (op.isUpdate()) {
           repository.update(key, event.getDeserializedValue());
         } else if (op.isDestroy()) {
@@ -92,7 +94,7 @@ public class LuceneEventListener implements AsyncEventListener {
         repo.commit();
       }
       return true;
-    } catch(BucketNotFoundException e) {
+    } catch(BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e) {
       logger.debug("Bucket not found while saving to lucene index: " + e.getMessage());
       return false;
     } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index acd3765..ff31c49 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -93,7 +93,6 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
       long start = System.nanoTime();
       while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) {
         if (0 == queue.size()) {
-          logger.debug("waitUntilFlushed: Queue size is 0");
           flushed = true;
           break;
         } else {
@@ -106,6 +105,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     } else { 
       throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath);
     }
+
     return flushed;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
index a119157..3cc713b 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -26,16 +26,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
 
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
-import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
@@ -53,6 +49,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
  */
 public class PartitionedRepositoryManager implements RepositoryManager {
 
+  public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
+
   /** map of the parent bucket region to the index repository
    * 
    * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to 
@@ -142,16 +140,8 @@ public class PartitionedRepositoryManager implements RepositoryManager {
       }
 
       try {
-        BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
-        BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
-        if(fileBucket == null || chunkBucket == null) {
-          return null;
-        }
-        RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats);
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
-        IndexWriter writer = new IndexWriter(dir, config);
-        return new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats);
-
+        return indexRepositoryFactory.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion, serializer,
+          analyzer, indexStats, fileSystemStats);
       } catch(IOException e) {
         throw new InternalGemFireError("Unable to create index repository", e);
       }
@@ -164,14 +154,4 @@ public class PartitionedRepositoryManager implements RepositoryManager {
 
     return repo;
   }
-
-  /**
-   * Find the bucket in region2 that matches the bucket id from region1.
-   */
-  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) {
-    //Force the bucket to be created if it is not already
-    region.getOrCreateNodeForBucketWrite(bucketId, null);
-    
-    return region.getDataStore().getLocalBucketById(bucketId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 563e382..0b70542 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -30,6 +30,7 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
+import org.apache.lucene.store.AlreadyClosedException;
 
 import java.io.IOException;
 import java.util.function.IntSupplier;
@@ -168,7 +169,12 @@ public class IndexRepositoryImpl implements IndexRepository {
         stats.removeDocumentsSupplier(this);
         return 0;
       }
-      return writer.numDocs();
+      try {
+        return writer.numDocs();
+      } catch(AlreadyClosedException e) {
+        //ignore
+        return 0;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
index 1f3795c..e817d3b 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
@@ -22,9 +22,11 @@ import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
@@ -170,9 +172,13 @@ public abstract class LuceneQueriesBase extends LuceneDUnitTest {
 
       LuceneService service = LuceneServiceProvider.get(cache);
       LuceneQuery<Integer, TestObject> query;
-      query = service.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, queryString, defaultField);
-      PageableLuceneQueryResults<Integer, TestObject> results = query.findPages();
-      assertEquals(results.size(), expectedResultsSize);
+      query = service.createLuceneQueryFactory()
+        .setResultLimit(1000)
+        .setPageSize(1000)
+        .create(INDEX_NAME, REGION_NAME, queryString, defaultField);
+      Collection<?> results = query.findKeys();
+
+      assertEquals(expectedResultsSize, results.size());
     });
   }
 
@@ -186,7 +192,7 @@ public abstract class LuceneQueriesBase extends LuceneDUnitTest {
     });
   }
 
-  private static class TestObject implements Serializable {
+  protected static class TestObject implements Serializable {
     private static final long serialVersionUID = 1L;
     private String text;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
index 4d5a0b7..889b16f 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
@@ -21,25 +21,38 @@ package com.gemstone.gemfire.cache.lucene;
 
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
+import org.apache.lucene.analysis.Analyzer;
+import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.control.RebalanceOperation;
 import com.gemstone.gemfire.cache.control.RebalanceResults;
-import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
+import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
 /**
  * This test class adds more basic tests of lucene functionality
@@ -49,20 +62,130 @@ import com.gemstone.gemfire.test.junit.categories.DistributedTest;
  */
 public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
 
+  @After
+  public void cleanupRebalanceCallback() {
+    removeCallback(dataStore1);
+    removeCallback(dataStore2);
+  }
+
+
+
+  @Test
+  public void returnCorrectResultsWhenRebalanceHappensOnIndexUpdate() throws InterruptedException {
+    addCallbackToTriggerRebalance(dataStore1);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  @Test
+  public void returnCorrectResultsWhenMoveBucketHappensOnIndexUpdate() throws InterruptedException {
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMoveBucket(dataStore1, member2);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  @Test
+  public void returnCorrectResultsWhenBucketIsMovedAndMovedBackOnIndexUpdate() throws InterruptedException {
+    final DistributedMember member1 = dataStore1.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMoveBucket(dataStore1, member2);
+    addCallbackToMoveBucket(dataStore2, member1);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  protected void putEntriesAndValidateQueryResults() {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  @Test
+  public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws InterruptedException {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+
+    put113Entries();
+
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+
+    rebalanceRegion(dataStore2);
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
   @Test
-  public void returnCorrectResultsAfterRebalance() {
+  public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() throws InterruptedException {
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
     };
     dataStore1.invoke(() -> initDataStore(createIndex));
     accessor.invoke(() -> initAccessor(createIndex));
-    putDataInRegion(accessor);
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
     dataStore2.invoke(() -> initDataStore(createIndex));
+    rebalanceRegion(dataStore2);
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
 
-    rebalanceRegion(dataStore1);
     assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
-    executeTextSearch(accessor);
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  protected void put113Entries() {
+    accessor.invoke(() -> {
+      final Cache cache = getCache();
+      Region<Object, Object> region = cache.getRegion(REGION_NAME);
+      IntStream.range(0,113).forEach(i -> region.put(i, new TestObject("hello world")));
+    });
+  }
+
+  private void addCallbackToTriggerRebalance(VM vm) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> rebalanceRegion(vm)));
+    });
+  }
+
+  protected void addCallbackToMoveBucket(VM vm, final DistributedMember destination) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+    });
+  }
+
+  private void moveBucket(final DistributedMember destination, final Object key) {
+    Region<Object, Object> region = getCache().getRegion(REGION_NAME);
+    DistributedMember source = getCache().getDistributedSystem().getDistributedMember();
+    PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
+  }
+
+  private void removeCallback(VM vm) {
+    vm.invoke(IndexRepositorySpy::remove);
   }
 
   private void rebalanceRegion(VM vm) {
@@ -70,8 +193,75 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
     vm.invoke(() -> {
         RebalanceOperation op = getCache().getResourceManager().createRebalanceFactory().start();
         RebalanceResults results = op.getResults();
-        assertTrue("Transferred " + results.getTotalBucketTransfersCompleted(), 1 < results.getTotalBucketTransfersCompleted());
     });
   }
 
+  protected static class IndexRepositorySpy extends IndexRepositoryFactory {
+
+    private Consumer<Object> beforeWrite = key -> {};
+
+    public static IndexRepositorySpy injectSpy() {
+      IndexRepositorySpy factory = new IndexRepositorySpy();
+      PartitionedRepositoryManager.indexRepositoryFactory = factory;
+      return factory;
+    }
+
+    public static void remove() {
+      PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory();
+    }
+
+    private IndexRepositorySpy() {
+    }
+
+    @Override
+    public IndexRepository createIndexRepository(final Integer bucketId,
+                                                 final PartitionedRegion userRegion,
+                                                 final PartitionedRegion fileRegion,
+                                                 final PartitionedRegion chunkRegion,
+                                                 final LuceneSerializer serializer,
+                                                 final Analyzer analyzer,
+                                                 final LuceneIndexStats indexStats,
+                                                 final FileSystemStats fileSystemStats)
+      throws IOException
+    {
+      final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion, fileRegion, chunkRegion,
+        serializer, analyzer,
+        indexStats,
+        fileSystemStats);
+      final IndexRepository spy = Mockito.spy(indexRepo);
+
+      Answer invokeBeforeWrite = invocation -> {
+        beforeWrite.accept(invocation.getArgumentAt(0, Object.class));
+        invocation.callRealMethod();
+        return null;
+      };
+      doAnswer(invokeBeforeWrite).when(spy).update(any(), any());
+      doAnswer(invokeBeforeWrite).when(spy).create(any(), any());
+      doAnswer(invokeBeforeWrite).when(spy).delete(any());
+
+      return spy;
+    }
+
+    /**
+     * Add a callback that runs before each call to {@link IndexRepository#create(Object, Object)},
+     * update, or delete on the spied repository; the callback receives that call's first argument.
+     */
+    public void beforeWrite(Consumer<Object> action) {
+      this.beforeWrite = action;
+    }
+  }
+
+  protected static <T> Consumer<T> doOnce(Consumer<T> consumer) {
+    return new Consumer<T>() {
+      boolean done;
+
+      @Override
+      public void accept(final T t) {
+        if (!done) {
+          done = true;
+          consumer.accept(t);
+        }
+      }
+    };
+  };
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
index 830ca26..00b8254 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
@@ -17,6 +17,8 @@
 package com.gemstone.gemfire.cache.lucene;
 
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
+
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
@@ -28,7 +30,11 @@ public class LuceneQueriesPeerPRDUnitTest extends LuceneQueriesPRBase {
 
   @Override protected void initDataStore(final SerializableRunnableIF createIndex) throws Exception {
     createIndex.run();
-    getCache().createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+    PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
+    partitionAttributesFactory.setLocalMaxMemory(100);
+    getCache().createRegionFactory(RegionShortcut.PARTITION)
+      .setPartitionAttributes(partitionAttributesFactory.create())
+      .create(REGION_NAME);
   }
 
   @Override protected void initAccessor(final SerializableRunnableIF createIndex) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
index 494cc9f..0a7bb67 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
@@ -16,11 +16,23 @@
  */
 package com.gemstone.gemfire.cache.lucene;
 
-import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
+import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(DistributedTest.class)
@@ -34,4 +46,44 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase
   @Override protected void initAccessor(final SerializableRunnableIF createIndex) throws Exception {
     initDataStore(createIndex);
   }
+
+  @Test
+  public void returnCorrectResultsWhenMovePrimaryHappensOnIndexUpdate() throws InterruptedException {
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMovePrimary(dataStore1, member2);
+
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  protected void addCallbackToMovePrimary(VM vm, final DistributedMember destination) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+    });
+  }
+
+  private void moveBucket(final DistributedMember destination, final Object key) {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+
+    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send(
+      (InternalDistributedMember) destination, region, region.getKeyInfo(key).getBucketId(), true);
+    assertNotNull(response);
+    assertTrue(response.waitForResponse());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
index 86ed481..e3331de 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -96,12 +96,10 @@ public class LuceneEventListenerJUnitTest {
 
     listener.processEvents(events);
 
-    verify(repo1, atLeast(numEntries / 6)).create(any(), any());
     verify(repo1, atLeast(numEntries / 6)).delete(any());
-    verify(repo1, atLeast(numEntries / 6)).update(any(), any());
-    verify(repo2, atLeast(numEntries / 6)).create(any(), any());
+    verify(repo1, atLeast(numEntries / 3)).update(any(), any());
     verify(repo2, atLeast(numEntries / 6)).delete(any());
-    verify(repo2, atLeast(numEntries / 6)).update(any(), any());
+    verify(repo2, atLeast(numEntries / 3)).update(any(), any());
     verify(repo1, times(1)).commit();
     verify(repo2, times(1)).commit();
   }



[6/7] incubator-geode git commit: GEODE-1571: fix precheckin failures

Posted by ji...@apache.org.
GEODE-1571: fix precheckin failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/db456f64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/db456f64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/db456f64

Branch: refs/heads/feature/GEODE-1571
Commit: db456f64621420a69635869204739e2618ad5e6d
Parents: e4d79df
Author: Jinmei Liao <ji...@pivotal.io>
Authored: Thu Jul 7 10:55:22 2016 -0700
Committer: Jinmei Liao <ji...@pivotal.io>
Committed: Thu Jul 7 11:11:11 2016 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/BaseCommandQuery.java    | 34 +++++++++-----------
 .../cache/tier/sockets/CacheClientProxy.java    | 25 +++++++-------
 .../internal/cache/tier/sockets/HandShake.java  |  3 +-
 .../internal/security/GeodeSecurityUtil.java    | 11 +++++++
 .../cli/functions/DataCommandFunction.java      | 21 ++++++------
 ...tedClientContainsKeyAuthDistributedTest.java |  1 -
 ...urityNoShowValue1PostProcessorDUnitTest.java |  2 +-
 7 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommandQuery.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommandQuery.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommandQuery.java
index 3f79873..0e32fb8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommandQuery.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommandQuery.java
@@ -158,26 +158,24 @@ public abstract class BaseCommandQuery extends BaseCommand {
         SelectResults selectResults = (SelectResults)result;
 
         // post process, iterate through the result for post processing
-        List list = selectResults.asList();
-        for(Iterator<Object> valItr = list.iterator(); valItr.hasNext();){
-          Object value = valItr.next();
-          Object newValue = value;
-          if(value instanceof CqEntry){
-            CqEntry cqEntry = (CqEntry)value;
-            Object cqNewValue = GeodeSecurityUtil.postProcess(null, cqEntry.getKey(), cqEntry.getValue());
-            if(!cqEntry.getValue().equals(cqNewValue)){
-              selectResults.remove(value);
-              if(cqNewValue!=null){
+        if(GeodeSecurityUtil.needPostProcess()) {
+          List list = selectResults.asList();
+          for (Iterator<Object> valItr = list.iterator(); valItr.hasNext(); ) {
+            Object value = valItr.next();
+            if (value == null)
+              continue;
+
+            if (value instanceof CqEntry) {
+              CqEntry cqEntry = (CqEntry) value;
+              Object cqNewValue = GeodeSecurityUtil.postProcess(null, cqEntry.getKey(), cqEntry.getValue());
+              if (!cqEntry.getValue().equals(cqNewValue)) {
+                selectResults.remove(value);
                 selectResults.add(new CqEntry(cqEntry.getKey(), cqNewValue));
               }
-            }
-          }
-          else {
-            newValue = GeodeSecurityUtil.postProcess(null, null, value);
-            if(!value.equals(newValue)){
-              selectResults.remove(value);
-              // only add the newValue back if it's not null
-              if(newValue!=null){
+            } else {
+              Object newValue = GeodeSecurityUtil.postProcess(null, null, value);
+              if (!value.equals(newValue)) {
+                selectResults.remove(value);
                 selectResults.add(newValue);
               }
             }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index 1609c39..2553458 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1676,20 +1676,21 @@ public class CacheClientProxy implements ClientSession {
     this._statistics.incMessagesReceived();
 
     // post process
-    Object oldValue = clientMessage.getValue();
-    if(oldValue instanceof byte[]){
-      Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(),
-          EntryEventImpl.deserialize((byte[])oldValue));
-      try {
-        clientMessage.setLatestValue(BlobHelper.serializeToBlob(newValue));
-      } catch (IOException e) {
-        throw new GemFireIOException("Exception serializing entry value", e);
+    if(GeodeSecurityUtil.needPostProcess()) {
+      Object oldValue = clientMessage.getValue();
+      if (clientMessage.valueIsObject()) {
+        Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), EntryEventImpl
+          .deserialize((byte[]) oldValue));
+        try {
+          clientMessage.setLatestValue(BlobHelper.serializeToBlob(newValue));
+        } catch (IOException e) {
+          throw new GemFireIOException("Exception serializing entry value", e);
+        }
+      } else {
+        Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), oldValue);
+        clientMessage.setLatestValue(newValue);
       }
     }
-    else{
-      Object newValue = GeodeSecurityUtil.postProcess(clientMessage.getRegionName(), clientMessage.getKeyOfInterest(), oldValue);
-      clientMessage.setLatestValue(newValue);
-    }
 
     if (clientMessage.needsNoAuthorizationCheck() || postDeliverAuthCheckPassed(clientMessage)) {
       // If dispatcher is getting initialized, add the event to temporary queue.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
index 5bceff9..cf69d0c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/HandShake.java
@@ -1157,12 +1157,11 @@ public class HandShake implements ClientHandShake
     dhSKAlgo = config.getSecurityClientDHAlgo();
     dhPrivateKey = null;
     dhPublicKey = null;
-    String authenticator = config.getSecurityClientAuthenticator();
     // Initialize the keys when either the host is a client that has
     // non-blank setting for DH symmetric algo, or this is a server
     // that has authenticator defined.
     if ((dhSKAlgo != null && dhSKAlgo.length() > 0)
-        || (authenticator != null && authenticator.length() > 0)) {
+        || GeodeSecurityUtil.isSecurityRequired(config.getSecurityProps())) {
       KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DH");
       DHParameterSpec dhSpec = new DHParameterSpec(dhP, dhG, dhL);
       keyGen.initialize(dhSpec);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/main/java/com/gemstone/gemfire/internal/security/GeodeSecurityUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/security/GeodeSecurityUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/security/GeodeSecurityUtil.java
index ff32f92..0661602 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/security/GeodeSecurityUtil.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/security/GeodeSecurityUtil.java
@@ -348,6 +348,16 @@ public class GeodeSecurityUtil {
     return;
   }
 
+  /**
+   * The postProcess call already has this check built in, so you don't need to call this every time you call postProcess.
+   * But if preparing for postProcess is fairly involved and you want to bypass that work entirely, call this first.
+   * @return true if there is a current subject and a post processor is set
+   */
+  public static boolean needPostProcess(){
+    Subject subject = getSubject();
+    return (subject != null && postProcessor != null);
+  }
+
   public static Object postProcess(String regionPath, Object key, Object result){
     if(postProcessor == null)
       return result;
@@ -361,6 +371,7 @@ public class GeodeSecurityUtil {
     return postProcessor.processRegionValue((Principal)subject.getPrincipal(), regionName, key,  result);
   }
 
+
   public static Object getObject(String factoryName) {
     if (StringUtils.isBlank(factoryName)) {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/DataCommandFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/DataCommandFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/DataCommandFunction.java
index 854643e..ed119a5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/DataCommandFunction.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/functions/DataCommandFunction.java
@@ -935,16 +935,17 @@ public class DataCommandFunction extends FunctionAdapter implements  InternalEnt
             dataResult.setInputQuery(query);
 
             // post process, iterate through the result for post processing
-            List<SelectResultRow> rows = dataResult.getSelectResult();
-            for(Iterator<SelectResultRow> itr = rows.iterator(); itr.hasNext();){
-              SelectResultRow row = itr.next();
-              Object newValue = GeodeSecurityUtil.postProcess(null, null, row.getValue());
-              // user is not supposed to see this row
-              if(newValue==null){
-                itr.remove();
-              }
-              else{
-                row.setValue(newValue);
+            if(GeodeSecurityUtil.needPostProcess()) {
+              List<SelectResultRow> rows = dataResult.getSelectResult();
+              for (Iterator<SelectResultRow> itr = rows.iterator(); itr.hasNext(); ) {
+                SelectResultRow row = itr.next();
+                Object newValue = GeodeSecurityUtil.postProcess(null, null, row.getValue());
+                // user is not supposed to see this row
+                if (newValue == null) {
+                  itr.remove();
+                } else {
+                  row.setValue(newValue);
+                }
               }
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedClientContainsKeyAuthDistributedTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedClientContainsKeyAuthDistributedTest.java b/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedClientContainsKeyAuthDistributedTest.java
index f5bbcd4..7aa958e 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedClientContainsKeyAuthDistributedTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedClientContainsKeyAuthDistributedTest.java
@@ -44,7 +44,6 @@ public class IntegratedClientContainsKeyAuthDistributedTest extends AbstractInte
       final Region region = cache.getRegion(REGION_NAME);
       region.containsKeyOnServer("key3");
       assertTrue(region.containsKeyOnServer("key1"));
-
     });
 
     ai1.join();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db456f64/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedSecurityNoShowValue1PostProcessorDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedSecurityNoShowValue1PostProcessorDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedSecurityNoShowValue1PostProcessorDUnitTest.java
index 0599e57..d6ac3aa 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedSecurityNoShowValue1PostProcessorDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/security/IntegratedSecurityNoShowValue1PostProcessorDUnitTest.java
@@ -66,7 +66,7 @@ public class IntegratedSecurityNoShowValue1PostProcessorDUnitTest extends Abstra
       String query = "select * from /AuthRegion";
       SelectResults result = region.query(query);
       System.out.println("query result: "+result);
-      assertEquals(4, result.size());
+      assertEquals(5, result.size());
       assertTrue(result.contains("value0"));
       assertFalse(result.contains("value1"));
       assertTrue(result.contains("value2"));