You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/20 17:10:42 UTC
[17/50] [abbrv] incubator-geode git commit: GEODE-1351: let the
waitForFlush() to return false if timeout
GEODE-1351: let the waitForFlush() to return false if timeout
also apply it on some unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3e8a610e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3e8a610e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3e8a610e
Branch: refs/heads/feature/GEODE-835
Commit: 3e8a610e99bc005bcc56c378e3d1e2274a3d468c
Parents: 34d3791
Author: zhouxh <gz...@pivotal.io>
Authored: Mon May 16 17:12:15 2016 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon May 16 17:12:15 2016 -0700
----------------------------------------------------------------------
.../gemfire/cache/lucene/LuceneIndex.java | 4 +++-
.../cache/lucene/internal/LuceneIndexImpl.java | 11 ++++-----
.../internal/xml/LuceneIndexCreation.java | 3 ++-
.../LuceneIndexCreationIntegrationTest.java | 25 ++++++++++----------
.../LuceneIndexRecoveryHAIntegrationTest.java | 14 +++++------
5 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
index be329f7..6b1a4b4 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
@@ -58,7 +58,9 @@ public interface LuceneIndex {
/*
* wait until the current entries in cache are indexed
+ * @param maxWaitInMilliseconds max wait time in millisecond
+ * @return if entries are flushed within maxWait
*/
- public void waitUntilFlushed(int maxWait);
+ public boolean waitUntilFlushed(int maxWaitInMillisecond);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index 981d9e4..c165085 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -81,19 +81,17 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
searchableFieldNames = fields;
}
- /*
- * For test and demo purpose. To use it, the data region should stop feeding
- * A more advanced version is under-development
- */
@Override
- public void waitUntilFlushed(int maxWait) {
+ public boolean waitUntilFlushed(int maxWaitInMillisecond) {
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId);
+ boolean flushed = false;
if (queue != null) {
long start = System.nanoTime();
- while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) {
+ while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) {
if (0 == queue.size()) {
logger.debug("waitUntilFlushed: Queue size is 0");
+ flushed = true;
break;
} else {
try {
@@ -105,6 +103,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
} else {
throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath);
}
+ return flushed;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
index b54f51b..a3bdd24 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
@@ -116,6 +116,7 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>>
}
@Override
- public void waitUntilFlushed(int maxWait) {
+ public boolean waitUntilFlushed(int maxWaitInMillisecond) {
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
index 6429143..fe754a4 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java
@@ -44,7 +44,6 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import com.jayway.awaitility.Awaitility;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.KeywordTokenizer;
@@ -71,12 +70,10 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
Region region = createRegion();
final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
region.put("key1", new TestObject());
-
+ verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
assertEquals(analyzers, index.getFieldAnalyzers());
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
- assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields);
- assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields);
- });
+ assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields);
+ assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields);
}
@Test
@@ -188,26 +185,28 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
cache.close();
createCache();
createIndex("field1", "field2");
+ verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
.create(REGION_NAME);
LuceneQuery<Object, Object> query = luceneService.createLuceneQueryFactory()
.create(INDEX_NAME, REGION_NAME,
"field1:world");
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
- assertEquals(1, query.search().size());
- });
+ assertEquals(1, query.search().size());
}
+ private void verifyIndexFinishFlushing(String indexName, String regionName) {
+ LuceneIndex index = luceneService.getIndex(indexName, regionName);
+ boolean flushed = index.waitUntilFlushed(60000);
+ assertTrue(flushed);
+ }
+
@Test
public void shouldRecoverPersistentIndexWhenDataIsWrittenToIndex() throws ParseException, InterruptedException {
createIndex("field1", "field2");
Region dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
.create(REGION_NAME);
dataRegion.put("A", new TestObject());
- final AsyncEventQueueImpl queue = (AsyncEventQueueImpl) getIndexQueue();
-
- //Wait until the queue has drained
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> assertEquals(0, queue.size()));
+ verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME);
cache.close();
createCache();
createIndex("text");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 77d2a5c..d32e6d8 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -41,6 +41,7 @@ import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
import com.gemstone.gemfire.cache.lucene.LuceneQuery;
import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
import com.gemstone.gemfire.cache.lucene.LuceneService;
@@ -55,7 +56,6 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.test.junit.categories.FlakyTest;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import com.jayway.awaitility.Awaitility;
@Category(IntegrationTest.class)
public class LuceneIndexRecoveryHAIntegrationTest {
@@ -139,7 +139,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
userRegion.put("value3", value);
- waitUntilQueueEmpty(aeqId);
+ verifyIndexFinishFlushing(INDEX, REGION);
LuceneQuery<Integer, Type1> query = service.createLuceneQueryFactory().create(INDEX, REGION, "s:world");
LuceneQueryResults<Integer, Type1> results = query.search();
@@ -190,7 +190,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
userRegion.put("value3", value);
- waitUntilQueueEmpty(aeqId);
+ verifyIndexFinishFlushing(INDEX, REGION);
PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + ".files");
assertNotNull(fileRegion);
@@ -203,9 +203,9 @@ public class LuceneIndexRecoveryHAIntegrationTest {
Assert.assertEquals(3, results.size());
}
- private void waitUntilQueueEmpty(final String aeqId) {
- // TODO flush queue
- AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId);
- Awaitility.waitAtMost(1000, TimeUnit.MILLISECONDS).until(() -> assertEquals(0, queue.size()));
+ private void verifyIndexFinishFlushing(String indexName, String regionName) {
+ LuceneIndex index = LuceneServiceProvider.get(cache).getIndex(indexName, regionName);
+ boolean flushed = index.waitUntilFlushed(60000);
+ assertTrue(flushed);
}
}