You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2017/08/18 23:47:15 UTC

geode git commit: GEODE-3470: Increased serial gateway sender token timeout

Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-3470 [created] af919311a


GEODE-3470: Increased serial gateway sender token timeout


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/af919311
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/af919311
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/af919311

Branch: refs/heads/feature/GEODE-3470
Commit: af919311a37c374ef131d6388cf3d50ea410424a
Parents: 36daa9a
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Fri Aug 18 16:41:26 2017 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Fri Aug 18 16:41:26 2017 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        |   2 +-
 .../SerialGatewaySenderEventProcessor.java      |   4 +-
 .../ParallelQueueRemovalMessageJUnitTest.java   |  11 +-
 ...ialGatewaySenderEventProcessorJUnitTest.java | 114 +++++++++++++++++++
 .../TestSerialGatewaySenderEventProcessor.java  |  32 ++++++
 5 files changed, 151 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/af919311/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index c38d547..2154ffe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -188,7 +188,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
       Integer.getInteger("GatewaySender.QUEUE_SIZE_THRESHOLD", 5000).intValue();
 
   public static int TOKEN_TIMEOUT =
-      Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 15000).intValue();
+      Integer.getInteger("GatewaySender.TOKEN_TIMEOUT", 120000).intValue();
 
   /**
    * The name of the DistributedLockService used when accessing the GatewaySender's meta data

http://git-wip-us.apache.org/repos/asf/geode/blob/af919311/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 0aa0ed9..150b5ac 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -84,7 +84,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
    * to keep track. Note: unprocessedEventsLock MUST be synchronized before using this map. This is
    * not a cut and paste error. sync unprocessedEventsLock when using unprocessedTokens.
    */
-  private Map<EventID, Long> unprocessedTokens;
+  protected Map<EventID, Long> unprocessedTokens;
 
   private ExecutorService executor;
 
@@ -98,7 +98,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
    * When the Number of unchecked events exceeds this threshold and the number of tokens in the map
    * exceeds this threshold then a check will be done for old tokens.
    */
