You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/10/08 19:26:47 UTC

[2/3] incubator-geode git commit: Hiding the lucene async event queue from the list of queues.

Hiding the lucene async event queue from the list of queues.

I added a new flag to the queue - isMetaQueue, which controls whether a
queue will show up in getAllQueues. This also has the effect of
preventing the queue from showing up in a generated xml file.

This is necessary because our queue needs to be constructed with an
async event listener that has a reference to the index. If the queue is
generated and added to the xml, it will end up trying to create a
listener before the index is created.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d88ef883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d88ef883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d88ef883

Branch: refs/heads/feature/GEODE-11
Commit: d88ef883b23eb8dd465376ccb5038d789f797bb6
Parents: abe9d47
Author: Dan Smith <up...@apache.org>
Authored: Wed Oct 7 15:04:39 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Thu Oct 8 10:26:02 2015 -0700

----------------------------------------------------------------------
 .../internal/AsyncEventQueueFactoryImpl.java          |  9 +++++++--
 .../asyncqueue/internal/AsyncEventQueueImpl.java      |  3 +++
 .../gemfire/internal/cache/GemFireCacheImpl.java      | 14 ++++++++++++--
 .../internal/cache/wan/AbstractGatewaySender.java     |  7 +++++++
 .../internal/cache/wan/GatewaySenderAttributes.java   |  5 +++++
 .../internal/LuceneIndexForPartitionedRegion.java     |  7 ++++++-
 .../lucene/internal/LuceneServiceImplJUnitTest.java   |  6 +++++-
 7 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
index f413218..caef0fc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java
@@ -184,8 +184,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
           //  AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
       addAsyncEventListener(listener);
       GatewaySender sender = create(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncQueueId));
-      asyncEventQueue = new AsyncEventQueueImpl(sender, listener);
-      ((GemFireCacheImpl) cache).addAsyncEventQueue(asyncEventQueue);
+      AsyncEventQueueImpl queue = new AsyncEventQueueImpl(sender, listener);
+      asyncEventQueue = queue;
+      ((GemFireCacheImpl) cache).addAsyncEventQueue(queue);
     } else if (this.cache instanceof CacheCreation) {
       asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, attrs, listener);
       ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue);
@@ -282,4 +283,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
     this.attrs.isHDFSQueue = isHDFSQueue;
     return this;
   }
