You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/05/26 00:14:18 UTC

geode git commit: GEODE-2939: Make sure bucket region initiate event tracker from the image provider.

Repository: geode
Updated Branches:
  refs/heads/develop 29ea88a23 -> 56f976c89


GEODE-2939: Make sure bucket region initiate event tracker from the image provider.

Save all event states from remote processes.
Initiate event tracker from the image provider only.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56f976c8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56f976c8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56f976c8

Branch: refs/heads/develop
Commit: 56f976c89fabed58a086a845593efc2ef6e75114
Parents: 29ea88a
Author: eshu <es...@pivotal.io>
Authored: Thu May 25 16:38:55 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu May 25 17:14:09 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegion.java      | 29 ++++++++
 .../cache/CacheDistributionAdvisee.java         |  8 ++
 .../internal/cache/CreateRegionProcessor.java   | 36 ++++-----
 .../geode/internal/cache/DistributedRegion.java |  9 +++
 .../geode/internal/cache/EventTracker.java      |  3 +-
 .../internal/cache/InitialImageOperation.java   |  3 +
 .../internal/cache/EventTrackerDUnitTest.java   | 78 ++++++++++++++++++++
 7 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 7bfffb7..31b341a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -29,8 +29,11 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
+import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor;
+import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.control.MemoryEvent;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
 import org.apache.geode.internal.cache.partitioned.InvalidateMessage;
@@ -92,6 +95,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   private final AtomicLong numOverflowBytesOnDisk = new AtomicLong();
   private final AtomicLong numEntriesInVM = new AtomicLong();
   private final AtomicLong evictions = new AtomicLong();
+  // For GII
+  private CreateRegionReplyProcessor createRegionReplyProcessor;
 
   /**
    * Contains size in bytes of the values stored in theRealMap. Sizes are tallied during put and
@@ -281,6 +286,30 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
+  public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) {
+    this.createRegionReplyProcessor = processor;
+  }
+
+  @Override
+  protected void recordEventStateFromImageProvider(InternalDistributedMember provider) {
+    if (this.createRegionReplyProcessor != null) {
+      Map<ThreadIdentifier, EventSeqnoHolder> providerEventStates =
+          this.createRegionReplyProcessor.getEventState(provider);
+      if (providerEventStates != null) {
+        recordEventState(provider, providerEventStates);
+      } else {
+        // Does not see this to happen. Just in case we get gii from a node
+        // that was not in the cluster originally when we sent
+        // createRegionMessage (its event tracker was saved),
+        // but later available before we could get gii from anyone else.
+        // This will not cause data inconsistent issue. Log this message for debug purpose.
+        logger.info("Could not initiate event tracker from GII provider {}", provider);
+      }
+      this.createRegionReplyProcessor = null;
+    }
+  }
+
+  @Override
   protected CacheDistributionAdvisor createDistributionAdvisor(
       InternalRegionArguments internalRegionArgs) {
     return internalRegionArgs.getBucketAdvisor();

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
index e4a7957..d933019 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.distributed.internal.DistributionAdvisee;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor;
 
 /**
  * Distributed cache object (typically a <code>Region</code>) which uses a
@@ -54,4 +55,11 @@ public interface CacheDistributionAdvisee extends DistributionAdvisee {
    * @param profile the remote member's profile
    */
   public void remoteRegionInitialized(CacheProfile profile);
+
+  /**
+   * Allow this advisee to know the CreateRegionReplyProcessor that is creating it.
+   * 
+   * @param processor the CreateRegionReplyProcessor that is creating the advisee
+   */
+  default public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) {}
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
index c1d1e77..1e38065 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.logging.log4j.Logger;
 
@@ -48,6 +49,8 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
@@ -96,6 +99,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
       }
 
       CreateRegionReplyProcessor replyProc = new CreateRegionReplyProcessor(recps);
+      newRegion.registerCreateRegionReplyProcessor(replyProc);
 
       boolean useMcast = false; // multicast is disabled for this message for now
       CreateRegionMessage msg = getCreateRegionMessage(recps, replyProc, useMcast);
@@ -199,17 +203,16 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
           .getDistributedSystem(), members);
     }
 
-    /**
-     * guards application of event state to the region so that we deserialize and apply event state
-     * only once
-     */
-    private Object eventStateLock = new Object();
-
-    /** whether event state has been recorded in the region */
-    private boolean eventStateRecorded = false;
+    private final Map<DistributedMember, Map<ThreadIdentifier, EventSeqnoHolder>> remoteEventStates =
+        new ConcurrentHashMap<>();
 
     private boolean allMembersSkippedChecks = true;
 