-  static private final int REAP_THRESHOLD = 1000;
+  static protected final int REAP_THRESHOLD = 1000;
 
   /*
    * How many events have happened without a reap check being done?

http://git-wip-us.apache.org/repos/asf/geode/blob/af919311/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index e45a06b..1a49cfd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +48,6 @@ import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegionQueueHelper;
 import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.EvictionAttributesImpl;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegionArguments;
@@ -156,10 +154,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     when(pa.getColocatedWith()).thenReturn(null);
 
-    // classes cannot be mocked
-    ProxyBucketRegion pbr = new ProxyBucketRegion(BUCKET_ID, this.queueRegion, pbrIra);
-
-    when(ba.getProxyBucketRegion()).thenReturn(pbr);
+    when(ba.getProxyBucketRegion()).thenReturn(mock(ProxyBucketRegion.class));
 
     // Create RegionAttributes
     AttributesFactory factory = new AttributesFactory();
@@ -175,7 +170,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     this.bucketRegionQueue = spy(realBucketRegionQueue);
     // (this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira);
     EntryEventImpl entryEvent = EntryEventImpl.create(this.bucketRegionQueue, Operation.DESTROY,
-        mock(EventID.class), "value", null, false, mock(DistributedMember.class));
+        KEY, "value", null, false, mock(DistributedMember.class));
     doReturn(entryEvent).when(this.bucketRegionQueue).newDestroyEntryEvent(any(), any());
     // when(this.bucketRegionQueue.newDestroyEntryEvent(any(), any())).thenReturn();
 
@@ -203,7 +198,6 @@ public class ParallelQueueRemovalMessageJUnitTest {
     assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
   }
 
-  @Ignore
   @Test
   public void validateDestroyKeyFromBucketQueueInUninitializedBucketRegionQueue() throws Exception {
     // Validate initial BucketRegionQueue state
@@ -245,7 +239,6 @@ public class ParallelQueueRemovalMessageJUnitTest {
     assertEquals(0, tempQueue.size());
   }
 
-  @Ignore
   @Test
   public void validateDestroyFromBucketQueueAndTempQueueInUninitializedBucketRegionQueue() {
     // Validate initial BucketRegionQueue state

http://git-wip-us.apache.org/repos/asf/geode/blob/af919311/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
new file mode 100644
index 0000000..f21634e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SerialGatewaySenderEventProcessorJUnitTest {
+
+  private AbstractGatewaySender sender;
+
+  private TestSerialGatewaySenderEventProcessor processor;
+
+  @Before
+  public void setUp() throws Exception {
+    this.sender = mock(AbstractGatewaySender.class);
+    this.processor = new TestSerialGatewaySenderEventProcessor(this.sender, "ny");
+  }
+
+  @Test
+  public void validateUnprocessedTokensMapUpdated() throws Exception {
+    GatewaySenderStats gss = mock(GatewaySenderStats.class);
+    when(sender.getStatistics()).thenReturn(gss);
+
+    // Handle primary event
+    EventID id = handlePrimaryEvent();
+
+    // Verify the token was added by checking the correct stat methods were called and the size of
+    // the unprocessedTokens map.
+    verify(gss).incUnprocessedTokensAddedByPrimary();
+    verify(gss, never()).incUnprocessedEventsRemovedByPrimary();
+    assertEquals(1, this.processor.getUnprocessedTokensSize());
+
+    // Handle the event from the secondary. The call to enqueueEvent is necessary to synchronize the
+    // unprocessedEventsLock and prevent the assertion error in basicHandleSecondaryEvent.
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.getRegion()).thenReturn(mock(LocalRegion.class));
+    when(event.getEventId()).thenReturn(id);
+    when(event.getOperation()).thenReturn(Operation.CREATE);
+    this.processor.enqueueEvent(null, event, null);
+
+    // Verify the token was removed by checking the correct stat methods were called and the size of
+    // the unprocessedTokens map.
+    verify(gss).incUnprocessedTokensRemovedBySecondary();
+    verify(gss, never()).incUnprocessedEventsAddedBySecondary();
+    assertEquals(0, this.processor.getUnprocessedTokensSize());
+  }
+
+  @Test
+  public void validateUnprocessedTokensMapReaping() throws Exception {
+    // Set the token timeout low
+    int originalTokenTimeout = AbstractGatewaySender.TOKEN_TIMEOUT;
+    AbstractGatewaySender.TOKEN_TIMEOUT = 500;
+    try {
+      GatewaySenderStats gss = mock(GatewaySenderStats.class);
+      when(sender.getStatistics()).thenReturn(gss);
+
+      // Add REAP_THRESHOLD + 1 events to the unprocessed tokens map. This causes the uncheckedCount
+      // in the reaper to be REAP_THRESHOLD. The next event will cause the reaper to run.
+      int numEvents = SerialGatewaySenderEventProcessor.REAP_THRESHOLD + 1;
+      for (int i = 0; i < numEvents; i++) {
+        handlePrimaryEvent();
+      }
+      assertEquals(numEvents, this.processor.getUnprocessedTokensSize());
+
+      // Wait for the timeout
+      Thread.sleep(AbstractGatewaySender.TOKEN_TIMEOUT + 1000);
+
+      // Add one more event to the unprocessed tokens map. This will reap all of the previous
+      // tokens.
+      handlePrimaryEvent();
+      assertEquals(1, this.processor.getUnprocessedTokensSize());
+    } finally {
+      AbstractGatewaySender.TOKEN_TIMEOUT = originalTokenTimeout;
+    }
+  }
+
+  private EventID handlePrimaryEvent() {
+    GatewaySenderEventImpl gsei = mock(GatewaySenderEventImpl.class);
+    EventID id = mock(EventID.class);
+    when(gsei.getEventId()).thenReturn(id);
+    this.processor.basicHandlePrimaryEvent(gsei);
+    return id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/af919311/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java
new file mode 100644
index 0000000..cf453e6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/TestSerialGatewaySenderEventProcessor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+
+public class TestSerialGatewaySenderEventProcessor extends SerialGatewaySenderEventProcessor {
+
+  public TestSerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id) {
+    super(sender, id);
+  }
+
+  protected void initializeMessageQueue(String id) {
+    // Overridden to not create the RegionQueue in the constructor.
+  }
+
+  protected int getUnprocessedTokensSize() {
+    return this.unprocessedTokens.size();
+  }
+}