You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/27 18:48:12 UTC
[37/40] geode git commit: fix-1
fix-1
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b8734166
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b8734166
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b8734166
Branch: refs/heads/feature/GEM-1299
Commit: b873416655308f5ccea2179119124225f9c398eb
Parents: c98bc8b
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Apr 20 15:02:21 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Apr 27 11:47:01 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegionQueue.java | 4 +
.../lucene/internal/LuceneEventListener.java | 16 ++++
.../LuceneIndexForPartitionedRegion.java | 90 +++++++++++++++++++-
.../lucene/internal/LuceneServiceImpl.java | 2 +
4 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..bcc1d8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -584,6 +584,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
this.notifyEntriesRemoved();
}
+  /**
+   * Looks at the head of this bucket queue's {@code eventSeqNumQueue} without removing it.
+   *
+   * @return the value produced by {@code eventSeqNumQueue.peek()}; presumably {@code null} when
+   *         the queue is empty (TODO confirm against the declared queue type)
+   */
+  public Object firstEventSeqNum() {
+    final Object head = this.eventSeqNumQueue.peek();
+    return head;
+  }
+
public boolean isReadyForPeek() {
return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
&& !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index 0f55533..62983ef 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -31,7 +31,9 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.internal.cache.BucketNotFoundException;
@@ -111,12 +113,14 @@ public class LuceneEventListener implements AsyncEventListener {
}
return true;
} catch (BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e) {
+ redistributeEvents(events);
logger.debug("Bucket not found while saving to lucene index: " + e.getMessage(), e);
return false;
} catch (CacheClosedException e) {
logger.debug("Unable to save to lucene index, cache has been closed", e);
return false;
} catch (AlreadyClosedException e) {
+ redistributeEvents(events);
logger.debug("Unable to commit, the lucene index is already closed", e);
return false;
} catch (IOException e) {
@@ -126,6 +130,18 @@ public class LuceneEventListener implements AsyncEventListener {
}
}
+  /**
+   * Re-submits every event of a batch that could not be written to the index.
+   *
+   * For each event, the function registered under {@link PokeLuceneAsyncQueueFunction#ID} is
+   * executed on the event's region with the arguments {region name, key, event}. A failure for
+   * one event is logged and does not stop the remaining events from being attempted.
+   *
+   * @param events the batch handed to the listener
+   */
+  private void redistributeEvents(final List<AsyncEvent> events) {
+    for (AsyncEvent event : events) {
+      try {
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        // Best effort only: keep going with the other events, but pass the exception to the
+        // logger so the cause is not lost (matches the other catch blocks in this class).
+        logger.debug(
+            "Unable to redistribute async event for :" + event.getKey() + " : " + event, e);
+      }
+    }
+  }
+
public static void setExceptionObserver(LuceneExceptionObserver observer) {
if (observer == null) {
observer = exception -> {
http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index c39a4a8..dbd31ba 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -15,20 +15,28 @@
package org.apache.geode.cache.lucene.internal;
+import java.util.HashMap;
+import java.util.List;
import java.util.Set;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
@@ -39,14 +47,22 @@ import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
/* wrapper of IndexWriter */
public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
protected Region fileAndChunkRegion;
protected final FileSystemStats fileSystemStats;
+ private StuckThreadCleaner stuckCleanerThread;
public static final String FILES_REGION_SUFFIX = ".files";
public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
@@ -166,7 +182,9 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
return createRegion(regionName, attributes);
}
- public void close() {}
+  /**
+   * Closes this index by asking the stuck-event cleaner, if one was started, to stop.
+   */
+  public void close() {
+    // stuckCleanerThread is only assigned in startStuckCleaner(); guard against close() being
+    // called on an index whose queue was never created.
+    if (stuckCleanerThread != null) {
+      stuckCleanerThread.finish();
+    }
+  }
@Override
public void dumpFiles(final String directory) {
@@ -234,4 +252,74 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
}
}
+
+ @Override
+ protected AsyncEventQueue createAEQ(Region dataRegion) {
+ AsyncEventQueueImpl queue = (AsyncEventQueueImpl) super.createAEQ(dataRegion);
+ startStuckCleaner(queue);
+ return queue;
+ }
+
+ private void startStuckCleaner(AsyncEventQueueImpl queue) {
+ stuckCleanerThread = new StuckThreadCleaner(queue);
+ Thread t = new Thread(stuckCleanerThread);
+ t.setDaemon(true);
+ t.start();
+ }
+
+  /**
+   * Background task that periodically inspects the local buckets of the queue's region. For every
+   * bucket that is not primary, it remembers the value at the head of the bucket queue; when the
+   * same head is seen on two consecutive passes the event is re-submitted through
+   * {@link PokeLuceneAsyncQueueFunction}. The loop ends after {@link #finish()} is called or the
+   * thread is interrupted.
+   */
+  private static class StuckThreadCleaner implements Runnable {
+    /** Pause between two passes over the local buckets, in milliseconds. */
+    private static final long CHECK_INTERVAL_MS = 10000;
+
+    // volatile: set by finish() on the closing thread, read by the polling loop in run().
+    private volatile boolean done = false;
+    AsyncEventQueueImpl queue;
+
+    public StuckThreadCleaner(AsyncEventQueueImpl queue) {
+      this.queue = queue;
+    }
+
+    /**
+     * Polls the local buckets until {@link #finish()} is called or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
+      ConcurrentParallelGatewaySenderQueue prq =
+          (ConcurrentParallelGatewaySenderQueue) sender.getQueue();
+      PartitionedRegion pr = (PartitionedRegion) prq.getRegion();
+      // Head value seen for each non-primary bucket on the previous pass.
+      HashMap<BucketRegion, AsyncEvent> lastPeekedEvents = new HashMap<>();
+
+      while (!done) {
+        for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+          if (br.getBucketAdvisor().isPrimary()) {
+            // Primary buckets are not tracked; forget any previously remembered head.
+            lastPeekedEvents.remove(br);
+            continue;
+          }
+          // NOTE(review): firstEventSeqNum() returns Object; this cast assumes the queue head
+          // is an AsyncEvent - confirm against BucketRegionQueue.eventSeqNumQueue.
+          AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
+          AsyncEvent lastPeek = lastPeekedEvents.put(br, currentFirst);
+          // currentFirst may be null (nothing at the head); only a non-null head that did not
+          // change since the previous pass is treated as stuck.
+          if (currentFirst != null && currentFirst.equals(lastPeek)) {
+            redistributeEvents(lastPeek);
+          }
+        }
+        try {
+          Thread.sleep(CHECK_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and stop; looping on would make every following sleep
+          // fail immediately and spin.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /** Asks the polling loop to stop; it exits once the current pass and sleep are over. */
+    public void finish() {
+      this.done = true;
+    }
+
+    /**
+     * Executes the function registered under {@link PokeLuceneAsyncQueueFunction#ID} on the
+     * event's region with the arguments {region name, key, event}. Failures are logged and
+     * otherwise ignored.
+     *
+     * @param event the event whose position at the head of a bucket queue did not change
+     */
+    private void redistributeEvents(final AsyncEvent event) {
+      try {
+        logger.info("Unsticking event:" + event.getKey() + ":" + event);
+        FunctionService.onRegion(event.getRegion())
+            .withArgs(new Object[] {event.getRegion().getName(), event.getKey(), event})
+            .execute(PokeLuceneAsyncQueueFunction.ID);
+      } catch (RegionDestroyedException | PrimaryBucketException | CacheClosedException e) {
+        logger.debug(
+            "Unable to redistribute async event for :" + event.getKey() + " : " + event, e);
+      }
+    }
+
+  }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b8734166/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 437a552..8e19200 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.lucene.LuceneIndexExistsException;
import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
+import org.apache.geode.cache.lucene.internal.distributed.PokeLuceneAsyncQueueFunction;
import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
@@ -102,6 +103,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
FunctionService.registerFunction(new LuceneGetPageFunction());
FunctionService.registerFunction(new WaitUntilFlushedFunction());
FunctionService.registerFunction(new DumpDirectoryFiles());
+ FunctionService.registerFunction(new PokeLuceneAsyncQueueFunction());
registerDataSerializables();
}