You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/04/18 23:43:16 UTC

[geode] branch feature/GEODE-5103 updated: GEODE-5103: When applying transactional update on a replicate, callbacks should always be generated.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5103
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5103 by this push:
     new b4a315e  GEODE-5103: When applying transactional update on a replicate, callbacks should always be generated.
b4a315e is described below

commit b4a315e92cf024c2c9beabf982d515f472786187
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Apr 18 16:35:20 2018 -0700

    GEODE-5103: When applying transactional update on a replicate, callbacks should always be generated.
    
       Currently, callbacks are not generated if the region entry is null (possibly caused by a rebalance, etc.).
       We still want the node to generate callbacks for its clients.
---
 .../geode/internal/cache/AbstractRegionMap.java    | 184 +++++++++++----------
 .../internal/cache/AbstractRegionMapTest.java      |  71 ++++++++
 2 files changed, 170 insertions(+), 85 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 71ccf51..e31e643 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -2660,7 +2660,6 @@ public abstract class AbstractRegionMap
         logger.debug("txApplyPut cbEvent={}", cbEvent);
       }
 
-
       if (owner.isUsedForPartitionedRegionBucket()) {
         newValue = EntryEventImpl.getCachedDeserializable(nv, cbEvent);
         txHandleWANEvent(owner, cbEvent, txEntryState);
@@ -2684,90 +2683,9 @@ public abstract class AbstractRegionMap
           // are initialized.
           // Otherwise use the standard create/update logic
           if (!owner.isAllEvents() || (!putOp.isCreate() && isRegionReady)) {
-            // At this point we should only apply the update if the entry exists
-            RegionEntry re = getEntry(key); // Fix for bug 32347.
-            if (re != null) {
-              synchronized (re) {
-                if (!re.isRemoved()) {
-                  opCompleted = true;
-                  putOp = putOp.getCorrespondingUpdateOp();
-                  // Net writers are not called for received transaction data
-                  final int oldSize = owner.calculateRegionEntryValueSize(re);
-                  if (cbEvent != null) {
-                    cbEvent.setRegionEntry(re);
-                    cbEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
-                  }
-
-                  boolean clearOccured = false;
-                  // Set RegionEntry updateInProgress
-                  if (owner.indexMaintenanceSynchronous) {
-                    re.setUpdateInProgress(true);
-                  }
-                  try {
-                    txRemoveOldIndexEntry(putOp, re);
-                    if (didDestroy) {
-                      re.txDidDestroy(owner.cacheTimeMillis());
-                    }
-                    if (txEvent != null) {
-                      txEvent.addPut(putOp, owner, re, re.getKey(), newValue, aCallbackArgument);
-                    }
-                    re.setValueResultOfSearch(putOp.isNetSearch());
-                    try {
-                      processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
-                      {
-                        re.setValue(owner,
-                            re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
-                      }
-                      if (putOp.isCreate()) {
-                        owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
-                      } else if (putOp.isUpdate()) {
-                        // Rahul : fix for 41694. Negative bucket size can also be
-                        // an issue with normal GFE Delta and will have to be fixed
-                        // in a similar manner and may be this fix the the one for
-                        // other delta can be combined.
-                        {
-                          owner.updateSizeOnPut(key, oldSize,
-                              owner.calculateRegionEntryValueSize(re));
-                        }
-                      }
-                    } catch (RegionClearedException rce) {
-                      clearOccured = true;
-                    }
-                    {
-                      long lastMod = owner.cacheTimeMillis();
-                      EntryLogger.logTXPut(_getOwnerObject(), key, nv);
-                      re.updateStatsForPut(lastMod, lastMod);
-                      owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy,
-                          clearOccured);
-                    }
-                  } finally {
-                    if (re != null && owner.indexMaintenanceSynchronous) {
-                      re.setUpdateInProgress(false);
-                    }
-                  }
-                  if (invokeCallbacks) {
-                    cbEvent.makeUpdate();
-                    switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
-                    if (pendingCallbacks == null) {
-                      owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, cbEvent,
-                          hasRemoteOrigin);
-                    } else {
-                      pendingCallbacks.add(cbEvent);
-                      cbEventInPending = true;
-                    }
-                  }
-                  if (!clearOccured) {
-                    lruEntryUpdate(re);
-                  }
-                }
-              }
-              if (didDestroy && !opCompleted) {
-                owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */);
-              }
-            }
-            if (owner.getConcurrencyChecksEnabled() && txEntryState != null && cbEvent != null) {
-              txEntryState.setVersionTag(cbEvent.getVersionTag());
-            }
+            cbEventInPending = applyTxUpdateOnReplicateOrRedundantCopy(key, nv, didDestroy, txEvent,
+                aCallbackArgument, pendingCallbacks, txEntryState, owner, putOp, newValue,
+                hasRemoteOrigin, cbEvent, invokeCallbacks, cbEventInPending, opCompleted);
             return;
           }
         }
@@ -2952,6 +2870,102 @@ public abstract class AbstractRegionMap
     }
   }
 
