You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/30 17:42:04 UTC
[12/43] geode git commit: GEODE-2939: Make sure a bucket region
initiates its event tracker from the image provider.
GEODE-2939: Make sure a bucket region initiates its event tracker from the image provider.
Save all event states from remote processes.
Initiate the event tracker from the image provider only.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56f976c8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56f976c8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56f976c8
Branch: refs/heads/feature/GEODE-2632-17
Commit: 56f976c89fabed58a086a845593efc2ef6e75114
Parents: 29ea88a
Author: eshu <es...@pivotal.io>
Authored: Thu May 25 16:38:55 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu May 25 17:14:09 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegion.java | 29 ++++++++
.../cache/CacheDistributionAdvisee.java | 8 ++
.../internal/cache/CreateRegionProcessor.java | 36 ++++-----
.../geode/internal/cache/DistributedRegion.java | 9 +++
.../geode/internal/cache/EventTracker.java | 3 +-
.../internal/cache/InitialImageOperation.java | 3 +
.../internal/cache/EventTrackerDUnitTest.java | 78 ++++++++++++++++++++
7 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 7bfffb7..31b341a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -29,8 +29,11 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
+import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor;
+import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.control.MemoryEvent;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.DestroyMessage;
import org.apache.geode.internal.cache.partitioned.InvalidateMessage;
@@ -92,6 +95,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
private final AtomicLong numOverflowBytesOnDisk = new AtomicLong();
private final AtomicLong numEntriesInVM = new AtomicLong();
private final AtomicLong evictions = new AtomicLong();
+ // For GII
+ private CreateRegionReplyProcessor createRegionReplyProcessor;
/**
* Contains size in bytes of the values stored in theRealMap. Sizes are tallied during put and
@@ -281,6 +286,30 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
@Override
+ public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) {
+ this.createRegionReplyProcessor = processor;
+ }
+
+ @Override
+ protected void recordEventStateFromImageProvider(InternalDistributedMember provider) {
+ if (this.createRegionReplyProcessor != null) {
+ Map<ThreadIdentifier, EventSeqnoHolder> providerEventStates =
+ this.createRegionReplyProcessor.getEventState(provider);
+ if (providerEventStates != null) {
+ recordEventState(provider, providerEventStates);
+ } else {
+ // We do not expect this to happen. It covers the case where we get the GII from a node
+ // that was not yet in the cluster when we sent the
+ // createRegionMessage (so no event state was saved for it),
+ // but that became available before we could get the GII from anyone else.
+ // This will not cause a data inconsistency. Log this message for debugging purposes.
+ logger.info("Could not initiate event tracker from GII provider {}", provider);
+ }
+ this.createRegionReplyProcessor = null;
+ }
+ }
+
+ @Override
protected CacheDistributionAdvisor createDistributionAdvisor(
InternalRegionArguments internalRegionArgs) {
return internalRegionArgs.getBucketAdvisor();
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
index e4a7957..d933019 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisee.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import org.apache.geode.internal.cache.CreateRegionProcessor.CreateRegionReplyProcessor;
/**
* Distributed cache object (typically a <code>Region</code>) which uses a
@@ -54,4 +55,11 @@ public interface CacheDistributionAdvisee extends DistributionAdvisee {
* @param profile the remote member's profile
*/
public void remoteRegionInitialized(CacheProfile profile);
+
+ /**
+ * Allow this advisee to know the CreateRegionReplyProcessor that is creating it.
+ *
+ * @param processor the CreateRegionReplyProcessor that is creating the advisee
+ */
+ default public void registerCreateRegionReplyProcessor(CreateRegionReplyProcessor processor) {}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
index c1d1e77..1e38065 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CreateRegionProcessor.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Logger;
@@ -48,6 +49,8 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
+import org.apache.geode.internal.cache.EventTracker.EventSeqnoHolder;
+import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
@@ -96,6 +99,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
}
CreateRegionReplyProcessor replyProc = new CreateRegionReplyProcessor(recps);
+ newRegion.registerCreateRegionReplyProcessor(replyProc);
boolean useMcast = false; // multicast is disabled for this message for now
CreateRegionMessage msg = getCreateRegionMessage(recps, replyProc, useMcast);
@@ -199,17 +203,16 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
.getDistributedSystem(), members);
}
- /**
- * guards application of event state to the region so that we deserialize and apply event state
- * only once
- */
- private Object eventStateLock = new Object();
-
- /** whether event state has been recorded in the region */
- private boolean eventStateRecorded = false;
+ private final Map<DistributedMember, Map<ThreadIdentifier, EventSeqnoHolder>> remoteEventStates =
+ new ConcurrentHashMap<>();
private boolean allMembersSkippedChecks = true;
+ public Map<ThreadIdentifier, EventSeqnoHolder> getEventState(
+ InternalDistributedMember provider) {
+ return this.remoteEventStates.get(provider);
+ }
+
/**
* true if all members skipped CreateRegionMessage#checkCompatibility(), in which case
* CreateRegionMessage should be retried.
@@ -218,6 +221,7 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
return this.allMembersSkippedChecks;
}
+ @SuppressWarnings("unchecked")
@Override
public void process(DistributionMessage msg) {
Assert.assertTrue(msg instanceof CreateRegionReplyMessage,
@@ -246,17 +250,13 @@ public class CreateRegionProcessor implements ProfileExchangeProcessor {
RegionAdvisor ra = (RegionAdvisor) cda;
ra.putBucketRegionProfiles(reply.bucketProfiles);
}
- if (reply.eventState != null && lr.hasEventTracker()) {
- synchronized (eventStateLock) {
- if (!this.eventStateRecorded) {
- this.eventStateRecorded = true;
- Object eventState = null;
- eventState = reply.eventState;
- lr.recordEventState(reply.getSender(), (Map) eventState);
- }
- }
+
+ // Save all event states; the event tracker is later initiated from the GII provider's state
+ if (reply.eventState != null) {
+ remoteEventStates.put(reply.getSender(),
+ (Map<ThreadIdentifier, EventSeqnoHolder>) reply.eventState);
}
- reply.eventState = null;
+
if (lr.isUsedForPartitionedRegionBucket()) {
((BucketRegion) lr).updateEventSeqNum(reply.seqKeyForWan);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 650fe2a..9df64d0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -261,6 +261,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
/**
+ * Record the event state from the image provider.
+ *
+ * @param provider the member that provided the initial image and event state
+ */
+ protected void recordEventStateFromImageProvider(InternalDistributedMember provider) {
+ // No-op. Only bucket regions initiate their event state from the image provider.
+ }
+
+ /**
* Intended for used during construction of a DistributedRegion
*
* @return the advisor to be used by the region
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
index 2c86aed..b919043 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventTracker.java
@@ -99,7 +99,8 @@ public class EventTracker {
String name;
/**
- * whether or not this tracker has been initialized with state from another process
+ * whether or not this tracker has been initialized to allow entry operations. A replicate
+ * region does not initiate its event tracker from its replicates.
*/
volatile boolean initialized;
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 82df980..f8e9d0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -231,11 +231,13 @@ public class InitialImageOperation {
}
}
long giiStart = this.region.getCachePerfStats().startGetInitialImage();
+ InternalDistributedMember provider = null;
for (Iterator itr = recipients.iterator(); !this.gotImage && itr.hasNext();) {
// if we got a partial image from the previous recipient, then clear it
InternalDistributedMember recipient = (InternalDistributedMember) itr.next();
+ provider = recipient;
// In case of HARegion, before getting the region snapshot(image) get the filters
// registered by the associated client and apply them.
@@ -546,6 +548,7 @@ public class InitialImageOperation {
} // for
if (this.gotImage) {
+ this.region.recordEventStateFromImageProvider(provider);
this.region.getCachePerfStats().endGetInitialImage(giiStart);
if (this.isDeltaGII) {
this.region.getCachePerfStats().incDeltaGIICompleted();
http://git-wip-us.apache.org/repos/asf/geode/blob/56f976c8/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
index 3faf41f..77c0998 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EventTrackerDUnitTest.java
@@ -19,8 +19,11 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -405,4 +408,79 @@ public class EventTrackerDUnitTest extends JUnit4CacheTestCase {
protected static int getCacheServerPort() {
return cacheServerPort;
}
+
+ /**
+ * Tests that the event tracker is initialized after GII.
+ */
+ @Test
+ public void testEventTrackerIsInitalized() throws CacheException {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ createPRInVMs(vm0, vm1, vm2);
+
+ createPR();
+
+ doPutsInVMs(vm0, vm1, vm2);
+
+ doPuts();
+
+ verifyEventTrackerContent();
+
+ // close the region
+ getCache().getRegion(getName()).close();
+
+ // create the region again.
+ createPR();
+
+ for (int i = 0; i < 12; i++) {
+ waitEntryIsLocal(i);
+ }
+
+ // verify the event tracker is initialized after the region is re-created
+ verifyEventTrackerContent();
+
+ }
+
+ private void waitEntryIsLocal(int i) {
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> getCache().getRegion(getName()).getEntry(i) != null);
+ }
+
+ private void verifyEventTrackerContent() {
+ PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(getName());
+ BucketRegion br = pr.getDataStore().getLocalBucketById(0);
+ Map<?, ?> eventStates = br.getEventState();
+ assertTrue(eventStates.size() == 4);
+ }
+
+ public void createPRInVMs(VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> createPR());
+ }
+ }
+
+ private void createPR() {
+ PartitionAttributesFactory paf =
+ new PartitionAttributesFactory().setRedundantCopies(3).setTotalNumBuckets(4);
+ RegionFactory fact = getCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(paf.create());
+ fact.create(getName());
+ }
+
+ public void doPutsInVMs(VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> doPuts());
+ }
+ }
+
+ private void doPuts() {
+ Region region = getCache().getRegion(getName());
+ for (int i = 0; i < 12; i++) {
+ region.put(i, i);
+ }
+ }
}