You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2017/05/08 17:22:54 UTC
geode git commit: GEODE-2776: Setting version tag on the client event
from the current region entry after load. And refactoring the
findObjectInSystem().
Repository: geode
Updated Branches:
refs/heads/develop 288676dfe -> 72d0d4baa
GEODE-2776: Setting version tag on the client event from the current region entry after load. And refactoring the findObjectInSystem().
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/72d0d4ba
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/72d0d4ba
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/72d0d4ba
Branch: refs/heads/develop
Commit: 72d0d4baaccfb90e011286cb57d97174065256ae
Parents: 288676d
Author: Anil <ag...@pivotal.io>
Authored: Wed Apr 19 17:35:11 2017 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Mon May 8 10:22:24 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/DistributedRegion.java | 246 +++++++++++--------
.../geode/internal/cache/LocalRegion.java | 3 +
.../DistributedRegionSearchLoadJUnitTest.java | 187 ++++++++++++++
3 files changed, 335 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 0c967c9..c3a4961 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2265,123 +2265,167 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws CacheLoaderException, TimeoutException {
+ @Released
+ EntryEventImpl event = null;
checkForLimitedOrNoAccess();
+ final Operation op = isCreate ? Operation.CREATE : Operation.UPDATE;
+ long lastModified = 0L;
+ try {
+ event = findOnServer(keyInfo, op, generateCallbacks, clientEvent);
+ if (event == null) {
+ event = createEventForLoad(keyInfo, generateCallbacks, requestingClient, op);
+ lastModified = findUsingSearchLoad(txState, localValue, clientEvent, keyInfo, event);
+ }
+ // Update region with new value.
+ if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
+ putNewValueInRegion(isCreate, clientEvent, lastModified, event);
+ } else if (isCreate) {
+ recordMiss(null, event.getKey());
+ }
+ return determineResult(preferCD, event);
+ } finally {
+ if (event != null) {
+ event.release();
+ }
+ }
+ }
+
+ private EntryEventImpl createEventForLoad(KeyInfo keyInfo, boolean generateCallbacks,
+ ClientProxyMembershipID requestingClient, Operation op) {
+ // Do not generate Event ID
+ EntryEventImpl event = EntryEventImpl.create(this, op, keyInfo.getKey(), null /* newValue */,
+ keyInfo.getCallbackArg(), false, getMyId(), generateCallbacks);
+ if (requestingClient != null) {
+ event.setContext(requestingClient);
+ }
+ return event;
+ }
+
+ private Object determineResult(boolean preferCD, EntryEventImpl event) {
+ if (preferCD) {
+ return event.getRawNewValueAsHeapObject();
+ }
+ return event.getNewValue();
+ }
+
+ private void putNewValueInRegion(boolean isCreate, EntryEventImpl clientEvent, long lastModified,
+ EntryEventImpl event) {
RegionEntry re = null;
- final Object key = keyInfo.getKey();
- final Object aCallbackArgument = keyInfo.getCallbackArg();
- Operation op;
+ // Set eventId. Required for interested clients.
+ event.setNewEventId(cache.getDistributedSystem());
+
+ long startPut = CachePerfStats.getStatTime();
+ validateKey(event.getKey());
+ // this next step also distributes the object to other processes, if necessary
+ try {
+ // set the tail key so that the event is passed to GatewaySender queues.
+ // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
+ if (this instanceof BucketRegion) {
+ if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled())
+ ((BucketRegion) this).handleWANEvent(event);
+ }
+ re = basicPutEntry(event, lastModified);
+
+ // Update client event with latest version tag from re.
+ if (re != null && clientEvent != null) {
+ clientEvent.setVersionTag(event.getVersionTag());
+ }
+ if (!isTX()) {
+ getCachePerfStats().endPut(startPut, event.isOriginRemote());
+ }
+ } catch (ConcurrentCacheModificationException e) {
+ // the cache was modified while we were searching for this entry and
+ // the netsearch result was elided. Return the current value from the cache
+ updateEventWithCurrentRegionEntry(event, clientEvent);
+ } catch (CacheWriterException cwe) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
+ }
+ }
if (isCreate) {
- op = Operation.CREATE;
- } else {
- op = Operation.UPDATE;
+ recordMiss(re, event.getKey());
}
- long lastModified = 0L;
- boolean fromServer = false;
- @Released
- EntryEventImpl event = null;
- @Retained
- Object result = null;
+ }
+
+ private void updateEventWithCurrentRegionEntry(EntryEventImpl event, EntryEventImpl clientEvent) {
+ // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
+ final boolean disabled = this.entries.disableLruUpdateCallback();
try {
- {
- if (this.srp != null) {
- VersionTagHolder holder = new VersionTagHolder();
- Object value = this.srp.get(key, aCallbackArgument, holder);
- fromServer = value != null;
- if (fromServer) {
- event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(),
- generateCallbacks);
- event.setVersionTag(holder.getVersionTag());
- event.setFromServer(fromServer); // fix for bug 39358
- if (clientEvent != null && clientEvent.getVersionTag() == null) {
- clientEvent.setVersionTag(holder.getVersionTag());
- }
+ RegionEntry re = getRegionEntry(event.getKey());
+ if (re != null) {
+ synchronized (re) { // bug #51059 value & version must be obtained atomically
+ // Update client event with latest version tag from re
+ if (clientEvent != null) {
+ clientEvent.setVersionTag(re.getVersionStamp().asVersionTag());
}
+ // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc
+ event.setNewValue(re.getValue(this));
}
}
-
- if (!fromServer) {
- // Do not generate Event ID
- event = EntryEventImpl.create(this, op, key, null /* newValue */, aCallbackArgument, false,
- getMyId(), generateCallbacks);
- if (requestingClient != null) {
- event.setContext(requestingClient);
- }
- // If this event is because of a register interest call, don't invoke the CacheLoader
- boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
- && clientEvent.getOperation().isGetForRegisterInterest();
- if (!getForRegisterInterest) {
- SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
- try {
- processor.initialize(this, key, aCallbackArgument);
- // processor fills in event
- processor.doSearchAndLoad(event, txState, localValue);
- if (clientEvent != null && clientEvent.getVersionTag() == null) {
- clientEvent.setVersionTag(event.getVersionTag());
- }
- lastModified = processor.getLastModified();
- } finally {
- processor.release();
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
- + getFullPath() + "; key=" + key);
- }
- }
+ } finally {
+ if (disabled) {
+ this.entries.enableLruUpdateCallback();
}
- if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
- try {
- // Set eventId. Required for interested clients.
- event.setNewEventId(cache.getDistributedSystem());
-
- long startPut = CachePerfStats.getStatTime();
- validateKey(key);
- // if (event.getOperation().isLoad()) {
- // this.performedLoad(event, lastModified, txState);
- // }
- // this next step also distributes the object to other processes, if necessary
- try {
- // set the tail key so that the event is passed to GatewaySender queues.
- // if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
- if (this instanceof BucketRegion) {
- if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled())
- ((BucketRegion) this).handleWANEvent(event);
- }
- re = basicPutEntry(event, lastModified);
- } catch (ConcurrentCacheModificationException e) {
- // the cache was modified while we were searching for this entry and
- // the netsearch result was elided. Return the current value from the cache
- re = getRegionEntry(key);
- if (re != null) {
- event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to
- // setNewValue, decrc
- }
- }
- if (!isTX()) {
- getCachePerfStats().endPut(startPut, event.isOriginRemote());
- }
- } catch (CacheWriterException cwe) {
- if (logger.isDebugEnabled()) {
- logger.debug("findObjectInSystem: writer exception putting entry {} : {}", event, cwe);
- }
- }
+ try {
+ this.entries.lruUpdateCallback();
+ } catch (DiskAccessException dae) {
+ this.handleDiskAccessException(dae);
+ throw dae;
}
- if (isCreate) {
- recordMiss(re, key);
+ }
+ }
+
+ /**
+ * If its client, get the value from server.
+ */
+ private EntryEventImpl findOnServer(KeyInfo keyInfo, Operation op, boolean generateCallbacks,
+ EntryEventImpl clientEvent) {
+ if (this.srp == null) {
+ return null;
+ }
+ EntryEventImpl event = null;
+ VersionTagHolder holder = new VersionTagHolder();
+ Object aCallbackArgument = keyInfo.getCallbackArg();
+ Object value = this.srp.get(keyInfo.getKey(), aCallbackArgument, holder);
+ if (value != null) {
+ event = EntryEventImpl.create(this, op, keyInfo.getKey(), value, aCallbackArgument, false,
+ getMyId(), generateCallbacks);
+ event.setVersionTag(holder.getVersionTag());
+ event.setFromServer(true); // fix for bug 39358
+ if (clientEvent != null && clientEvent.getVersionTag() == null) {
+ clientEvent.setVersionTag(holder.getVersionTag());
}
+ }
+ return event;
+ }
- if (preferCD) {
- result = event.getRawNewValueAsHeapObject();
- } else {
- result = event.getNewValue();
+ private long findUsingSearchLoad(TXStateInterface txState, Object localValue,
+ EntryEventImpl clientEvent, final KeyInfo keyInfo, EntryEventImpl event) {
+ long lastModified = 0L;
+ // If this event is because of a register interest call, don't invoke the CacheLoader
+ boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
+ && clientEvent.getOperation().isGetForRegisterInterest();
+ if (!getForRegisterInterest) {
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ try {
+ processor.initialize(this, keyInfo.getKey(), keyInfo.getCallbackArg());
+ // processor fills in event
+ processor.doSearchAndLoad(event, txState, localValue);
+ if (clientEvent != null && clientEvent.getVersionTag() == null) {
+ clientEvent.setVersionTag(event.getVersionTag());
+ }
+ lastModified = processor.getLastModified();
+ } finally {
+ processor.release();
}
- return result;
- } finally {
- if (event != null) {
- event.release();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+ + getFullPath() + "; key=" + keyInfo.getKey());
}
}
+ return lastModified;
}
/**
http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 2dec53b..cdba7e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1393,6 +1393,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @param key the key used to fetch the region entry
*/
final public void recordMiss(final RegionEntry re, Object key) {
+ if (!this.statisticsEnabled) {
+ return;
+ }
final RegionEntry e;
if (re == null && !isTX()) {
e = basicGetEntry(key);
http://git-wip-us.apache.org/repos/asf/geode/blob/72d0d4ba/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
new file mode 100755
index 0000000..30fb728
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionSearchLoadJUnitTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({SearchLoadAndWriteProcessor.class})
+public class DistributedRegionSearchLoadJUnitTest {
+
+ protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
+ RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache) {
+ DistributedRegion region = new DistributedRegion("testRegion", ra, null, cache, ira);
+ if (isConcurrencyChecksEnabled) {
+ region.enableConcurrencyChecks();
+ }
+
+ // since it is a real region object, we need to tell mockito to monitor it
+ region = spy(region);
+
+ doNothing().when(region).distributeUpdate(any(), anyLong(), anyBoolean(), anyBoolean(), any(),
+ anyBoolean());
+ doNothing().when(region).distributeDestroy(any(), any());
+ doNothing().when(region).distributeInvalidate(any());
+ doNothing().when(region).distributeUpdateEntryVersion(any());
+
+ return region;
+ }
+
+ private RegionAttributes createRegionAttributes(boolean isConcurrencyChecksEnabled) {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ factory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); //
+ RegionAttributes ra = factory.create();
+ return ra;
+ }
+
+ private EventID createDummyEventID() {
+ byte[] memId = {1, 2, 3};
+ EventID eventId = new EventID(memId, 11, 12, 13);
+ return eventId;
+ }
+
+ protected EntryEventImpl createDummyEvent(DistributedRegion region) {
+ // create a dummy event id
+ EventID eventId = createDummyEventID();
+ String key = "key1";
+ String value = "Value1";
+
+ // create an event
+ EntryEventImpl event = EntryEventImpl.create(region, Operation.CREATE, key, value, null,
+ false /* origin remote */, null, false /* generateCallbacks */, eventId);
+ // avoid calling invokeCallbacks
+ event.callbacksInvoked(true);
+
+ return event;
+ }
+
+ protected VersionTag createVersionTag(boolean validVersionTag) {
+ InternalDistributedMember remotemember = mock(InternalDistributedMember.class);
+ VersionTag tag = VersionTag.create(remotemember);
+ if (validVersionTag) {
+ tag.setRegionVersion(1);
+ tag.setEntryVersion(1);
+ }
+ return tag;
+ }
+
+ protected DistributedRegion prepare(boolean isConcurrencyChecksEnabled) {
+ GemFireCacheImpl cache = Fakes.cache();
+
+ // create region attributes and internal region arguments
+ RegionAttributes ra = createRegionAttributes(isConcurrencyChecksEnabled);
+ InternalRegionArguments ira = new InternalRegionArguments();
+
+ // create a region object
+ DistributedRegion region = createAndDefineRegion(isConcurrencyChecksEnabled, ra, ira, cache);
+ if (isConcurrencyChecksEnabled) {
+ region.enableConcurrencyChecks();
+ }
+
+ doNothing().when(region).notifyGatewaySender(any(), any());
+ doReturn(true).when(region).hasSeenEvent(any(EntryEventImpl.class));
+ return region;
+ }
+
+ private void createSearchLoad() {
+ SearchLoadAndWriteProcessor proc = mock(SearchLoadAndWriteProcessor.class);
+ PowerMockito.mockStatic(SearchLoadAndWriteProcessor.class);
+ PowerMockito.when(SearchLoadAndWriteProcessor.getProcessor()).thenReturn(proc);
+
+ VersionTag tag = createVersionTag(true);
+
+ doAnswer(new Answer<EntryEventImpl>() {
+ public EntryEventImpl answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ if (args[0] instanceof EntryEventImpl) {
+ EntryEventImpl event = (EntryEventImpl) args[0];
+ event.setNewValue("NewLoadedValue");
+ event.setOperation(Operation.LOCAL_LOAD_CREATE);
+ }
+ return null;
+ }
+ }).when(proc).doSearchAndLoad(any(EntryEventImpl.class), anyObject(), anyObject());
+ }
+
+ @Test
+ public void testClientEventIsUpdatedWithCurrentEntryVersionTagAfterLoad() {
+ DistributedRegion region = prepare(true);
+ EntryEventImpl event = createDummyEvent(region);
+ region.basicInvalidate(event);
+
+ createSearchLoad();
+
+ KeyInfo ki = new KeyInfo(event.getKey(), null, null);
+ region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false);
+ assertNotNull("ClientEvent version tag is not set with region version tag.",
+ event.getVersionTag());
+ }
+
+ @Test
+ public void testClientEventIsUpdatedWithCurrentEntryVersionTagAfterSearchConcurrencyException() {
+ DistributedRegion region = prepare(true);
+
+ EntryEventImpl event = createDummyEvent(region);
+ region.basicInvalidate(event);
+
+ VersionTag tag = createVersionTag(true);
+ RegionEntry re = mock(RegionEntry.class);
+ VersionStamp stamp = mock(VersionStamp.class);
+
+ doReturn(re).when(region).getRegionEntry(any());
+ when(re.getVersionStamp()).thenReturn(stamp);
+ when(stamp.asVersionTag()).thenReturn(tag);
+
+ createSearchLoad();
+ doThrow(new ConcurrentCacheModificationException()).when(region)
+ .basicPutEntry(any(EntryEventImpl.class), anyLong());
+
+ KeyInfo ki = new KeyInfo(event.getKey(), null, null);
+ region.findObjectInSystem(ki, false, null, false, null, false, false, null, event, false);
+ assertNotNull("ClientEvent version tag is not set with region version tag.",
+ event.getVersionTag());
+ }
+
+}
+