+  private boolean applyTxUpdateOnReplicateOrRedundantCopy(Object key, Object nv, boolean didDestroy,
+      TXRmtEvent txEvent, Object aCallbackArgument, List<EntryEventImpl> pendingCallbacks,
+      TXEntryState txEntryState, LocalRegion owner, Operation putOp, Object newValue,
+      boolean hasRemoteOrigin, EntryEventImpl cbEvent, boolean invokeCallbacks,
+      boolean cbEventInPending, boolean opCompleted) {
+    // At this point we should only apply the update if the entry exists
+    RegionEntry re = getEntry(key); // Fix for bug 32347.
+    if (re != null) {
+      synchronized (re) {
+        if (!re.isRemoved()) {
+          opCompleted = true;
+          putOp = putOp.getCorrespondingUpdateOp();
+          // Net writers are not called for received transaction data
+          final int oldSize = owner.calculateRegionEntryValueSize(re);
+          if (cbEvent != null) {
+            cbEvent.setRegionEntry(re);
+            cbEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
+          }
+
+          boolean clearOccured = false;
+          // Set RegionEntry updateInProgress
+          if (owner.indexMaintenanceSynchronous) {
+            re.setUpdateInProgress(true);
+          }
+          try {
+            txRemoveOldIndexEntry(putOp, re);
+            if (didDestroy) {
+              re.txDidDestroy(owner.cacheTimeMillis());
+            }
+            if (txEvent != null) {
+              txEvent.addPut(putOp, owner, re, re.getKey(), newValue, aCallbackArgument);
+            }
+            re.setValueResultOfSearch(putOp.isNetSearch());
+            try {
+              processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
+              {
+                re.setValue(owner,
+                    re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
+              }
+              if (putOp.isCreate()) {
+                owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
+              } else if (putOp.isUpdate()) {
+                // Rahul : fix for 41694. Negative bucket size can also be
+                // an issue with normal GFE Delta and will have to be fixed
+                // in a similar manner and may be this fix the the one for
+                // other delta can be combined.
+                {
+                  owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(re));
+                }
+              }
+            } catch (RegionClearedException rce) {
+              clearOccured = true;
+            }
+            {
+              long lastMod = owner.cacheTimeMillis();
+              EntryLogger.logTXPut(_getOwnerObject(), key, nv);
+              re.updateStatsForPut(lastMod, lastMod);
+              owner.txApplyPutPart2(re, re.getKey(), lastMod, false, didDestroy, clearOccured);
+            }
+          } finally {
+            if (re != null && owner.indexMaintenanceSynchronous) {
+              re.setUpdateInProgress(false);
+            }
+          }
+          if (!clearOccured) {
+            lruEntryUpdate(re);
+          }
+        }
+      }
+      if (didDestroy && !opCompleted) {
+        owner.txApplyInvalidatePart2(re, re.getKey(), true, false /* clear */);
+      }
+    }
+    if (invokeCallbacks) {
+      cbEventInPending = prepareUpdateCallbacks(pendingCallbacks, owner, hasRemoteOrigin, cbEvent,
+          cbEventInPending);
+    }
+    if (owner.getConcurrencyChecksEnabled() && txEntryState != null && cbEvent != null) {
+      txEntryState.setVersionTag(cbEvent.getVersionTag());
+    }
+    return cbEventInPending;
+  }
+
+  private boolean prepareUpdateCallbacks(List<EntryEventImpl> pendingCallbacks, LocalRegion owner,
+      boolean hasRemoteOrigin, EntryEventImpl cbEvent, boolean cbEventInPending) {
+    cbEvent.makeUpdate();
+    switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
+    if (pendingCallbacks == null) {
+      owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, cbEvent, hasRemoteOrigin);
+    } else {
+      pendingCallbacks.add(cbEvent);
+      cbEventInPending = true;
+    }
+    return cbEventInPending;
+  }
+
   private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl cbEvent,
       TXEntryState txEntryState) {
     ((BucketRegion) owner).handleWANEvent(cbEvent);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
index d3053b0..81704a3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionMapTest.java
@@ -30,6 +30,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -812,8 +815,76 @@ public class AbstractRegionMapTest {
       when(owner.getCache()).thenReturn(mock(InternalCache.class));
       when(owner.isAllEvents()).thenReturn(true);
       when(owner.isInitialized()).thenReturn(true);
+      when(owner.shouldNotifyBridgeClients()).thenReturn(true);
       initialize(owner, new Attributes(), null, false);
     }
   }
 
+  @Test
+  public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryExists()
+      throws Exception {
+    AbstractRegionMap arm = new TxRegionEntryTestableAbstractRegionMap();
+    List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+    Object newValue = "value";
+    arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+        mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+    assertEquals(1, pendingCallbacks.size());
+  }
+
+  @Test
+  public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryIsRemoved()
+      throws Exception {
+    AbstractRegionMap arm = new TxRemovedRegionEntryTestableAbstractRegionMap();
+    List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+    Object newValue = "value";
+    arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+        mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+    assertEquals(1, pendingCallbacks.size());
+  }
+
+  @Test
+  public void txApplyPutOnSecondaryConstructsPendingCallbacksWhenRegionEntryIsNull()
+      throws Exception {
+    AbstractRegionMap arm = new TxNoRegionEntryTestableAbstractRegionMap();
+    List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+
+    Object newValue = "value";
+    arm.txApplyPut(Operation.UPDATE, KEY, newValue, false,
+        new TXId(mock(InternalDistributedMember.class), 1), mock(TXRmtEvent.class),
+        mock(EventID.class), null, pendingCallbacks, null, null, null, null, 1);
+
+    assertEquals(1, pendingCallbacks.size());
+  }
+
+  private static class TxNoRegionEntryTestableAbstractRegionMap
+      extends TxTestableAbstractRegionMap {
+    @Override
+    public RegionEntry getEntry(Object key) {
+      return null;
+    }
+  }
+
+  private static class TxRegionEntryTestableAbstractRegionMap extends TxTestableAbstractRegionMap {
+    @Override
+    public RegionEntry getEntry(Object key) {
+      return mock(RegionEntry.class);
+    }
+  }
+
+  private static class TxRemovedRegionEntryTestableAbstractRegionMap
+      extends TxTestableAbstractRegionMap {
+    @Override
+    public RegionEntry getEntry(Object key) {
+      RegionEntry regionEntry = mock(RegionEntry.class);
+      when(regionEntry.isRemoved()).thenReturn(true);
+      return regionEntry;
+    }
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.