You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/27 18:48:12 UTC

[37/40] geode git commit: fix-1

fix-1


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b8734166
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b8734166
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b8734166

Branch: refs/heads/feature/GEM-1299
Commit: b873416655308f5ccea2179119124225f9c398eb
Parents: c98bc8b
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Apr 20 15:02:21 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 27 11:47:01 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegionQueue.java |  4 +
 .../lucene/internal/LuceneEventListener.java    | 16 ++++
 .../LuceneIndexForPartitionedRegion.java        | 90 +++++++++++++++++++-
 .../lucene/internal/LuceneServiceImpl.java      |  2 +
 4 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..bcc1d8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -584,6 +584,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     this.notifyEntriesRemoved();
   }
 
+  /**
+   * Returns the result of {@code peek()} on {@code eventSeqNumQueue}.
+   * NOTE(review): presumably the head entry, or null when the queue is empty; confirm against
+   * the declared type of eventSeqNumQueue.
+   */
+  public Object firstEventSeqNum() {
+    return this.eventSeqNumQueue.peek();
+  }
+
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
         && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();

http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index 0f55533..62983ef 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -31,7 +31,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.internal.cache.BucketNotFoundException;
@@ -111,12 +113,14 @@ public class LuceneEventListener implements AsyncEventListener {
       }
       return true;
     } catch (BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e) {
+      redistributeEvents(events);
       logger.debug("Bucket not found while saving to lucene index: " + e.getMessage(), e);
       return false;
     } catch (CacheClosedException e) {
       logger.debug("Unable to save to lucene index, cache has been closed", e);
       return false;
     } catch (AlreadyClosedException e) {
+      redistributeEvents(events);
       logger.debug("Unable to commit, the lucene index is already closed", e);
       return false;
     } catch (IOException e) {
@@ -126,6 +130,18 @@ public class LuceneEventListener implements AsyncEventListener {
     }
   }
 
+  /**
+   * Re-dispatches every event of a batch by executing {@link PokeLuceneAsyncQueueFunction} on the
+   * event's region, passing the region name, the event key and the event itself as arguments.
+   * A {@code RegionDestroyedException}, {@code PrimaryBucketException} or
+   * {@code CacheClosedException} raised for one event is logged at debug level and the remaining
+   * events are still attempted.
+   *
+   * @param events the events to hand back to the async queue
+   */
+  private void redistributeEvents(final List<AsyncEvent> events) {
+    for (AsyncEvent event : events) {
+      try {
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        // Keep the cause in the log, like the other catch blocks of this listener do.
+        logger.debug(
+            "Unable to redistribute async event for :" + event.getKey() + " : " + event, e);
+      }
+    }
+  }
+
   public static void setExceptionObserver(LuceneExceptionObserver observer) {
     if (observer == null) {
       observer = exception -> {

http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index c39a4a8..dbd31ba 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -15,20 +15,28 @@
 
 package org.apache.geode.cache.lucene.internal;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.FixedPartitionResolver;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.PartitionResolver;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
@@ -39,14 +47,22 @@ import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
 
 /* wrapper of IndexWriter */
 public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
   protected Region fileAndChunkRegion;
   protected final FileSystemStats fileSystemStats;
 
+  private StuckThreadCleaner stuckCleanerThread;
   public static final String FILES_REGION_SUFFIX = ".files";
 
   public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
@@ -166,7 +182,9 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
     return createRegion(regionName, attributes);
   }
 
-  public void close() {}
+  /**
+   * Signals the stuck-event cleaner to stop. Does nothing when no cleaner has been started, which
+   * is the case until startStuckCleaner has assigned the field.
+   */
+  public void close() {
+    if (stuckCleanerThread != null) {
+      stuckCleanerThread.finish();
+    }
+  }
 
   @Override
   public void dumpFiles(final String directory) {
@@ -234,4 +252,74 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
       }
     }
   }
