Posted to commits@geode.apache.org by ds...@apache.org on 2016/07/27 17:34:08 UTC

incubator-geode git commit: GEODE-1700: fix stat corruption when async queue is full

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 19361b6b2 -> 9389c6fec


GEODE-1700: fix stat corruption when async queue is full

A common method is now used for writing async ops to disk,
both when flushing an item that has been taken out of the
queue and when the queue is full and a sync write has to be
done instead.
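A rough sketch of the shape of that change (method names taken from
the diff below; locking, stat updates, and RVV handling are elided):

    // Flusher-thread path: the item was taken out of the async queue.
    public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
      writeEntryToDisk(entry, region, tag, false);
    }

    // Queue-full path: the write has to be done synchronously by the caller.
    public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag tag) {
      writeEntryToDisk(entry, region, tag, true);
    }

    // Shared helper; asyncQueueWasFull is true when the caller wanted to
    // put the entry in the queue but could not because it was full.
    private static void writeEntryToDisk(DiskEntry entry, LocalRegion region,
        VersionTag tag, boolean asyncQueueWasFull) {
      // ... common disk-write logic (see the full diff below) ...
    }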

Also did a minor refactoring to get rid of the "removed" flag.
This revision fixes a bug introduced in GEODE-93 and should be
included with that fix.
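A minimal before/after sketch of that flag removal (simplified from
the diff below; the invalid/tombstone checks and the stat and RVV
updates are omitted):

    // Before: a "remove" flag was set in one branch and checked later.
    boolean remove = false;
    if (Token.isRemovedFromDisk(entryVal)) {
      dr.remove(region, entry, true, false);
      remove = true;
    } else if (entryVal != null) {
      writeToDisk(entry, region, true);
    }
    if (!remove && entry instanceof LRUEntry && ((LRUEntry) entry).testEvicted()) {
      entry.handleValueOverflow(region);
      entry.setValueWithContext(region, null);
    }

    // After: the eviction handling moves inside the branch that actually
    // wrote the value, so the flag is no longer needed.
    if (Token.isRemovedFromDisk(entryVal)) {
      dr.remove(region, entry, true, false);
    } else if (entryVal != null) {
      writeToDisk(entry, region, true);
      if (entry instanceof LRUEntry && ((LRUEntry) entry).testEvicted()) {
        entry.handleValueOverflow(region);
        entry.setValueWithContext(region, null);
      }
    }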


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9389c6fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9389c6fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9389c6fe

Branch: refs/heads/develop
Commit: 9389c6fece1adb3e83bc7a8863a2d84f52bfc0fa
Parents: 19361b6
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Jul 26 15:03:06 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Jul 27 10:30:44 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/DiskEntry.java       | 139 +++++--------------
 1 file changed, 33 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9389c6fe/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 698e3bd..98ee729 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -1496,82 +1496,7 @@ public interface DiskEntry extends RegionEntry {
 
     
     public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag tag) {
-      DiskRegion dr = region.getDiskRegion();
-      DiskId did = entry.getDiskId();
-      synchronized (entry) {
-      dr.acquireReadLock();
-      try {
-        synchronized (did) {
-          if (did.isPendingAsync()) {
-            did.setPendingAsync(false);
-            final Token entryVal = entry.getValueAsToken();
-            final int entryValSize = region.calculateRegionEntryValueSize(entry);
-            boolean remove = false;
-            try {
-              if (Token.isRemovedFromDisk(entryVal)) {
-                // onDisk was already deced so just do the valueLength here
-                dr.incNumOverflowBytesOnDisk(-did.getValueLength());
-                incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-                                     -did.getValueLength());
-                dr.remove(region, entry, true, false);
-                if (dr.isBackup()) {
-                  did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
-                }
-                remove = true;
-              } else if (Token.isInvalid(entryVal) && !dr.isBackup()) {
-                // no need to write invalid to disk if overflow only
-              } else if (entryVal != null) {
-                writeToDisk(entry, region, true);
-              } else {
-                //if we have a version tag we need to record the operation
-                //to update the RVV
-                if(tag != null) {
-                  DiskEntry.Helper.doAsyncFlush(tag, region);
-                }
-                return;
-              }
-              assert !dr.isSync();
-              // Only setValue to null if this was an evict.
-              // We could just be a backup that is writing async.
-              if (!remove
-                  && !Token.isInvalid(entryVal)
-                  && entry instanceof LRUEntry
-                  && ((LRUEntry)entry).testEvicted()) {
-                // Moved this here to fix bug 40116.
-                region.updateSizeOnEvict(entry.getKey(), entryValSize);
-                // note the old size was already accounted for
-                // onDisk was already inced so just do the valueLength here
-                dr.incNumOverflowBytesOnDisk(did.getValueLength());
-                incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-                                     did.getValueLength());
-                entry.handleValueOverflow(region);
-                entry.setValueWithContext(region,null);
-              }
-              
-              //See if we the entry we wrote to disk has the same tag
-              //as this entry. If not, write the tag as a conflicting operation.
-              //to update the RVV.
-              VersionStamp stamp = entry.getVersionStamp();
-              if(tag != null && stamp != null 
-                  && (stamp.getMemberID() != tag.getMemberID()
-                    || stamp.getRegionVersion() != tag.getRegionVersion())) {
-                DiskEntry.Helper.doAsyncFlush(tag, region);
-              }
-            } catch (RegionClearedException ignore) {
-              // no need to do the op since it was clobbered by a region clear
-            }
-          } else {
-            //if we have a version tag we need to record the operation
-            //to update the RVV, even if we don't write the entry
-            if(tag != null) {
-              DiskEntry.Helper.doAsyncFlush(tag, region);
-            }
-          }
-        }
-      } finally {
-        dr.releaseReadLock();
-      }
-      } // sync entry
+      writeEntryToDisk(entry, region, tag, true);
     }
     
     public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
@@ -1591,13 +1516,23 @@ public interface DiskEntry extends RegionEntry {
     
     /**
      * Flush an entry that was previously scheduled to be written to disk.
-     * @param tag 
      * @since GemFire prPersistSprint1
      */
     public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
+      writeEntryToDisk(entry, region, tag, false);
+    }
+    /**
+     * Does a synchronous write to disk for a region that uses async.
+     * This method is used by both doAsyncFlush and handleFullAsyncQueue to fix GEODE-1700.
+     * @param asyncQueueWasFull true if caller wanted to put this entry in the queue
+     *        but could not do so because it was full
+     */
+    private static void writeEntryToDisk(DiskEntry entry, LocalRegion region, VersionTag tag, boolean asyncQueueWasFull) {
       if (region.isThisRegionBeingClosedOrDestroyed()) return;
       DiskRegion dr = region.getDiskRegion();
-      dr.setClearCountReference();
+      if (!asyncQueueWasFull) {
+        dr.setClearCountReference();
+      }
       synchronized (entry) { // fixes 40116
         // If I don't sync the entry and this method ends up doing an eviction
         // thus setting value to null
@@ -1614,7 +1549,6 @@ public interface DiskEntry extends RegionEntry {
             did.setPendingAsync(false);
             final Token entryVal = entry.getValueAsToken();
             final int entryValSize = region.calculateRegionEntryValueSize(entry);
-            boolean remove = false;
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
                 if (region.isThisRegionBeingClosedOrDestroyed()) return;
@@ -1626,19 +1560,28 @@ public interface DiskEntry extends RegionEntry {
                 if (dr.isBackup()) {
                   did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
                 }
-                remove = true;
               } else if ((Token.isInvalid(entryVal) || entryVal == Token.TOMBSTONE) && !dr.isBackup()) {
                 // no need to write invalid or tombstones to disk if overflow only
               } else if (entryVal != null) {
                 writeToDisk(entry, region, true);
+                assert !dr.isSync();
+                // Only setValue to null if this was an evict.
+                // We could just be a backup that is writing async.
+                if (!Token.isInvalid(entryVal)
+                    && (entryVal != Token.TOMBSTONE)
+                    && entry instanceof LRUEntry
+                    && ((LRUEntry)entry).testEvicted()) {
+                  // Moved this here to fix bug 40116.
+                  region.updateSizeOnEvict(entry.getKey(), entryValSize);
+                  dr.incNumEntriesInVM(-1);
+                  dr.incNumOverflowOnDisk(1L);
+                  dr.incNumOverflowBytesOnDisk(did.getValueLength());
+                  incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
+                                       did.getValueLength());
+                  entry.handleValueOverflow(region);
+                  entry.setValueWithContext(region,null);
+                }
               } else {
-                // @todo why would we have a null value here?
-                // I'm seeing it show up in tests:
-// java.lang.IllegalArgumentException: Must not serialize  null  in this context.
-// 	at com.gemstone.gemfire.internal.cache.EntryEventImpl.serialize(EntryEventImpl.java:1024)
-// 	at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.writeToDisk(DiskEntry.java:351)
-// 	at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.doAsyncFlush(DiskEntry.java:683)
-// 	at com.gemstone.gemfire.internal.cache.DiskRegion$FlusherThread.run(DiskRegion.java:1055)
                 //if we have a version tag we need to record the operation
                 //to update the RVV
                 if(tag != null) {
@@ -1646,24 +1589,6 @@ public interface DiskEntry extends RegionEntry {
                 }
                 return;
               }
-              assert !dr.isSync();
-              // Only setValue to null if this was an evict.
-              // We could just be a backup that is writing async.
-              if (!remove
-                  && !Token.isInvalid(entryVal)
-                  && (entryVal != Token.TOMBSTONE)
-                  && entry instanceof LRUEntry
-                  && ((LRUEntry)entry).testEvicted()) {
-                // Moved this here to fix bug 40116.
-                region.updateSizeOnEvict(entry.getKey(), entryValSize);
-                dr.incNumEntriesInVM(-1);
-                dr.incNumOverflowOnDisk(1L);
-                dr.incNumOverflowBytesOnDisk(did.getValueLength());
-                incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
-                                     did.getValueLength());
-                entry.handleValueOverflow(region);
-                entry.setValueWithContext(region,null);
-              }
             } catch (RegionClearedException ignore) {
               // no need to do the op since it was clobbered by a region clear
             }
@@ -1689,7 +1614,9 @@ public interface DiskEntry extends RegionEntry {
         dr.releaseReadLock();
       }
       } finally {
-        dr.removeClearCountReference();
+        if (!asyncQueueWasFull) {
+          dr.removeClearCountReference();
+        }
       }
       } // sync entry
     }