You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/18 17:04:53 UTC
[13/19] incubator-geode git commit: GEODE-1351: add waitUntilFlush()
into luceneIndexImpl
GEODE-1351: add waitUntilFlush() into luceneIndexImpl
This function is very useful in tests and demos.
It waits until the size of the AEQ (AsyncEventQueue) reaches 0.
Also added a JUnit test and integration tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8ea53200
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8ea53200
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8ea53200
Branch: refs/heads/feature/GEODE-1392
Commit: 8ea532002fc1bc3e0d442986a108f719a61757e2
Parents: c54227c
Author: zhouxh <gz...@pivotal.io>
Authored: Sun May 15 21:36:34 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 18 10:04:25 2016 -0700
----------------------------------------------------------------------
.../gemfire/cache/lucene/LuceneIndex.java | 5 ++
.../lucene/internal/LuceneEventListener.java | 11 +++
.../cache/lucene/internal/LuceneIndexImpl.java | 43 +++++++++--
.../internal/xml/LuceneIndexCreation.java | 4 +
.../gemfire/cache/lucene/LuceneQueriesBase.java | 69 ++++++++++++++++-
.../internal/LuceneIndexImplJUnitTest.java | 78 ++++++++++++++++++++
6 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
index 743045b..be329f7 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java
@@ -56,4 +56,9 @@ public interface LuceneIndex {
*/
public Map<String, Analyzer> getFieldAnalyzers();
+ /*
+ * Wait until the entries currently in the cache have been indexed.
+ */
+ public void waitUntilFlushed(int maxWait);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
index 9fdfd43..2dae4ee 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -35,6 +35,8 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.TestHook;
import com.gemstone.gemfire.internal.logging.LogService;
/**
@@ -70,6 +72,10 @@ public class LuceneEventListener implements AsyncEventListener {
IndexRepository repository = repositoryManager.getRepository(region, key, callbackArgument);
Operation op = event.getOperation();
+
+ if (testHook != null) {
+ testHook.doTestHook("FOUND_AND_BEFORE_PROCESSING_A_EVENT");
+ }
if (op.isCreate()) {
repository.create(key, event.getDeserializedValue());
@@ -96,4 +102,9 @@ public class LuceneEventListener implements AsyncEventListener {
DefaultQuery.setPdxReadSerialized(false);
}
}
+
+ public interface TestHook {
+ public void doTestHook(String spot);
+ }
+ public static TestHook testHook;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index 0b5f8fa..981d9e4 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -21,23 +21,26 @@ package com.gemstone.gemfire.cache.lucene.internal;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
import com.gemstone.gemfire.cache.lucene.internal.xml.LuceneIndexCreation;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
public abstract class LuceneIndexImpl implements InternalLuceneIndex {
@@ -77,7 +80,33 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
protected void setSearchableFields(String[] fields) {
searchableFieldNames = fields;
}
-
+
+ /*
+ * For test and demo purposes only. Before calling it, stop feeding data into the data region.
+ * A more advanced version is under development.
+ */
+ @Override
+ public void waitUntilFlushed(int maxWait) {
+ String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
+ AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId);
+ if (queue != null) {
+ long start = System.nanoTime();
+ while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) {
+ if (0 == queue.size()) {
+ logger.debug("waitUntilFlushed: Queue size is 0");
+ break;
+ } else {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ } else {
+ throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath);
+ }
+ }
+
@Override
public String[] getFieldNames() {
return searchableFieldNames;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
index 86a10e4..b54f51b 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java
@@ -114,4 +114,8 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>>
public void addFieldNames(String[] fieldNames) {
this.fieldNames.addAll(Arrays.asList(fieldNames));
}
+
+ @Override
+ public void waitUntilFlushed(int maxWait) {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
index c467a18..c7567f3 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
@@ -18,7 +18,7 @@
*/
package com.gemstone.gemfire.cache.lucene;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import java.io.Serializable;
import java.util.HashMap;
@@ -27,6 +27,12 @@ import java.util.Map;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneEventListener;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
import com.gemstone.gemfire.test.dunit.VM;
@@ -75,6 +81,67 @@ public abstract class LuceneQueriesBase extends JUnit4CacheTestCase {
executeTextSearch(accessor);
}
+ @Test
+ public void entriesFlushedToIndexAfterWaitForFlushCalled() {
+ SerializableRunnableIF createIndex = () -> {
+ LuceneService luceneService = LuceneServiceProvider.get(getCache());
+ luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+ };
+ dataStore1.invoke(() -> initDataStore(createIndex));
+ dataStore2.invoke(() -> initDataStore(createIndex));
+ accessor.invoke(() -> initAccessor(createIndex));
+
+ try {
+ dataStore1.invoke(() -> setTestHook());
+ putDataInRegion(accessor);
+ waitForFlushBeforeExecuteTextSearch(accessor, 10);
+ executeTextSearch(accessor);
+ } finally {
+ dataStore1.invoke(() -> checkResultAndresetTestHook());
+ }
+ }
+
+ protected void waitForFlushBeforeExecuteTextSearch(VM vm, final int expectKeyNum) {
+ vm.invoke(() -> {
+ Cache cache = getCache();
+ Region<Object, Object> region = cache.getRegion(REGION_NAME);
+
+ LuceneService service = LuceneServiceProvider.get(cache);
+ LuceneIndexImpl index = (LuceneIndexImpl)service.getIndex(INDEX_NAME, REGION_NAME);
+ assertNotNull(index);
+ LuceneQuery<Integer, TestObject> query;
+
+ String aeqId = LuceneServiceImpl.getUniqueIndexName(INDEX_NAME, REGION_NAME);
+ AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId);
+ assertNotNull(queue);
+ assertTrue(queue.size()>0);
+ index.waitUntilFlushed(30000);
+ return null;
+ });
+ }
+
+ public static void setTestHook() {
+ LuceneEventListener.testHook = new LuceneEventListener.TestHook() {
+
+ @Override
+ public void doTestHook(String spot) {
+ if (spot.equals("FOUND_AND_BEFORE_PROCESSING_A_EVENT")) {
+ try {
+ Thread.sleep(1000);
+ LogService.getLogger().debug("Waited in test hook");
+ }
+ catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ }
+
+ public static void checkResultAndresetTestHook()
+ {
+ LuceneEventListener.testHook = null;
+ }
+
protected void executeTextSearch(VM vm) {
vm.invoke(() -> {
Cache cache = getCache();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8ea53200/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
new file mode 100755
index 0000000..edecc66
--- /dev/null
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.test.fake.Fakes;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class LuceneIndexImplJUnitTest {
+ public static final String REGION = "region";
+ public static final String INDEX = "index";
+ public static final int MAX_WAIT = 30000;
+ private Cache cache;
+ LuceneIndex index;
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Before
+ public void createLuceneIndex() {
+ cache = Fakes.cache();
+ index = new LuceneIndexForPartitionedRegion(INDEX, REGION, cache);
+ }
+
+ @Test
+ public void waitUnitFlushedWithMissingAEQThrowsIllegalArgument() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ index.waitUntilFlushed(MAX_WAIT);
+ }
+
+ @Test
+ public void waitUnitFlushedWaitsForFlush() throws Exception {
+ final String expectedIndexName = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION);
+ final AsyncEventQueue queue = mock(AsyncEventQueue.class);
+ when(cache.getAsyncEventQueue(eq(expectedIndexName))).thenReturn(queue);
+
+ AtomicInteger callCount = new AtomicInteger();
+ when(queue.size()).thenAnswer(invocation -> {
+ if (callCount.get() == 0) {
+ // the first time waitUntilFlushed() polls queue.size(), it returns 2 (queue not yet flushed)
+ callCount.incrementAndGet();
+ return 2;
+ } else {
+ // on every later poll from waitUntilFlushed(), queue.size() returns 0
+ return 0;
+ }
+ });
+ index.waitUntilFlushed(MAX_WAIT);
+ verify(cache).getAsyncEventQueue(eq(expectedIndexName));
+ }
+
+}