You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/18 17:04:53 UTC

[13/19] incubator-geode git commit: GEODE-1351: add waitUntilFlush() into luceneIndexImpl

GEODE-1351: add waitUntilFlush() into luceneIndexImpl

This method is very useful in tests and demos.
It waits until the AEQ (AsyncEventQueue) size is 0 or the maximum wait time elapses.

Also added junit test and integration tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8ea53200
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8ea53200
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8ea53200

Branch: refs/heads/feature/GEODE-1392
Commit: 8ea532002fc1bc3e0d442986a108f719a61757e2
Parents: c54227c
Author: zhouxh <gz...@pivotal.io>
Authored: Sun May 15 21:36:34 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 18 10:04:25 2016 -0700

----------------------------------------------------------------------
 .../gemfire/cache/lucene/LuceneIndex.java       |  5 ++
 .../lucene/internal/LuceneEventListener.java    | 11 +++
 .../cache/lucene/internal/LuceneIndexImpl.java  | 43 +++++++++--
 .../internal/xml/LuceneIndexCreation.java       |  4 +
 .../gemfire/cache/lucene/LuceneQueriesBase.java | 69 ++++++++++++++++-
 .../internal/LuceneIndexImplJUnitTest.java      | 78 ++++++++++++++++++++
 6 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
