You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/27 18:48:09 UTC
[34/40] geode git commit: fix-6
fix-6
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d0e8a5a8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d0e8a5a8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d0e8a5a8
Branch: refs/heads/feature/GEM-1299
Commit: d0e8a5a83f6a164b3af43a73852b7fb78d385f87
Parents: a192c9f
Author: zhouxh <gz...@pivotal.io>
Authored: Wed Apr 26 23:23:13 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 27 11:47:01 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/geode/internal/cache/BucketRegion.java | 2 +-
.../org/apache/geode/internal/cache/BucketRegionQueue.java | 6 +++++-
.../java/org/apache/geode/internal/cache/LocalRegion.java | 2 +-
.../cache/wan/parallel/ParallelGatewaySenderQueue.java | 1 +
.../lucene/internal/LuceneIndexForPartitionedRegion.java | 9 +++++----
.../internal/distributed/PokeLuceneAsyncQueueFunction.java | 3 +++
6 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 136d7b9..cde7cf4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -668,7 +668,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
}
- protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+ public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
// We don't need to clone the event for new Gateway Senders.
// Preserve the bucket reference for resetting it later.
LocalRegion bucketRegion = event.getRegion();
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index bcc1d8d..56ae3f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -384,7 +384,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
/**
* Does a get that gets the value without fault values in from disk.
*/
- private Object optimalGet(Object k) {
+ public Object optimalGet(Object k) {
// Get the object at that key (to remove the index).
Object object = null;
try {
@@ -588,6 +588,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
return this.eventSeqNumQueue.peek();
}
+ public BlockingQueue getEventSeqNumQueue() {
+ return eventSeqNumQueue;
+ }
+
public boolean isReadyForPeek() {
return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
&& !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8c061b0..3f0d6b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6338,7 +6338,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return false;
}
- protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+ public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
if (isPdxTypesRegion()
|| event.isConcurrencyConflict() /* usually concurrent cache modification problem */) {
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 9696b90..87feb21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1133,6 +1133,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
bucketId, prQ.getFullPath());
}
}
+ brq.getEventSeqNumQueue().add(key);
addRemovedEvent(prQ, bucketId, key);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index a60ca01..6e3dce0 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -290,16 +290,17 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
try {
for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
if (!br.getBucketAdvisor().isPrimary()) {
- AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
- AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst);
+ Long currentFirst = (Long) ((BucketRegionQueue) br).firstEventSeqNum();
+ Long lastPeek = (Long) lastPeekedEvents.put(br, currentFirst);
if (currentFirst != null && currentFirst.equals(lastPeek)) {
- redistributeEvents(lastPeek);
+ redistributeEvents((AsyncEvent) ((BucketRegionQueue) br).optimalGet(currentFirst));
+ lastPeekedEvents.put(br, ((BucketRegionQueue) br).firstEventSeqNum());
}
} else {
lastPeekedEvents.put(br, null);
}
}
- Thread.sleep(10000);
+ Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
index 992972b..10c6888 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
@@ -52,6 +52,7 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet();
Cache cache = pr.getCache();
String queueId = (String) pr.getAttributes().getAsyncEventQueueIds().iterator().next();
+ // NOTE: a PR can have several AEQs, not just the Lucene one, so the first id is not guaranteed to be the Lucene AEQ.
AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(queueId);
// Get the GatewaySender
@@ -60,6 +61,8 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
// Update the shadow key
BucketRegion br = pr.getBucketRegion(key);
if (br.getBucketAdvisor().isPrimary()) {
+ // TODO: Should this be done only on the primary? What happens if the bucket fails over to a secondary again?
+ // TODO: Why not simply call br.notifyGatewaySender(operation, event) here?
try {
List<ParallelGatewaySenderEventProcessor> processors =
((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())