You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/27 18:48:09 UTC

[34/40] geode git commit: fix-6

fix-6


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d0e8a5a8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d0e8a5a8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d0e8a5a8

Branch: refs/heads/feature/GEM-1299
Commit: d0e8a5a83f6a164b3af43a73852b7fb78d385f87
Parents: a192c9f
Author: zhouxh <gz...@pivotal.io>
Authored: Wed Apr 26 23:23:13 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 27 11:47:01 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/geode/internal/cache/BucketRegion.java  | 2 +-
 .../org/apache/geode/internal/cache/BucketRegionQueue.java  | 6 +++++-
 .../java/org/apache/geode/internal/cache/LocalRegion.java   | 2 +-
 .../cache/wan/parallel/ParallelGatewaySenderQueue.java      | 1 +
 .../lucene/internal/LuceneIndexForPartitionedRegion.java    | 9 +++++----
 .../internal/distributed/PokeLuceneAsyncQueueFunction.java  | 3 +++
 6 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 136d7b9..cde7cf4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -668,7 +668,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
-  protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+  public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
     // We don't need to clone the event for new Gateway Senders.
     // Preserve the bucket reference for resetting it later.
     LocalRegion bucketRegion = event.getRegion();

http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index bcc1d8d..56ae3f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -384,7 +384,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   /**
    * Does a get that gets the value without fault values in from disk.
    */
-  private Object optimalGet(Object k) {
+  public Object optimalGet(Object k) {
     // Get the object at that key (to remove the index).
     Object object = null;
     try {
@@ -588,6 +588,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     return this.eventSeqNumQueue.peek();
   }
 
+  public BlockingQueue getEventSeqNumQueue() {
+    return eventSeqNumQueue;
+  }
+
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
         && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();

http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 8c061b0..3f0d6b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6338,7 +6338,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return false;
   }
 
-  protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+  public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
 
     if (isPdxTypesRegion()
         || event.isConcurrencyConflict() /* usually concurrent cache modification problem */) {

http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 9696b90..87feb21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1133,6 +1133,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             bucketId, prQ.getFullPath());
       }
     }
+    brq.getEventSeqNumQueue().add(key);
     addRemovedEvent(prQ, bucketId, key);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index a60ca01..6e3dce0 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -290,16 +290,17 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
         try {
           for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
             if (!br.getBucketAdvisor().isPrimary()) {
-              AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
-              AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst);
+              Long currentFirst = (Long) ((BucketRegionQueue) br).firstEventSeqNum();
+              Long lastPeek = (Long) lastPeekedEvents.put(br, currentFirst);
               if (currentFirst != null && currentFirst.equals(lastPeek)) {
-                redistributeEvents(lastPeek);
+                redistributeEvents((AsyncEvent) ((BucketRegionQueue) br).optimalGet(currentFirst));
+                lastPeekedEvents.put(br, ((BucketRegionQueue) br).firstEventSeqNum());
               }
             } else {
               lastPeekedEvents.put(br, null);
             }
           }
-          Thread.sleep(10000);
+          Thread.sleep(2000);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }

http://git-wip-us.apache.org/repos/asf/geode/blob/d0e8a5a8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
index 992972b..10c6888 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
@@ -52,6 +52,7 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
     PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet();
     Cache cache = pr.getCache();
     String queueId = (String) pr.getAttributes().getAsyncEventQueueIds().iterator().next();
+    // NOTE: a PR can have many AEQs, not just the Lucene one, so the first id may not be the Lucene queue
     AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(queueId);
 
     // Get the GatewaySender
@@ -60,6 +61,8 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
     // Update the shadow key
     BucketRegion br = pr.getBucketRegion(key);
     if (br.getBucketAdvisor().isPrimary()) {
+      // TODO: this is only done on the primary; what happens if the bucket fails over to a secondary again?
+      // TODO: why not call br.notifyGatewaySender(operation, event) here instead?
       try {
         List<ParallelGatewaySenderEventProcessor> processors =
             ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())