+
+  /**
+   * Creates the async event queue through the superclass, casts it to
+   * {@link AsyncEventQueueImpl} and starts the stuck-event cleaner for it before returning it.
+   */
+  @Override
+  protected AsyncEventQueue createAEQ(Region dataRegion) {
+    final AsyncEventQueue created = super.createAEQ(dataRegion);
+    final AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) created;
+    startStuckCleaner(aeq);
+    return aeq;
+  }
+
+  /**
+   * Builds a {@link StuckThreadCleaner} for the given queue, remembers it in
+   * {@code stuckCleanerThread} and runs it on a new daemon thread.
+   */
+  private void startStuckCleaner(AsyncEventQueueImpl queue) {
+    final StuckThreadCleaner cleaner = new StuckThreadCleaner(queue);
+    stuckCleanerThread = cleaner;
+    final Thread worker = new Thread(cleaner);
+    worker.setDaemon(true);
+    worker.start();
+  }
+
+  /**
+   * Background task that watches the local bucket queues of an async event queue and re-dispatches
+   * a head event that has not changed between two consecutive passes. It runs until
+   * {@link #finish()} is called or the executing thread is interrupted.
+   */
+  private static class StuckThreadCleaner implements Runnable {
+    /** Pause between two passes over the local buckets, in milliseconds. */
+    private static final long CHECK_INTERVAL_MS = 10000;
+
+    // volatile: finish() is invoked from a different thread than the one executing run()
+    private volatile boolean done = false;
+    private final AsyncEventQueueImpl queue;
+
+    public StuckThreadCleaner(AsyncEventQueueImpl queue) {
+      this.queue = queue;
+    }
+
+    /**
+     * Repeatedly scans the local bucket regions of the sender's queue region. For a bucket that is
+     * not primary, the current head of its queue is compared with the head recorded on the
+     * previous pass; when both are equal the event is handed to redistributeEvents. For a primary
+     * bucket the recorded head is reset to null. The loop sleeps between passes and ends when
+     * finish() has been called or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
+      ConcurrentParallelGatewaySenderQueue prq =
+          (ConcurrentParallelGatewaySenderQueue) sender.getQueue();
+      PartitionedRegion pr = (PartitionedRegion) prq.getRegion();
+      HashMap<BucketRegion, AsyncEvent> lastPeekedEvents = new HashMap<>();
+
+      while (!done) {
+        try {
+          for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+            if (!br.getBucketAdvisor().isPrimary()) {
+              // NOTE(review): assumes the bucket queue's head entries are AsyncEvent instances;
+              // confirm against what BucketRegionQueue stores in eventSeqNumQueue.
+              AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
+              AsyncEvent lastPeek = lastPeekedEvents.put(br, currentFirst);
+              // A null head means there is nothing to compare, so nothing can be stuck.
+              if (currentFirst != null && currentFirst.equals(lastPeek)) {
+                redistributeEvents(lastPeek);
+              }
+            } else {
+              lastPeekedEvents.put(br, null);
+            }
+          }
+          Thread.sleep(CHECK_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          // Restore the flag and stop: with the flag set every later sleep() would throw
+          // immediately and the loop would spin without pausing.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /** Asks the loop in run() to exit; it is checked once per pass, after the sleep. */
+    public void finish() {
+      this.done = true;
+    }
+
+    /**
+     * Executes {@link PokeLuceneAsyncQueueFunction} on the event's region with the region name,
+     * the event key and the event as arguments. A {@code RegionDestroyedException},
+     * {@code PrimaryBucketException} or {@code CacheClosedException} is logged at debug level.
+     */
+    private void redistributeEvents(final AsyncEvent event) {
+      try {
+        logger.info("Unsticking event:" + event.getKey() + ":" + event);
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        logger.debug(
+            "Unable to redistribute async event for :" + event.getKey() + " : " + event, e);
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 437a552..8e19200 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.lucene.LuceneIndexExistsException;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
 import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
@@ -102,6 +103,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
     FunctionService.registerFunction(new LuceneGetPageFunction());
     FunctionService.registerFunction(new WaitUntilFlushedFunction());
     FunctionService.registerFunction(new DumpDirectoryFiles());
+    FunctionService.registerFunction(new PokeLuceneAsyncQueueFunction());
     registerDataSerializables();
   }