You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/20 16:30:04 UTC

[33/49] incubator-geode git commit: GEODE-1351: let waitUntilFlushed() return false if it times out

GEODE-1351: let waitUntilFlushed() return false if it times out

Also apply it in some of the tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3e8a610e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3e8a610e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3e8a610e

Branch: refs/heads/feature/GEODE-835-test
Commit: 3e8a610e99bc005bcc56c378e3d1e2274a3d468c
Parents: 34d3791
Author: zhouxh <gz...@pivotal.io>
Authored: Mon May 16 17:12:15 2016 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon May 16 17:12:15 2016 -0700

----------------------------------------------------------------------
 .../gemfire/cache/lucene/LuceneIndex.java       |  4 +++-
 .../cache/lucene/internal/LuceneIndexImpl.java  | 11 ++++-----
 .../internal/xml/LuceneIndexCreation.java       |  3 ++-
 .../LuceneIndexCreationIntegrationTest.java     | 25 ++++++++++----------
 .../LuceneIndexRecoveryHAIntegrationTest.java   | 14 +++++------
 5 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
index be329f7..6b1a4b4 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
@@ -58,7 +58,9 @@ public interface LuceneIndex {
   
   /* 
    * wait until the current entries in cache are indexed
+   * @param maxWaitInMillisecond maximum time to wait, in milliseconds
+   * @return true if the entries are flushed within maxWaitInMillisecond, false if the wait timed out
    */
-  public void waitUntilFlushed(int maxWait);
+  public boolean waitUntilFlushed(int maxWaitInMillisecond);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index 981d9e4..c165085 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -81,19 +81,17 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     searchableFieldNames = fields;
   }
 
-  /*
-   *  For test and demo purpose. To use it, the data region should stop feeding
-   *  A more advanced version is under-development
-   */
   @Override
-  public void waitUntilFlushed(int maxWait) {
+  public boolean waitUntilFlushed(int maxWaitInMillisecond) {
     String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
     AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId);
+    boolean flushed = false;
     if (queue != null) {
       long start = System.nanoTime();
-      while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) {
+      while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) {
         if (0 == queue.size()) {
           logger.debug("waitUntilFlushed: Queue size is 0");
+          flushed = true;
           break;
         } else {
           try {
@@ -105,6 +103,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     } else { 
       throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath);
     }
+    return flushed;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
index b54f51b..a3bdd24 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
@@ -116,6 +116,7 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>>
   }
 
   @Override
-  public void waitUntilFlushed(int maxWait) {
+  public boolean waitUntilFlushed(int maxWaitInMillisecond) {
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
index 6429143..fe754a4 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
@@ -44,7 +44,6 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import com.jayway.awaitility.Awaitility;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.KeywordTokenizer;
@@ -71,12 +70,10 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
     Region region = createRegion();
     final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
     region.put("key1", new TestObject());
-
+    verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
     assertEquals(analyzers, index.getFieldAnalyzers());
-    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
-      assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields);
-      assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields);
-    });
+    assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields);
+    assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields);
   }
 
   @Test
@@ -188,26 +185,28 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
     cache.close();
     createCache();
     createIndex("field1", "field2");
+    verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
     dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
       .create(REGION_NAME);
     LuceneQuery<Object, Object> query = luceneService.createLuceneQueryFactory()
       .create(INDEX_NAME, REGION_NAME,
         "field1:world");
-    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
-      assertEquals(1, query.search().size());
-    });
+    assertEquals(1, query.search().size());
   }
 
+  private void verifyIndexFinishFlushing(String indexName, String regionName) {
+    LuceneIndex index = luceneService.getIndex(indexName, regionName);
+    boolean flushed = index.waitUntilFlushed(60000);
+    assertTrue(flushed);
+  }
+  
   @Test
   public void shouldRecoverPersistentIndexWhenDataIsWrittenToIndex() throws ParseException, InterruptedException {
     createIndex("field1", "field2");
     Region dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
       .create(REGION_NAME);
     dataRegion.put("A", new TestObject());
-    final AsyncEventQueueImpl queue = (AsyncEventQueueImpl) getIndexQueue();
-
-    //Wait until the queue has drained
-    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> assertEquals(0, queue.size()));
+    verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
     cache.close();
     createCache();
     createIndex("text");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 77d2a5c..d32e6d8 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -41,6 +41,7 @@ import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
 import com.gemstone.gemfire.cache.lucene.LuceneQuery;
 import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
 import com.gemstone.gemfire.cache.lucene.LuceneService;
@@ -55,7 +56,6 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.junit.categories.FlakyTest;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import com.jayway.awaitility.Awaitility;
 
 @Category(IntegrationTest.class)
 public class LuceneIndexRecoveryHAIntegrationTest {
@@ -139,7 +139,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
     value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
     userRegion.put("value3", value);
 
-    waitUntilQueueEmpty(aeqId);
+    verifyIndexFinishFlushing(INDEX, REGION);
 
     LuceneQuery<Integer, Type1> query = service.createLuceneQueryFactory().create(INDEX, REGION, "s:world");
     LuceneQueryResults<Integer, Type1> results = query.search();
@@ -190,7 +190,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
     value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
     userRegion.put("value3", value);
 
-    waitUntilQueueEmpty(aeqId);
+    verifyIndexFinishFlushing(INDEX, REGION);
 
     PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + ".files");
     assertNotNull(fileRegion);
@@ -203,9 +203,9 @@ public class LuceneIndexRecoveryHAIntegrationTest {
     Assert.assertEquals(3, results.size());
   }
 
-  private void waitUntilQueueEmpty(final String aeqId) {
-    // TODO flush queue
-    AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId);
-    Awaitility.waitAtMost(1000, TimeUnit.MILLISECONDS).until(() -> assertEquals(0, queue.size()));
+  private void verifyIndexFinishFlushing(String indexName, String regionName) {
+    LuceneIndex index = LuceneServiceProvider.get(cache).getIndex(indexName, regionName);
+    boolean flushed = index.waitUntilFlushed(60000);
+    assertTrue(flushed);
   }
 }