+  public AsyncEventQueueFactory setIsMetaQueue(boolean isMetaQueue) {
+    this.attrs.isMetaQueue = isMetaQueue;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 71e8d2a..9a7698a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -180,6 +180,9 @@ public class AsyncEventQueueImpl implements AsyncEventQueue {
     return sender.isParallel();
   }
 
+  public boolean isMetaQueue() {
+    return ((AbstractGatewaySender)sender).getIsMetaQueue();
+  }
 
   public void destroy() {
     ((AbstractGatewaySender)this.sender).destroy();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 6fe639b..cf93dac 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -387,6 +387,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * {@link #allGatewaySendersLock}
    */
   private volatile Set<GatewaySender> allGatewaySenders = Collections.emptySet();
+  
+  /**
+   * The list of all async event queues added to the cache. 
+   * CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval operations.
+   */
+  private volatile Set<AsyncEventQueue> allVisibleAsyncEventQueues = new CopyOnWriteArraySet<AsyncEventQueue>();
 
   /**
    * The list of all async event queues added to the cache. 
@@ -3881,8 +3887,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
 
-  public void addAsyncEventQueue(AsyncEventQueue asyncQueue) {
+  public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
     this.allAsyncEventQueues.add(asyncQueue);
+    if(!asyncQueue.isMetaQueue()) {
+      this.allVisibleAsyncEventQueues.add(asyncQueue);
+    }
     system
         .handleResourceEvent(ResourceEvent.ASYNCEVENTQUEUE_CREATE, asyncQueue);
   }
@@ -3925,7 +3934,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   }
 
   public Set<AsyncEventQueue> getAsyncEventQueues() {
-    return this.allAsyncEventQueues;
+    return this.allVisibleAsyncEventQueues;
   }
   
   public AsyncEventQueue getAsyncEventQueue(String id) {
@@ -3949,6 +3958,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     // using gateway senders lock since async queue uses a gateway sender
     synchronized (allGatewaySendersLock) {
       this.allAsyncEventQueues.remove(asyncQueue);
+      this.allVisibleAsyncEventQueues.remove(asyncQueue);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 3bd2992..e49708f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -152,6 +152,8 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   
   protected boolean isHDFSQueue;
   
+  protected boolean isMetaQueue;
+  
   private int parallelismForReplicatedRegion;
   
   protected AbstractGatewaySenderEventProcessor eventProcessor;
@@ -252,6 +254,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId();
     this.serialNumber = DistributionAdvisor.createSerialNumber();
     this.isHDFSQueue = attrs.isHDFSQueue();
+    this.isMetaQueue = attrs.isMetaQueue();
     if (!(this.cache instanceof CacheCreation)) {
       this.stopper = new Stopper(cache.getCancelCriterion());
       this.senderAdvisor = GatewaySenderAdvisor.createGatewaySenderAdvisor(this);
@@ -476,6 +479,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     return this.isHDFSQueue;
   }
   
+  public boolean getIsMetaQueue() {
+    return this.isMetaQueue;
+  }
+  
   public InternalDistributedSystem getSystem() {
     return (InternalDistributedSystem)this.cache.getDistributedSystem();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
index 1d0b4f1..2df11aa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java
@@ -22,6 +22,7 @@ public class GatewaySenderAttributes {
 
   public static final boolean DEFAULT_IS_BUCKETSORTED = true;
   public static final boolean DEFAULT_IS_HDFSQUEUE = false;
+  public static final boolean DEFAULT_IS_META_QUEUE = false;
 
 
   public int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
@@ -73,6 +74,7 @@ public class GatewaySenderAttributes {
   public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
   
   public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE;
+  public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
   
   public int getSocketBufferSize() {
     return this.socketBufferSize;
@@ -183,4 +185,7 @@ public class GatewaySenderAttributes {
   public boolean isHDFSQueue() {
     return this.isHDFSQueue;
   }
+  public boolean isMetaQueue() {
+    return this.isMetaQueue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 60085e4..f9e2c1d 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -14,9 +14,11 @@ import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
 import com.gemstone.gemfire.cache.lucene.LuceneIndex;
@@ -26,7 +28,9 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionFactoryImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
 /* wrapper of IndexWriter */
@@ -102,13 +106,14 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
       repositoryManager = new PartitionedRepositoryManager(dataRegion, (PartitionedRegion)fileRegion, (PartitionedRegion)chunkRegion, mapper, analyzer);
       
       // create AEQ, AEQ listner and specify the listener to repositoryManager
-      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+      AsyncEventQueueFactoryImpl factory = (AsyncEventQueueFactoryImpl) cache.createAsyncEventQueueFactory();
       if (withPersistence) {
         factory.setPersistent(true);
       }
       factory.setParallel(true); // parallel AEQ for PR
       factory.setMaximumQueueMemory(1000);
       factory.setDispatcherThreads(1);
+      factory.setIsMetaQueue(true);
       
       LuceneEventListener listener = new LuceneEventListener(repositoryManager);
       String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d88ef883/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 5ec2725..eff2813 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -6,6 +6,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -123,7 +124,7 @@ public class LuceneServiceImplJUnitTest {
   @Test
   public void testCreateIndexForPR() throws IOException, ParseException {
     getService();
-    createPR("PR1", false);
+    LocalRegion userRegion = createPR("PR1", false);
     LuceneIndexImpl index1 = (LuceneIndexImpl)service.createIndex("index1", "PR1", "field1", "field2", "field3");
     assertTrue(index1 instanceof LuceneIndexForPartitionedRegion);
     LuceneIndexForPartitionedRegion index1PR = (LuceneIndexForPartitionedRegion)index1;
@@ -146,6 +147,9 @@ public class LuceneServiceImplJUnitTest {
     String aeqId = LuceneServiceImpl.getUniqueIndexName(index1.getName(), index1.getRegionPath());
     AsyncEventQueueImpl aeq = (AsyncEventQueueImpl)cache.getAsyncEventQueue(aeqId);
     assertTrue(aeq != null);
+    
+    //Make sure our queue doesn't show up in the list of async event queues 
+    assertEquals(Collections.emptySet(), cache.getAsyncEventQueues());
   }
 
   @Test