You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/04/18 23:43:16 UTC
[geode] branch feature/GEODE-5103 updated: GEODE-5103: When
applying transactional update on a replicate,
callbacks should be alwasy generated.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5103
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5103 by this push:
new b4a315e GEODE-5103: When applying transactional update on a replicate, callbacks should be alwasy generated.
b4a315e is described below
commit b4a315e92cf024c2c9beabf982d515f472786187
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Apr 18 16:35:20 2018 -0700
GEODE-5103: When applying transactional update on a replicate, callbacks should be alwasy generated.
Currently callbacks is not generated if a region entry is null possibly caused by rebalance etc.
We still want the node to generate callbacks for its clients.
---
.../geode/internal/cache/AbstractRegionMap.java | 184 +++++++++++----------
.../internal/cache/AbstractRegionMapTest.java | 71 ++++++++
2 files changed, 170 insertions(+), 85 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 71ccf51..e31e643 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -2660,7 +2660,6 @@ public abstract class AbstractRegionMap
logger.debug("txApplyPut cbEvent={}", cbEvent);
}
-
if (owner.isUsedForPartitionedRegionBucket()) {
newValue = EntryEventImpl.getCachedDeserializable(nv, cbEvent);
txHandleWANEvent(owner, cbEvent, txEntryState);
@@ -2684,90 +2683,9 @@ public abstract class AbstractRegionMap
// are initialized.
// Otherwise use the standard create/update logic
if (!owner.isAllEvents() || (!putOp.isCreate() && isRegionReady)) {
- // At this point we should only apply the update if the entry exists
- RegionEntry re = getEntry(key); // Fix for bug 32347.
- if (re != null) {
- synchronized (re) {
- if (!re.isRemoved()) {
- opCompleted = true;
- putOp = putOp.getCorrespondingUpdateOp();
- // Net writers are not called for received transaction data
- final int oldSize = owner.calculateRegionEntryValueSize(re);
- if (cbEvent != null) {
- cbEvent.setRegionEntry(re);
- cbEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
- }
-
- boolean clearOccured = false;
- // Set RegionEntry updateInProgress
- if (owner.indexMaintenanceSynchronous) {
- re.setUpdateInProgress(true);
- }
- try {
- txRemoveOldIndexEntry(putOp, re);
- if (didDestroy) {
- re.txDidDestroy(owner.cacheTimeMillis());
- }
- if (txEvent != null) {
- txEvent.addPut(putOp, owner, re, re.getKey(), newValue, aCallbackArgument);
- }
- re.setValueResultOfSearch(putOp.isNetSearch());
- try {
- processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
- {
- re.setValue(owner,
- re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
- }
- if (putOp.isCreate()) {
- owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
- } else if (putOp.isUpdate()) {
- // Rahul : fix for 41694. Negative bucket size can also be
- // an issue with normal GFE Delta and will have to be fixed
- // in a similar manner and may be this fix the the one for
- // other delta can be combined.
- {
- owner.updateSizeOnPut(key, oldSize,
- owner.calculateRegionEntryValueSize(re));
- }
- }
- } catch (RegionClearedException rce) {
- clearOccured = true;
- }
- {
- long lastMod = owner.cacheTimeMillis();
- EntryLogger.logTXPut(_getOwnerObject(), key, nv);
- re.updateStatsForPut(lastMod, lastMod);
- owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy,
- clearOccured);
- }
- } finally {
- if (re != null && owner.indexMaintenanceSynchronous) {
- re.setUpdateInProgress(false);
- }
- }
- if (invokeCallbacks) {
- cbEvent.makeUpdate();
- switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
- if (pendingCallbacks == null) {
- owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, cbEvent,
- hasRemoteOrigin);
- } else {
- pendingCallbacks.add(cbEvent);
- cbEventInPending = true;
- }
- }
- if (!clearOccured) {
- lruEntryUpdate(re);
- }
- }
- }
- if (didDestroy && !opCompleted) {
- owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */);
- }
- }
- if (owner.getConcurrencyChecksEnabled() && txEntryState != null && cbEvent != null) {
- txEntryState.setVersionTag(cbEvent.getVersionTag());
- }
+ cbEventInPending = applyTxUpdateOnReplicateOrRedundantCopy(key, nv, didDestroy, txEvent,
+ aCallbackArgument, pendingCallbacks, txEntryState, owner, putOp, newValue,
+ hasRemoteOrigin, cbEvent, invokeCallbacks, cbEventInPending, opCompleted);
return;
}
}
@@ -2952,6 +2870,102 @@ public abstract class AbstractRegionMap
}
}
+ private boolean applyTxUpdateOnReplicateOrRedundantCopy(Object key, Object nv, boolean didDestroy,
+ TXRmtEvent txEvent, Object aCallbackArgument, List<EntryEventImpl> pendingCallbacks,
+ TXEntryState txEntryState, LocalRegion owner, Operation putOp, Object newValue,
+ boolean hasRemoteOrigin, EntryEventImpl cbEvent, boolean invokeCallbacks,
+ boolean cbEventInPending, boolean opCompleted) {
+ // At this point we should only apply the update if the entry exists
+ RegionEntry re = getEntry(key); // Fix for bug 32347.
+ if (re != null) {
+ synchronized (re) {
+ if (!re.isRemoved()) {
+ opCompleted = true;
+ putOp = putOp.getCorrespondingUpdateOp();
+ // Net writers are not called for received transaction data
+ final int oldSize = owner.calculateRegionEntryValueSize(re);
+ if (cbEvent != null) {
+ cbEvent.setRegionEntry(re);
+ cbEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
+ }
+
+ boolean clearOccured = false;
+ // Set RegionEntry updateInProgress
+ if (owner.indexMaintenanceSynchronous) {
+ re.setUpdateInProgress(true);
+ }
+ try {
+ txRemoveOldIndexEntry(putOp, re);
+ if (didDestroy) {
+ re.txDidDestroy(owner.cacheTimeMillis());
+ }
+ if (txEvent != null) {
+ txEvent.addPut(putOp, owner, re, re.getKey(), newValue, aCallbackArgument);
+ }
+ re.setValueResultOfSearch(putOp.isNetSearch());
+ try {
+ processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
+ {
+ re.setValue(owner,
+ re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
+ }
+ if (putOp.isCreate()) {
+ owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
+ } else if (putOp.isUpdate()) {
+ // Rahul : fix for 41694. Negative bucket size can also be
+ // an issue with normal GFE Delta and will have to be fixed
+ // in a similar manner and may be this fix the the one for
+ // other delta can be combined.
+ {
+ owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(re));
+ }
+ }
+ } catch (RegionClearedException rce) {
+ clearOccured = true;
+ }
+ {
+ long lastMod = owner.cacheTimeMillis();
+ EntryLogger.logTXPut(_getOwnerObject(), key, nv);
+ re.updateStatsForPut(lastMod, lastMod);
+ owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy, clearOccured);
+ }
+ } finally {
+ if (re != null && owner.indexMaintenanceSynchronous) {
+ re.setUpdateInProgress(false);
+ }
+ }
+ if (!clearOccured) {
+ lruEntryUpdate(re);
+ }
+ }
+ }
+ if (didDestroy && !opCompleted) {
+ owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */);
+ }
+ }
+ if (invokeCallbacks) {
+ cbEventInPending = prepareUpdateCallbacks(pendingCallbacks, owner, hasRemoteOrigin, cbEvent,
+ cbEventInPending);
+ }
+ if (owner.getConcurrencyChecksEnabled() && txEntryState != null && cbEvent != null) {
+ txEntryState.setVersionTag(cbEvent.getVersionTag());
+ }
+ return cbEventInPending;
+ }
+
+ private boolean prepareUpdateCallbacks(List<EntryEventImpl> pendingCallbacks, LocalRegion owner,
+ boolean hasRemoteOrigin, EntryEventImpl cbEvent, boolean cbEventInPending) {
+ cbEvent.makeUpdate();
+ switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+ if (pendingCallbacks == null) {
+ owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, cbEvent, hasRemoteOrigin);
+ } else {
+ pendingCallbacks.add(cbEvent);
+ cbEventInPending = true;
+ }
+ return cbEventInPending;
+ }
+
private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl cbEvent,
TXEntryState txEntryState) {
((BucketRegion) owner).handleWANEvent(cbEvent);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index d3053b0..81704a3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -30,6 +30,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -812,8 +815,76 @@ public class AbstractRegionMapTest {
when(owner.getCache()).thenReturn(mock(InternalCache.class));
when(owner.isAllEvents()).thenReturn(true);
when(owner.isInitialized()).thenReturn(true);
+ when(owner.shouldNotifyBridgeClients()).thenReturn(true);
initialize(owner, new Attributes(), null, false);
}
}
+ @Test
+ public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryExists()
+ throws Exception {
+ AbstractRegionMap arm = new TxRegionEntryTestableAbstractRegionMap();
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+ Object newValue = "value";
+ arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+ new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+ mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+ assertEquals(1, pendingCallbacks.size());
+ }
+
+ @Test
+ public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryIsRemoved()
+ throws Exception {
+ AbstractRegionMap arm = new TxRemovedRegionEntryTestableAbstractRegionMap();
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+ Object newValue = "value";
+ arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+ new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+ mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+ assertEquals(1, pendingCallbacks.size());
+ }
+
+ @Test
+ public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryIsNull()
+ throws Exception {
+ AbstractRegionMap arm = new TxNoRegionEntryTestableAbstractRegionMap();
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+ Object newValue = "value";
+ arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+ new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+ mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+ assertEquals(1, pendingCallbacks.size());
+ }
+
+ private static class TxNoRegionEntryTestableAbstractRegionMap
+ extends TxTestableAbstractRegionMap {
+ @Override
+ public RegionEntry getEntry(Object key) {
+ return null;
+ }
+ }
+
+ private static class TxRegionEntryTestableAbstractRegionMap extends TxTestableAbstractRegionMap {
+ @Override
+ public RegionEntry getEntry(Object key) {
+ return mock(RegionEntry.class);
+ }
+ }
+
+ private static class TxRemovedRegionEntryTestableAbstractRegionMap
+ extends TxTestableAbstractRegionMap {
+ @Override
+ public RegionEntry getEntry(Object key) {
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ when(regionEntry.isRemoved()).thenReturn(true);
+ return regionEntry;
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
eshu11@apache.org.