You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/05/14 23:35:57 UTC

[geode] branch develop updated: GEODE-5187: clients can miss events when servers recycled

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 62c013e  GEODE-5187: clients can miss events when servers recycled
62c013e is described below

commit 62c013eb05eded6752e7614ab85cef1b9e5350a9
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Wed May 9 17:27:32 2018 -0700

    GEODE-5187: clients can miss events when servers recycled
    
    * When draining events from the giiQueue, the msg field of an HAEventWrapper may be null.
      In that case, update the HAEventWrapper to point to the message in the HAContainer
      instead of calling putEventInHARegion with the HAContainer message itself (a
      ClientUpdateMessageImpl). This is necessary because the ClientUpdateMessageImpl does not
      carry the eventId (it is not serialized/deserialized in toData/fromData); the
      HAEventWrapper is required on the remote side to reconstruct the event.
    
    * Updated log messages to include the HARegionQueue.regionName
    
    * Added corresponding IntegrationTest
---
 .../geode/internal/cache/ha/HARegionQueue.java     | 54 +++++++-----
 .../cache/ha/HARegionQueueIntegrationTest.java     | 97 +++++++++++++++++++++-
 2 files changed, 128 insertions(+), 23 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index a49adfb..6928b8d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -484,7 +484,8 @@ public class HARegionQueue implements RegionQueue {
           entry = (Region.Entry) iterator.next();
           key = entry.getKey();
           if (isDebugEnabled) {
-            logger.debug("processing queue key {} and value {}", key, entry.getValue());
+            logger.debug("{} processing queue key {} and value {}", this.regionName, key,
+                entry.getValue());
           }
           if (key instanceof Long) {
             if (!(entry.getValue() instanceof ClientMarkerMessageImpl)) {
@@ -510,8 +511,8 @@ public class HARegionQueue implements RegionQueue {
               this.put(val);
             } else if (isDebugEnabled) {
               logger.debug(
-                  "bug 44959 encountered: HARegion.putGIIDataInRegion found null eventId in {}",
-                  val);
+                  "{} bug 44959 encountered: HARegion.putGIIDataInRegion found null eventId in {}",
+                  this.regionName, val);
             }
           }
         }