index 743045b..be329f7 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
@@ -56,4 +56,9 @@ public interface LuceneIndex {
    */
   public Map<String, Analyzer> getFieldAnalyzers();
   
+  /**
+   * Waits up to maxWait milliseconds for the entries currently in the cache to be indexed.
+   */
+  public void waitUntilFlushed(int maxWait);
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
index 9fdfd43..2dae4ee 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -35,6 +35,8 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.TestHook;
 import com.gemstone.gemfire.internal.logging.LogService;
 
 /**
@@ -70,6 +72,10 @@ public class LuceneEventListener implements AsyncEventListener {
         IndexRepository repository = repositoryManager.getRepository(region, key, callbackArgument);
 
         Operation op = event.getOperation();
+        
+        if (testHook != null) {
+          testHook.doTestHook("FOUND_AND_BEFORE_PROCESSING_A_EVENT");
+        }
 
         if (op.isCreate()) {
           repository.create(key, event.getDeserializedValue());
@@ -96,4 +102,9 @@ public class LuceneEventListener implements AsyncEventListener {
       DefaultQuery.setPdxReadSerialized(false);
     }
   }
+  
+  public interface TestHook {
+    public void doTestHook(String spot);
+  }
+  public static TestHook testHook;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index 0b5f8fa..981d9e4 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -21,23 +21,26 @@ package com.gemstone.gemfire.cache.lucene.internal;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.xml.LuceneIndexCreation;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 
 public abstract class LuceneIndexImpl implements InternalLuceneIndex {
@@ -77,7 +80,33 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
   protected void setSearchableFields(String[] fields) {
     searchableFieldNames = fields;
   }
-  
+
+  /*
+   *  For test and demo purposes only. Callers should stop putting entries into the data
+   *  region before calling this method. A more advanced version is under development.
+   */
+  @Override
+  public void waitUntilFlushed(int maxWait) {
+    String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
+    AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId);
+    if (queue != null) {
+      long start = System.nanoTime();
+      while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) {
+        if (0 == queue.size()) {
+          logger.debug("waitUntilFlushed: Queue size is 0");
+          break;
+        } else {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    } else { 
+      throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath);
+    }
+  }
+
   @Override
   public String[] getFieldNames() {
     return searchableFieldNames;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
index 86a10e4..b54f51b 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
@@ -114,4 +114,8 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>>
   public void addFieldNames(String[] fieldNames) {
     this.fieldNames.addAll(Arrays.asList(fieldNames));
   }
+
+  @Override
+  public void waitUntilFlushed(int maxWait) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
index c467a18..c7567f3 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
@@ -18,7 +18,7 @@
  */
 package com.gemstone.gemfire.cache.lucene;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -27,6 +27,12 @@ import java.util.Map;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneEventListener;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.dunit.VM;
@@ -75,6 +81,67 @@ public abstract class LuceneQueriesBase extends JUnit4CacheTestCase {
     executeTextSearch(accessor);
   }
 
+  @Test
+  public void entriesFlushedToIndexAfterWaitForFlushCalled() {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+
+    try {
+      dataStore1.invoke(() -> setTestHook());
+      putDataInRegion(accessor);
+      waitForFlushBeforeExecuteTextSearch(accessor, 10);
+      executeTextSearch(accessor);
+    } finally {
+      dataStore1.invoke(() -> checkResultAndresetTestHook());
+    }
+  }
+
+  protected void waitForFlushBeforeExecuteTextSearch(VM vm, final int expectKeyNum) {
+    vm.invoke(() -> {
+      Cache cache = getCache();
+      Region<Object, Object> region = cache.getRegion(REGION_NAME);
+
+      LuceneService service = LuceneServiceProvider.get(cache);
+      LuceneIndexImpl index = (LuceneIndexImpl)service.getIndex(INDEX_NAME, REGION_NAME);
+      assertNotNull(index);
+      LuceneQuery<Integer, TestObject> query;
+
+      String aeqId = LuceneServiceImpl.getUniqueIndexName(INDEX_NAME, REGION_NAME);
+      AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId);
+      assertNotNull(queue);
+      assertTrue(queue.size()>0);
+      index.waitUntilFlushed(30000);
+      return null;
+    });
+  }
+
+  public static void setTestHook() {
+    LuceneEventListener.testHook = new LuceneEventListener.TestHook() {
+
+      @Override
+      public void doTestHook(String spot) {
+        if (spot.equals("FOUND_AND_BEFORE_PROCESSING_A_EVENT")) {
+          try {
+            Thread.sleep(1000);
+            LogService.getLogger().debug("Waited in test hook");
+          }
+          catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+  }
+  
+  public static void checkResultAndresetTestHook()
+  {
+    LuceneEventListener.testHook = null;
+  }
+
   protected void executeTextSearch(VM vm) {
     vm.invoke(() -> {
       Cache cache = getCache();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
new file mode 100755
index 0000000..edecc66
--- /dev/null
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class LuceneIndexImplJUnitTest {
+  public static final String REGION = "region";
+  public static final String INDEX = "index";
+  public static final int MAX_WAIT = 30000;
+  private Cache cache;
+  LuceneIndex index;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  
+  @Before
+  public void createLuceneIndex() {
+    cache = Fakes.cache();
+    index = new LuceneIndexForPartitionedRegion(INDEX, REGION, cache);
+  }
+  
+  @Test
+  public void waitUnitFlushedWithMissingAEQThrowsIllegalArgument() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    index.waitUntilFlushed(MAX_WAIT);
+  }
+  
+  @Test
+  public void waitUnitFlushedWaitsForFlush() throws Exception {
+    final String expectedIndexName = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION);
+    final AsyncEventQueue queue = mock(AsyncEventQueue.class);
+    when(cache.getAsyncEventQueue(eq(expectedIndexName))).thenReturn(queue);
+    
+    AtomicInteger callCount = new AtomicInteger();
+    when(queue.size()).thenAnswer(invocation -> {
+      if (callCount.get() == 0) {
+        // the first time waitUntilFlushed() polls queue.size(), it returns 2 (queue not yet flushed)
+        callCount.incrementAndGet();
+        return 2;
+      } else {
+        // on every later poll by waitUntilFlushed(), queue.size() returns 0 (queue flushed)
+        return 0;
+      }
+    });
+    index.waitUntilFlushed(MAX_WAIT);
+    verify(cache).getAsyncEventQueue(eq(expectedIndexName));
+  }
+
+}