+    public Map<ThreadIdentifier, EventSeqnoHolder> getEventState(
+        InternalDistributedMember provider) {
+      return this.remoteEventStates.get(provider);
+    }
+
     /**
      * true if all members skipped CreateRegionMessage#checkCompatibility(), in which case
      * CreateRegionMessage should be retried.
@@ -218,6 +221,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
       return this.allMembersSkippedChecks;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void process(DistributionMessage msg) {
       Assert.assertTrue(msg instanceof CreateRegionReplyMessage,
@@ -246,17 +250,13 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
             RegionAdvisor ra = (RegionAdvisor) cda;
             ra.putBucketRegionProfiles(reply.bucketProfiles);
           }
-          if (reply.eventState != null && lr.hasEventTracker()) {
-            synchronized (eventStateLock) {
-              if (!this.eventStateRecorded) {
-                this.eventStateRecorded = true;
-                Object eventState = null;
-                eventState = reply.eventState;
-                lr.recordEventState(reply.getSender(), (Map) eventState);
-              }
-            }
+
+          // Save all event states, need to initiate the event tracker from the GII provider
+          if (reply.eventState != null) {
+            remoteEventStates.put(reply.getSender(),
+                (Map<ThreadIdentifier, EventSeqnoHolder>) reply.eventState);
           }
-          reply.eventState = null;
+
           if (lr.isUsedForPartitionedRegionBucket()) {
             ((BucketRegion) lr).updateEventSeqNum(reply.seqKeyForWan);
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 650fe2a..9df64d0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -261,6 +261,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
   }
 
   /**
+   * Record the event state from image provider
+   * 
+   * @param provider the member that provided the initial image and event state
+   */
+  protected void recordEventStateFromImageProvider(InternalDistributedMember provider) {
+    // No Op. Only Bucket region will initiate event states
+  }
+
+  /**
    * Intended for used during construction of a DistributedRegion
    * 
    * @return the advisor to be used by the region

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
index 2c86aed..b919043 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
@@ -99,7 +99,8 @@ public class EventTracker {
   String name;
 
   /**
-   * whether or not this tracker has been initialized with state from another process
+   * whether or not this tracker has been initialized to allow entry operation. replicate region
+   * does not initiate event tracker from its replicates.
    */
   volatile boolean initialized;
 

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 82df980..f8e9d0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -231,11 +231,13 @@ public class InitialImageOperation {
       }
     }
     long giiStart = this.region.getCachePerfStats().startGetInitialImage();
+    InternalDistributedMember provider = null;
 
     for (Iterator itr = recipients.iterator(); !this.gotImage && itr.hasNext();) {
       // if we got a partial image from the previous recipient, then clear it
 
       InternalDistributedMember recipient = (InternalDistributedMember) itr.next();
+      provider = recipient;
 
       // In case of HARegion, before getting the region snapshot(image) get the filters
       // registered by the associated client and apply them.
@@ -546,6 +548,7 @@ public class InitialImageOperation {
     } // for
 
     if (this.gotImage) {
+      this.region.recordEventStateFromImageProvider(provider);
       this.region.getCachePerfStats().endGetInitialImage(giiStart);
       if (this.isDeltaGII) {
         this.region.getCachePerfStats().incDeltaGIICompleted();

http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
index 3faf41f..77c0998 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
@@ -19,8 +19,11 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -405,4 +408,79 @@ public class EventTrackerDUnitTest extends JUnit4CacheTestCase {
   protected static int getCacheServerPort() {
     return cacheServerPort;
   }
+
+  /**
+   * Tests event track is initialized after gii
+   */
+  @Test
+  public void testEventTrackerIsInitalized() throws CacheException {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    createPRInVMs(vm0, vm1, vm2);
+
+    createPR();
+
+    doPutsInVMs(vm0, vm1, vm2);
+
+    doPuts();
+
+    verifyEventTrackerContent();
+
+    // close the region
+    getCache().getRegion(getName()).close();
+
+    // create the region again.
+    createPR();
+
+    for (int i = 0; i < 12; i++) {
+      waitEntryIsLocal(i);
+    }
+
+    // verify event track initialized after create region
+    verifyEventTrackerContent();
+
+  }
+
+  private void waitEntryIsLocal(int i) {
+    Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+        .atMost(30, TimeUnit.SECONDS)
+        .until(() -> getCache().getRegion(getName()).getEntry(i) != null);
+  }
+
+  private void verifyEventTrackerContent() {
+    PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName());
+    BucketRegion br = pr.getDataStore().getLocalBucketById(0);
+    Map<?, ?> eventStates = br.getEventState();
+    assertTrue(eventStates.size() == 4);
+  }
+
+  public void createPRInVMs(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> createPR());
+    }
+  }
+
+  private void createPR() {
+    PartitionAttributesFactory paf =
+        new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4);
+    RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(paf.create());
+    fact.create(getName());
+  }
+
+  public void doPutsInVMs(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> doPuts());
+    }
+  }
+
+  private void doPuts() {
+    Region region = getCache().getRegion(getName());
+    for (int i = 0; i < 12; i++) {
+      region.put(i, i);
+    }
+  }
 }