@@ -537,8 +538,8 @@ public class HARegionQueue implements RegionQueue {
     if (val instanceof HAEventWrapper && ((HAEventWrapper) val).getClientUpdateMessage() == null) {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.",
-            ((Conflatable) val).getKeyToConflate());
+            "{} HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not putting it into the ha queue.",
+            this.regionName, ((Conflatable) val).getKeyToConflate());
       }
     } else {
       this.put(val);
@@ -605,7 +606,7 @@ public class HARegionQueue implements RegionQueue {
     try {
       if (this.giiCount > 0) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{}: adding message to GII queue of size {}: {}", this.region.getName(),
+          logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName,
               giiQueue.size(), object);
         }
         if (object instanceof HAEventWrapper) {
@@ -615,7 +616,7 @@ public class HARegionQueue implements RegionQueue {
         this.giiQueue.add(object);
       } else {
         if (logger.isTraceEnabled()) {
-          logger.trace("{}: adding message to HA queue: {}", this.region.getName(), object);
+          logger.trace("{}: adding message to HA queue: {}", this.regionName, object);
         }
         basicPut(object);
       }
@@ -717,7 +718,7 @@ public class HARegionQueue implements RegionQueue {
     this.giiLock.writeLock().lock();
     this.giiCount++; // TODO: non-atomic operation on volatile!
     if (logger.isDebugEnabled()) {
-      logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount);
+      logger.debug("{}: startGiiQueueing count is now {}", this.regionName, this.giiCount);
     }
     this.giiLock.writeLock().unlock();
   }
@@ -733,17 +734,18 @@ public class HARegionQueue implements RegionQueue {
     try {
       this.giiCount--; // TODO: non-atomic operation on volatile!
       if (isDebugEnabled) {
-        logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount);
+        logger.debug("{}: endGiiQueueing count is now {}", this.regionName, this.giiCount);
       }
       if (this.giiCount < 0) {
         if (isDebugEnabled) {
-          logger.debug("{} found giiCount to be {}", this.region.getName(), this.giiCount);
+          logger.debug("{} found giiCount to be {}", this.regionName, this.giiCount);
         }
         this.giiCount = 0;
       }
       if (this.giiCount == 0) {
         if (isDebugEnabled) {
-          logger.debug("all GII requests completed - draining {} messages", this.giiQueue.size());
+          logger.debug("{} all GII requests completed - draining {} messages", this.regionName,
+              this.giiQueue.size());
         }
         boolean interrupted = false;
         int expectedCount = this.giiQueue.size();
@@ -758,17 +760,20 @@ public class HARegionQueue implements RegionQueue {
           actualCount++;
           try {
             if (isDebugEnabled) {
-              logger.debug("draining #{}: {}", (actualCount + 1), value);
+              logger.debug("{} draining #{}: {}", this.regionName, (actualCount + 1), value);
             }
             if (value instanceof HAEventWrapper) {
               if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
                 // if there is no wrapped message look for it in the HA container map
-                value = haContainer.get(value);
-                if (value == null) {
+                ClientUpdateMessageImpl haContainerMessage =
+                    (ClientUpdateMessageImpl) haContainer.get(value);
+                if (haContainerMessage != null) {
+                  ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage);
+                } else {
                   if (isDebugEnabled) {
                     logger.debug(
-                        "ATTENTION: found gii queued event with null event message.  Please see bug #44852: {}",
-                        value);
+                        "{} ATTENTION: found gii queued event with null event message.  Please see bug #44852: {}",
+                        this.regionName, value);
                   }
                   continue;
                 }
@@ -800,7 +805,7 @@ public class HARegionQueue implements RegionQueue {
       throw t;
     } finally {
       if (logger.isTraceEnabled()) {
-        logger.trace("endGiiQueueing completed");
+        logger.trace("{} endGiiQueueing completed", this.regionName);
       }
       this.giiLock.writeLock().unlock();
     }
@@ -2097,7 +2102,6 @@ public class HARegionQueue implements RegionQueue {
         return null;
       }
       HAEventWrapper entryHaEventWrapper = null;
-      // synchronized (haContainer) {
       do {
         ClientUpdateMessageImpl entryMessage = (ClientUpdateMessageImpl) haContainer
             .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
@@ -3474,12 +3478,18 @@ public class HARegionQueue implements RegionQueue {
       }
       // Put the reference to the HAEventWrapper instance into the
       // HA queue.
+      if (logger.isDebugEnabled()) {
+        logger.debug("adding inputHaEventWrapper to HARegion at " + position + ":"
+            + inputHaEventWrapper + " for " + this.regionName);
+      }
       this.region.put(position, inputHaEventWrapper);
-      // logger.info(LocalizedStrings.DEBUG, "added message at position " + position);
     } else { // (event instanceof ClientMarkerMessageImpl OR ConflatableObject OR
              // ClientInstantiatorMessage)
+      if (logger.isDebugEnabled()) {
+        logger.debug("adding ClientUpdateMessage to HARegion at " + position + ":" + event + " for "
+            + this.regionName);
+      }
       this.region.put(position, event);
-      // logger.info(LocalizedStrings.DEBUG, "added non-msg at position " + position);
     }
   }
 
@@ -3882,4 +3892,8 @@ public class HARegionQueue implements RegionQueue {
       return expiryTime.orElse(DEFAULT_THREAD_ID_EXPIRY_TIME);
     }
   }
+
+  public Queue getGiiQueue() {
+    return this.giiQueue;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 6ce921e..f908d49 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -16,10 +16,18 @@ package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.junit.After;
@@ -27,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.powermock.api.mockito.PowerMockito;
@@ -48,6 +57,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -120,6 +130,51 @@ public class HARegionQueueIntegrationTest {
   }
 
   @Test
+  public void verifyEndGiiQueueingPutsHAEventWrapperNotClientUpdateMessage() throws Exception {
+    // Create a HAContainerRegion
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+    // create message and HAEventWrapper
+    ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+    HAEventWrapper wrapper = new HAEventWrapper(message);
+    wrapper.setHAContainer(haContainerWrapper);
+
+    // Create and update HARegionQueues forcing one queue to startGiiQueueing
+    int numQueues = 10;
+    HARegionQueue targetQueue = createAndUpdateHARegionQueuesWithGiiQueueing(haContainerWrapper,
+        wrapper, message, numQueues);
+
+    // Verify HAContainerWrapper (1) and refCount (numQueues(10))
+    assertEquals(1, haContainerWrapper.size());
+
+    HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper);
+    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+
+    // Verify that the HAEventWrapper in the giiQueue now has msg = null
+    // this gets set to null when wrapper is added to HAContainer (for non-gii queues)
+    Queue giiQueue = targetQueue.getGiiQueue();
+    assertEquals(1, giiQueue.size());
+
+    HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
+    assertNotNull(giiQueueEntry);
+    assertNull(giiQueueEntry.getClientUpdateMessage());
+
+    // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper
+    // not ClientUpdateMessageImpl
+    HARegionQueue spyTargetQueue = spy(targetQueue);
+    spyTargetQueue.endGiiQueueing();
+    assertEquals(0, giiQueue.size());
+
+    ArgumentCaptor<Conflatable> eventCaptor = ArgumentCaptor.forClass(Conflatable.class);
+    verify(spyTargetQueue).putEventInHARegion(eventCaptor.capture(), anyLong());
+    Conflatable capturedEvent = eventCaptor.getValue();
+    assertTrue(capturedEvent instanceof HAEventWrapper);
+    assertNotNull(((HAEventWrapper) capturedEvent).getClientUpdateMessage());
+  }
+
+  @Test
   public void verifySequentialUpdateHAEventWrapperWithMap() throws Exception {
     // Create a HAContainerMap to be used by the CacheClientNotifier
     HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
@@ -210,9 +265,23 @@ public class HARegionQueueIntegrationTest {
   }
 
   private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
-    return new HARegionQueue("haRegion+" + index, mock(HARegion.class), (InternalCache) cache,
-        haContainer, null, (byte) 1, true, mock(HARegionQueueStats.class),
-        mock(StoppableReentrantReadWriteLock.class), mock(StoppableReentrantReadWriteLock.class),
+    StoppableReentrantReadWriteLock giiLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
+    when(giiLock.writeLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
+    when(giiLock.readLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+
+    StoppableReentrantReadWriteLock rwLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
+    when(rwLock.writeLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
+    when(rwLock.readLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+
+    HARegion haRegion = Mockito.mock(HARegion.class);
+    when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+    return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
+        null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
         mock(CancelCriterion.class), false);
   }
 
@@ -244,6 +313,28 @@ public class HARegionQueueIntegrationTest {
     }
   }
 
+  private HARegionQueue createAndUpdateHARegionQueuesWithGiiQueueing(
+      HAContainerWrapper haContainerWrapper, HAEventWrapper wrapper, ClientUpdateMessage message,
+      int numQueues) throws Exception {
+
+    HARegionQueue targetQueue = null;
+    int startGiiQueueingIndex = numQueues / 2;
+
+    // create HARegionQueues and startGiiQueuing on a region about half way through
+    for (int i = 0; i < numQueues; i++) {
+      HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+
+      // start GII Queueing (targetRegionQueue)
+      if (i == startGiiQueueingIndex) {
+        targetQueue = haRegionQueue;
+        targetQueue.startGiiQueueing();
+      }
+
+      haRegionQueue.put(wrapper);
+    }
+    return targetQueue;
+  }
+
   private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
       CachedDeserializable cd, int numQueues) throws Exception {
     // Create some HARegionQueues

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.