You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/10/08 19:26:47 UTC
[2/3] incubator-geode git commit: Hiding the lucene async event queue
from the list of queues.
Hiding the lucene async event queue from the list of queues.
I added a new flag to the queue - isMetaQueue, which controls whether a
queue will show up in getAllQueues. This also has the effect of
preventing the queue from showing up in a generated xml file.
This is necessary because our queue needs to be constructed with an
async event listener that has a reference to the index. If the queue is
generated and added to the xml, it will end up trying to create a
listener before the index is created.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d88ef883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d88ef883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d88ef883
Branch: refs/heads/feature/GEODE-11
Commit: d88ef883b23eb8dd465376ccb5038d789f797bb6
Parents: abe9d47
Author: Dan Smith <up...@apache.org>
Authored: Wed Oct 7 15:04:39 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Thu Oct 8 10:26:02 2015 -0700
----------------------------------------------------------------------
.../internal/AsyncEventQueueFactoryImpl.java | 9 +++++++--
.../asyncqueue/internal/AsyncEventQueueImpl.java | 3 +++
.../gemfire/internal/cache/GemFireCacheImpl.java | 14 ++++++++++++--
.../internal/cache/wan/AbstractGatewaySender.java | 7 +++++++
.../internal/cache/wan/GatewaySenderAttributes.java | 5 +++++
.../internal/LuceneIndexForPartitionedRegion.java | 7 ++++++-
.../lucene/internal/LuceneServiceImplJUnitTest.java | 6 +++++-
7 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index f413218..caef0fc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -184,8 +184,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
// AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
addAsyncEventListener(listener);
GatewaySender sender = create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
- asyncEventQueue = new AsyncEventQueueImpl(sender, listener);
- ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue);
+ AsyncEventQueueImpl queue = new AsyncEventQueueImpl(sender, listener);
+ asyncEventQueue = queue;
+ ((GemFireCacheImpl) cache).addAsyncEventQueue(queue);
} else if (this.cache instanceof CacheCreation) {
asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, listener);
((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
@@ -282,4 +283,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
this.attrs.isHDFSQueue = isHDFSQueue;
return this;
}
+ public AsyncEventQueueFactory setIsMetaQueue(boolean isMetaQueue) {
+ this.attrs.isMetaQueue = isMetaQueue;
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 71e8d2a..9a7698a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -180,6 +180,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
return sender.isParallel();
}
+ public boolean isMetaQueue() {
+ return ((AbstractGatewaySender)sender).getIsMetaQueue();
+ }
public void destroy() {
((AbstractGatewaySender)this.sender).destroy();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 6fe639b..cf93dac 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -387,6 +387,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
* {@link #allGatewaySendersLock}
*/
private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet();
+
+ /**
+ * The list of all async event queues added to the cache.
+ * CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations.
+ */
+ private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>();
/**
* The list of all async event queues added to the cache.
@@ -3881,8 +3887,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- public void addAsyncEventQueue(AsyncEventQueue asyncQueue) {
+ public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
this.allAsyncEventQueues.add(asyncQueue);
+ if(!asyncQueue.isMetaQueue()) {
+ this.allVisibleAsyncEventQueues.add(asyncQueue);
+ }
system
.handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue);
}
@@ -3925,7 +3934,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
public Set<AsyncEventQueue> getAsyncEventQueues() {
- return this.allAsyncEventQueues;
+ return this.allVisibleAsyncEventQueues;
}
public AsyncEventQueue getAsyncEventQueue(String id) {
@@ -3949,6 +3958,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
// using gateway senders lock since async queue uses a gateway sender
synchronized (allGatewaySendersLock) {
this.allAsyncEventQueues.remove(asyncQueue);
+ this.allVisibleAsyncEventQueues.remove(asyncQueue);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 3bd2992..e49708f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -152,6 +152,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
protected boolean isHDFSQueue;
+ protected boolean isMetaQueue;
+
private int parallelismForReplicatedRegion;
protected AbstractGatewaySenderEventProcessor eventProcessor;
@@ -252,6 +254,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
this.serialNumber = DistributionAdvisor.createSerialNumber();
this.isHDFSQueue = attrs.isHDFSQueue();
+ this.isMetaQueue = attrs.isMetaQueue();
if (!(this.cache instanceof CacheCreation)) {
this.stopper = new Stopper(cache.getCancelCriterion());
this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
@@ -476,6 +479,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
return this.isHDFSQueue;
}
+ public boolean getIsMetaQueue() {
+ return this.isMetaQueue;
+ }
+
public InternalDistributedSystem getSystem() {
return (InternalDistributedSystem)this.cache.getDistributedSystem();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1d0b4f1..2df11aa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -22,6 +22,7 @@ public class GatewaySenderAttributes {
public static final boolean DEFAULT_IS_BUCKETSORTED = true;
public static final boolean DEFAULT_IS_HDFSQUEUE = false;
+ public static final boolean DEFAULT_IS_META_QUEUE = false;
public int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
@@ -73,6 +74,7 @@ public class GatewaySenderAttributes {
public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
+ public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
public int getSocketBufferSize() {
return this.socketBufferSize;
@@ -183,4 +185,7 @@ public class GatewaySenderAttributes {
public boolean isHDFSQueue() {
return this.isHDFSQueue;
}
+ public boolean isMetaQueue() {
+ return this.isMetaQueue;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 60085e4..f9e2c1d 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -14,9 +14,11 @@ import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.lucene.LuceneIndex;
@@ -26,7 +28,9 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionFactoryImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
/* wrapper of IndexWriter */
@@ -102,13 +106,14 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
repositoryManager = new PartitionedRepositoryManager(dataRegion, (PartitionedRegion)fileRegion, (PartitionedRegion)chunkRegion, mapper, analyzer);
// create AEQ, AEQ listner and specify the listener to repositoryManager
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
if (withPersistence) {
factory.setPersistent(true);
}
factory.setParallel(true); // parallel AEQ for PR
factory.setMaximumQueueMemory(1000);
factory.setDispatcherThreads(1);
+ factory.setIsMetaQueue(true);
LuceneEventListener listener = new LuceneEventListener(repositoryManager);
String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 5ec2725..eff2813 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -6,6 +6,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -123,7 +124,7 @@ public class LuceneServiceImplJUnitTest {
@Test
public void testCreateIndexForPR() throws IOException, ParseException {
getService();
- createPR("PR1", false);
+ LocalRegion userRegion = createPR("PR1", false);
LuceneIndexImpl index1 = (LuceneIndexImpl)service.createIndex("index1", "PR1", "field1", "field2", "field3");
assertTrue(index1 instanceof LuceneIndexForPartitionedRegion);
LuceneIndexForPartitionedRegion index1PR = (LuceneIndexForPartitionedRegion)index1;
@@ -146,6 +147,9 @@ public class LuceneServiceImplJUnitTest {
String aeqId = LuceneServiceImpl.getUniqueIndexName(index1.getName(), index1.getRegionPath());
AsyncEventQueueImpl aeq = (AsyncEventQueueImpl)cache.getAsyncEventQueue(aeqId);
assertTrue(aeq != null);
+
+ //Make sure our queue doesn't show up in the list of async event queues
+ assertEquals(Collections.emptySet(), cache.getAsyncEventQueues());
}
@Test