You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2016/10/07 22:03:33 UTC
incubator-geode git commit: GEODE-1926: Modification of peedAhead()
function
Repository: incubator-geode
Updated Branches:
refs/heads/develop fbbdb824b -> 654b06ed2
GEODE-1926: Modification of peedAhead() function
* Modification of peedAhead() function check if heapCopy is successful before adding the key to peekedIds.
* Since peek() doesnt have access to the current key but just the object and hence cannot remove it from peekedIDs list, we moved the check for heapcopy into peekAhead.
* So now only if a successful heap copy is made then only the key will be placed into the peekedIDs list.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/654b06ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/654b06ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/654b06ed
Branch: refs/heads/develop
Commit: 654b06ed2fc1d740fe6c2cb2b40fd92b17d5a8dc
Parents: fbbdb82
Author: nabarun <nn...@pivotal.io>
Authored: Fri Oct 7 14:57:25 2016 -0700
Committer: nabarun <nn...@pivotal.io>
Committed: Fri Oct 7 14:57:25 2016 -0700
----------------------------------------------------------------------
.../wan/serial/SerialGatewaySenderQueue.java | 56 ++++++----
.../serial/SerialWANConflationDUnitTest.java | 111 +++++++++++++++++++
2 files changed, 148 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/654b06ed/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index a8bb72d..28f5f83 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -513,13 +513,6 @@ public class SerialGatewaySenderQueue implements RegionQueue {
//resetLastPeeked();
while (batch.size() < size) {
AsyncEvent object = peekAhead();
- if (object != null && object instanceof GatewaySenderEventImpl) {
- GatewaySenderEventImpl copy = ((GatewaySenderEventImpl)object).makeHeapCopyIfOffHeap();
- if (copy == null) {
- continue;
- }
- object = copy;
- }
// Conflate here
if (object != null) {
batch.add(object);
@@ -759,17 +752,42 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*
* @throws CacheException
*/
- private AsyncEvent peekAhead() throws CacheException {
- AsyncEvent object = null;
- long currentKey = -1;
+
+ private Long getCurrentKey(){
+ long currentKey;
if (this.peekedIds.isEmpty()) {
- currentKey = getHeadKey();
+ currentKey = getHeadKey();
} else {
- Long lastPeek = this.peekedIds.peekLast();
- if (lastPeek == null) {
- return null;
- }
- currentKey = lastPeek.longValue() + 1;
+ Long lastPeek = this.peekedIds.peekLast();
+ if (lastPeek == null) {
+ return null;
+ }
+ currentKey = lastPeek.longValue() + 1;
+ }
+ return currentKey;
+ }
+
+ private AsyncEvent getObjectInSerialSenderQueue(Long currentKey) {
+ AsyncEvent object = optimalGet(currentKey);
+ if ((null != object) && logger.isDebugEnabled()) {
+ logger.debug("{}: Peeked {}->{}", this, currentKey, object);
+ }
+ if (object != null && object instanceof GatewaySenderEventImpl) {
+ GatewaySenderEventImpl copy = ((GatewaySenderEventImpl)object).makeHeapCopyIfOffHeap();
+ if (copy == null) {
+ logger.debug("Unable to make heap copy and will not be added to peekedIds for object" +
+ " : {} ",object.toString());
+ }
+ object = copy;
+ }
+ return object;
+ }
+
+ private AsyncEvent peekAhead() throws CacheException {
+ AsyncEvent object = null;
+ Long currentKey = getCurrentKey();
+ if(currentKey == null ){
+ return null;
}
@@ -788,12 +806,12 @@ public class SerialGatewaySenderQueue implements RegionQueue {
// does not save anything since GatewayBatchOp needs to GatewayEventImpl
// in object form.
while (before(currentKey, getTailKey())
- // use optimalGet here to fix bug 40654
- && (object = optimalGet(Long.valueOf(currentKey))) == null) {
+ && (null == (object = getObjectInSerialSenderQueue(currentKey)))) {
if (logger.isTraceEnabled()) {
logger.trace("{}: Trying head key + offset: {}", this, currentKey);
}
currentKey = inc(currentKey);
+ object = getObjectInSerialSenderQueue(currentKey);
if (this.stats != null) {
this.stats.incEventsNotQueuedConflated();
}
@@ -803,7 +821,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
logger.debug("{}: Peeked {}->{}", this, currentKey, object);
}
if (object != null) {
- this.peekedIds.add(Long.valueOf(currentKey));
+ this.peekedIds.add(currentKey);
}
return object;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/654b06ed/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
new file mode 100644
index 0000000..82d1067
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+
+public class SerialWANConflationDUnitTest extends WANTestBase{
+
+ @Test
+ public void testSerialPropagationPartitionRegionBatchConflation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ vm2.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), null, 1, 8, isOffHeap() ));
+ vm3.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), null, 1, 8, isOffHeap() ));
+
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+ vm5.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+ vm6.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+ vm7.invoke(() ->createPartitionedRegion(
+ getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+
+ vm4.invoke(() ->createSender( "ln", 2,
+ false, 100, 50, false, false, null, true ));
+ vm5.invoke(() ->createSender( "ln", 2,
+ false, 100, 50, false, false, null, true ));
+ vm6.invoke(() ->createSender( "ln", 2,
+ false, 100, 50, false, false, null, true ));
+ vm7.invoke(() ->createSender( "ln", 2,
+ false, 100, 50, false, false, null, true ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() ->pauseSender( "ln" ));
+ vm5.invoke(() ->pauseSender( "ln" ));
+ vm6.invoke(() ->pauseSender( "ln" ));
+ vm7.invoke(() ->pauseSender( "ln" ));
+
+
+ final Map keyValues = new HashMap();
+
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 1; j <= 10; j++) {
+ keyValues.put(j, i) ;
+ }
+ vm4.invoke(() ->putGivenKeyValue(
+ getTestMethodName(), keyValues ));
+ }
+
+ vm4.invoke(() ->enableConflation( "ln" ));
+ vm5.invoke(() ->enableConflation( "ln" ));
+ vm6.invoke(() ->enableConflation( "ln" ));
+ vm7.invoke(() ->enableConflation( "ln" ));
+
+ vm4.invoke(() ->resumeSender( "ln" ));
+ vm5.invoke(() ->resumeSender( "ln" ));
+ vm6.invoke(() ->resumeSender( "ln" ));
+ vm7.invoke(() ->resumeSender( "ln" ));
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+
+ assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+
+ vm2.invoke(() ->validateRegionSize(
+ getTestMethodName(), 10 ));
+
+ }
+
+}