You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/04/11 01:14:59 UTC
[geode] branch develop updated: GEODE-3926: Lucene Query Exception
is thrown if queries are executed in the middle of reindexing a region
(#1742)
This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 75ae584 GEODE-3926: Lucene Query Exception is thrown if queries are executed in the middle of reindexing a region (#1742)
75ae584 is described below
commit 75ae58423bc66ca127fec7d2fc3ad71fbed61094
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Tue Apr 10 18:14:54 2018 -0700
GEODE-3926: Lucene Query Exception is thrown if queries are executed in the middle of reindexing a region (#1742)
* Add new internal exception, LuceneIndexCreationInProgressException
* Index the region even if it's empty
* Add Awaitility clauses to reindexing tests
* getRepositories now throws LuceneIndexCreationInProgressException if the index for a bucket is not yet available
* getRepositories does not throw an exception if the data region's bucket is empty (this scenario can occur when the index is created before the region)
---
.../cache/lucene/internal/InternalLuceneIndex.java | 1 +
...=> LuceneIndexCreationInProgressException.java} | 29 ++-------
.../internal/LuceneIndexForPartitionedRegion.java | 12 +++-
.../cache/lucene/internal/LuceneQueryImpl.java | 2 +
.../cache/lucene/internal/LuceneRawIndex.java | 9 ++-
.../lucene/internal/LuceneRegionListener.java | 2 +
.../cache/lucene/internal/LuceneServiceImpl.java | 19 +++---
.../internal/PartitionedRepositoryManager.java | 26 +++++++-
.../internal/RawLuceneRepositoryManager.java | 17 ++---
.../sanctioned-geode-lucene-serializables.txt | 1 +
.../lucene/LuceneQueriesReindexDUnitTest.java | 2 +
...ncyWithRegionCreatedBeforeReindexDUnitTest.java | 1 +
.../LuceneIndexForPartitionedRegionTest.java | 34 ++++++++++
.../LuceneIndexRecoveryHAIntegrationTest.java | 7 +-
.../lucene/internal/LuceneQueryImplJUnitTest.java | 2 +-
.../internal/LuceneServiceImplJUnitTest.java | 5 ++
.../PartitionedRepositoryManagerJUnitTest.java | 76 ++++++++++++++++++++--
.../RawLuceneRepositoryManagerJUnitTest.java | 9 ++-
18 files changed, 193 insertions(+), 61 deletions(-)
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
index 74e4ac8..d308a16 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
@@ -39,4 +39,5 @@ public interface InternalLuceneIndex extends LuceneIndex {
void initialize();
+ boolean isIndexAvailable(int id);
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
similarity index 61%
copy from geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
copy to geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
index 74e4ac8..9c58ba5 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
@@ -12,31 +12,12 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.cache.lucene.internal;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.lucene.LuceneIndex;
-import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
-
-public interface InternalLuceneIndex extends LuceneIndex {
-
- RepositoryManager getRepositoryManager();
-
- /**
- * Dump the files for this index to the given directory.
- */
- void dumpFiles(String directory);
-
- /**
- * Destroy the index
- */
- void destroy(boolean initiator);
-
- LuceneIndexStats getIndexStats();
-
- Cache getCache();
-
- void initialize();
+import org.apache.geode.GemFireException;
+public class LuceneIndexCreationInProgressException extends GemFireException {
+ public LuceneIndexCreationInProgressException(String message) {
+ super(message);
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 577bdef..247e1b4 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -16,6 +16,7 @@
package org.apache.geode.cache.lucene.internal;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
@@ -49,8 +50,11 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
public static final String FILES_REGION_SUFFIX = ".files";
+ private final ExecutorService waitingThreadPoolFromDM;
+
public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
super(indexName, regionPath, cache);
+ this.waitingThreadPoolFromDM = cache.getDistributionManager().getWaitingThreadPool();
final String statsName = indexName + "-" + regionPath;
this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
@@ -62,7 +66,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
mapper = new HeterogeneousLuceneSerializer();
}
PartitionedRepositoryManager partitionedRepositoryManager =
- new PartitionedRepositoryManager(this, mapper);
+ new PartitionedRepositoryManager(this, mapper, this.waitingThreadPoolFromDM);
return partitionedRepositoryManager;
}
@@ -202,6 +206,12 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
}
+ @Override
+ public boolean isIndexAvailable(int id) {
+ PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion();
+ return fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null;
+ }
+
private void destroyOnRemoteMembers() {
PartitionedRegion pr = (PartitionedRegion) getDataRegion();
DistributionManager dm = pr.getDistributionManager();
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
index 6039af2..1026a83 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
@@ -133,6 +133,8 @@ public class LuceneQueryImpl<K, V> implements LuceneQuery<K, V> {
} catch (TransactionException e) {
// When function execution is run from server
throw new LuceneQueryException(LUCENE_QUERY_CANNOT_BE_EXECUTED_WITHIN_A_TRANSACTION);
+ } catch (LuceneIndexCreationInProgressException e) {
+ throw new LuceneQueryException("Lucene Index is not available, currently indexing");
}
return entries;
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
index d4168bd..1416cab 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
@@ -32,8 +32,8 @@ public class LuceneRawIndex extends LuceneIndexImpl {
if (mapper == null) {
mapper = new HeterogeneousLuceneSerializer();
}
- RawLuceneRepositoryManager rawLuceneRepositoryManager =
- new RawLuceneRepositoryManager(this, mapper);
+ RawLuceneRepositoryManager rawLuceneRepositoryManager = new RawLuceneRepositoryManager(this,
+ mapper, cache.getDistributionManager().getWaitingThreadPool());
return rawLuceneRepositoryManager;
}
@@ -50,4 +50,9 @@ public class LuceneRawIndex extends LuceneIndexImpl {
@Override
public void destroy(boolean initiator) {}
+
+ @Override
+ public boolean isIndexAvailable(int id) {
+ return true;
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
index 7313a82..a3f2764 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
@@ -26,6 +26,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionListener;
public class LuceneRegionListener implements RegionListener {
@@ -108,6 +109,7 @@ public class LuceneRegionListener implements RegionListener {
if (region.getFullPath().equals(this.regionPath)
&& this.afterCreateInvoked.compareAndSet(false, true)) {
this.service.afterDataRegionCreated(this.luceneIndex);
+ this.service.createLuceneIndexOnDataRegion((PartitionedRegion) region, luceneIndex);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 01bc5c6..ede3449 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -166,8 +166,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
if (!regionPath.startsWith("/")) {
regionPath = "/" + regionPath;
}
- String name = indexName + "#" + regionPath.replace('/', '_');
- return name;
+ return indexName + "#" + regionPath.replace('/', '_');
}
public static String getUniqueIndexRegionName(String indexName, String regionPath,
@@ -254,15 +253,13 @@ public class LuceneServiceImpl implements InternalLuceneService {
throw new BucketNotFoundException(
"Bucket ID : " + primaryBucketId + " not found during lucene indexing");
}
- if (!userBucket.isEmpty()) {
- /**
- *
- * Calling getRepository will in turn call computeRepository
- * which is responsible for indexing the user region.
- *
- **/
- repositoryManager.getRepository(primaryBucketId);
- }
+ /**
+ *
+ * Calling getRepository will in turn call computeRepository
+ * which is responsible for indexing the user region.
+ *
+ **/
+ repositoryManager.getRepository(primaryBucketId);
} catch (BucketNotFoundException | PrimaryBucketException e) {
logger.debug("Bucket ID : " + primaryBucketId
+ " not found while saving to lucene index: " + e.getMessage(), e);
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index f60f83b..e439293 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -20,6 +20,9 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Region;
@@ -32,8 +35,11 @@ import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.logging.LogService;
public class PartitionedRepositoryManager implements RepositoryManager {
+ private final Logger logger = LogService.getLogger();
+
public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
/**
* map of the parent bucket region to the index repository
@@ -54,10 +60,14 @@ public class PartitionedRepositoryManager implements RepositoryManager {
protected volatile boolean closed;
private final CountDownLatch isDataRegionReady = new CountDownLatch(1);
- public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer) {
+ private final ExecutorService waitingThreadPoolFromDM;
+
+ public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer,
+ ExecutorService waitingThreadPool) {
this.index = index;
this.serializer = serializer;
this.closed = false;
+ this.waitingThreadPoolFromDM = waitingThreadPool;
}
public void setUserRegionForRepositoryManager(PartitionedRegion userRegion) {
@@ -76,7 +86,19 @@ public class PartitionedRepositoryManager implements RepositoryManager {
throw new BucketNotFoundException(
"User bucket was not found for region " + region + "bucket id " + bucketId);
} else {
- repos.add(getRepository(userBucket.getId()));
+ if (index.isIndexAvailable(userBucket.getId()) || userBucket.isEmpty()) {
+ repos.add(getRepository(userBucket.getId()));
+ } else {
+ waitingThreadPoolFromDM.execute(() -> {
+ try {
+ getRepository(userBucket.getId());
+ } catch (BucketNotFoundException | LuceneIndexDestroyedException e) {
+ logger.debug("Lucene Index creation in progress.", e);
+ }
+ });
+ throw new LuceneIndexCreationInProgressException(
+ "Lucene Index creation in progress for bucket: " + userBucket.getId());
+ }
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index 0b38c45..25a4678 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -15,6 +15,7 @@
package org.apache.geode.cache.lucene.internal;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
@@ -24,8 +25,9 @@ import org.apache.geode.internal.cache.PartitionedRegion;
public class RawLuceneRepositoryManager extends PartitionedRepositoryManager {
public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory();
- public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) {
- super(index, serializer);
+ public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer,
+ ExecutorService waitingThreadPool) {
+ super(index, serializer, waitingThreadPool);
}
@Override
@@ -35,15 +37,8 @@ public class RawLuceneRepositoryManager extends PartitionedRepositoryManager {
return repo;
}
- try {
- repo = computeRepository(bucketId, this.serializer, this.index, this.userRegion, repo);
- return repo;
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- throw new BucketNotFoundException(
- "Colocated index buckets not found for bucket id " + bucketId);
+ repo = computeRepository(bucketId);
+ return repo;
}
@Override
diff --git a/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt b/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
index a13c06b..3a9117a 100755
--- a/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
+++ b/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
@@ -17,3 +17,4 @@ org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFiles,true,1
org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction,true,1
org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction,true,1
org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction,true,1
+org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException,false
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexDUnitTest.java
index 75ed722..e25ca9a 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexDUnitTest.java
@@ -86,6 +86,7 @@ public class LuceneQueriesReindexDUnitTest extends LuceneQueriesAccessorBase {
ai1.checkException();
ai2.checkException();
+ waitForFlushBeforeExecuteTextSearch(accessor, 60000);
executeTextSearch(accessor);
}
@@ -123,6 +124,7 @@ public class LuceneQueriesReindexDUnitTest extends LuceneQueriesAccessorBase {
ai1.checkException();
ai2.checkException();
+ waitForFlushBeforeExecuteTextSearch(accessor, 60000);
executeTextSearch(accessor);
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java
index 866dda1..7b1f361 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java
@@ -170,6 +170,7 @@ public class RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest
createIndexAndRebalance(regionTestType, createIndex, true);
+ waitForFlushBeforeExecuteTextSearch(dataStore3, 60000);
executeTextSearch(dataStore3, "world", "text", NUM_BUCKETS);
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
index 9ebde2c..3a36a94 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
@@ -91,6 +91,40 @@ public class LuceneIndexForPartitionedRegionTest {
}
@Test
+ public void indexIsAvailableReturnsFalseIfCompleteFileIsNotPresent() {
+ String name = "indexName";
+ String regionPath = "regionName";
+ InternalCache cache = Fakes.cache();
+ PartitionedRegion region = mock(PartitionedRegion.class);
+ PartitionedRegion mockFileRegion = mock(PartitionedRegion.class);
+ LuceneIndexForPartitionedRegion index =
+ new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ String fileRegionName = index.createFileRegionName();
+ when(cache.getRegion(fileRegionName)).thenReturn(region);
+ LuceneIndexForPartitionedRegion spy = spy(index);
+ when(spy.getFileAndChunkRegion()).thenReturn(mockFileRegion);
+ assertFalse(spy.isIndexAvailable(0));
+ }
+
+ @Test
+ public void indexIsAvailableReturnsTrueIfCompleteFileIsPresent() {
+ String name = "indexName";
+ String regionPath = "regionName";
+ InternalCache cache = Fakes.cache();
+ PartitionedRegion region = mock(PartitionedRegion.class);
+ PartitionedRegion mockFileRegion = mock(PartitionedRegion.class);
+ LuceneIndexForPartitionedRegion index =
+ new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ String fileRegionName = index.createFileRegionName();
+ when(cache.getRegion(fileRegionName)).thenReturn(region);
+ LuceneIndexForPartitionedRegion spy = spy(index);
+ when(spy.getFileAndChunkRegion()).thenReturn(mockFileRegion);
+ when(mockFileRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, 1))
+ .thenReturn("SOMETHING IS PRESENT");
+ assertTrue(spy.isIndexAvailable(1));
+ }
+
+ @Test
public void fileRegionExistsWhenFileRegionDoesNotExistShouldReturnFalse() {
String name = "indexName";
String regionPath = "regionName";
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 0f67cb6..8cee1c9 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -91,7 +92,8 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion.put("rebalance", "test");
service.waitUntilFlushed("index1", "userRegion", 30000, TimeUnit.MILLISECONDS);
- RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper,
+ Executors.newSingleThreadExecutor());
IndexRepository repo = manager.getRepository(userRegion, 0, null);
assertNotNull(repo);
@@ -106,7 +108,8 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion = (PartitionedRegion) regionfactory.create("userRegion");
userRegion.put("rebalance", "test");
- manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper,
+ Executors.newSingleThreadExecutor());
IndexRepository newRepo = manager.getRepository(userRegion, 0, null);
Assert.assertNotEquals(newRepo, repo);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
index a52330d..e705148 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
@@ -63,7 +63,6 @@ public class LuceneQueryImplJUnitTest {
private CacheTransactionManager cacheTransactionManager;
-
@Before
public void createMocks() {
region = mock(Region.class);
@@ -169,4 +168,5 @@ public class LuceneQueryImplJUnitTest {
assertEquals("value", element.getValue());
assertEquals(5, element.getScore(), 0.01);
}
+
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 07c2d6c..8b09ff5 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -52,6 +53,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.lucene.LuceneIndexFactory;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -123,6 +125,9 @@ public class LuceneServiceImplJUnitTest {
when(((StatisticsFactory) ds).createAtomicStatistics(any(), anyString()))
.thenReturn(luceneIndexStats);
when(cache.getRegion(anyString())).thenReturn(region);
+ when(cache.getDistributionManager()).thenReturn(mock(DistributionManager.class));
+ when(cache.getDistributionManager().getWaitingThreadPool())
+ .thenReturn(Executors.newSingleThreadExecutor());
RegionAttributes ratts = mock(RegionAttributes.class);
when(region.getAttributes()).thenReturn(ratts);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index b2c30b1..72a8a90 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -29,13 +29,17 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -94,6 +98,7 @@ public class PartitionedRepositoryManagerJUnitTest {
protected LuceneIndexImpl indexForPR;
protected PartitionedRepositoryManager repoManager;
protected GemFireCacheImpl cache;
+ private final Map<Integer, Boolean> isIndexAvailableMap = new HashMap<>();
@Before
public void setUp() {
@@ -142,13 +147,14 @@ public class PartitionedRepositoryManagerJUnitTest {
when(prRoot.get("rid")).thenReturn(prConfig);
PowerMockito.mockStatic(PartitionedRegionHelper.class);
PowerMockito.when(PartitionedRegionHelper.getPRRoot(cache)).thenReturn(prRoot);
- repoManager = new PartitionedRepositoryManager(indexForPR, serializer);
+ repoManager = new PartitionedRepositoryManager(indexForPR, serializer,
+ Executors.newSingleThreadExecutor());
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
}
@Test
- public void getByKey() throws BucketNotFoundException, IOException {
+ public void getByKey() throws BucketNotFoundException {
setUpMockBucket(0);
setUpMockBucket(1);
@@ -230,6 +236,9 @@ public class PartitionedRepositoryManagerJUnitTest {
setUpMockBucket(0);
setUpMockBucket(1);
+ when(indexForPR.isIndexAvailable(0)).thenReturn(true);
+ when(indexForPR.isIndexAvailable(1)).thenReturn(true);
+
Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
@@ -254,6 +263,7 @@ public class PartitionedRepositoryManagerJUnitTest {
@Test(expected = BucketNotFoundException.class)
public void getMissingBucketByRegion() throws BucketNotFoundException {
setUpMockBucket(0);
+ when(indexForPR.isIndexAvailable(0)).thenReturn(true);
Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
@@ -262,11 +272,67 @@ public class PartitionedRepositoryManagerJUnitTest {
repoManager.getRepositories(ctx);
}
- protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+ /**
+ * Test that we get the expected exception when a user bucket is not indexed yet
+ */
+ @Test(expected = LuceneIndexCreationInProgressException.class)
+ public void luceneIndexCreationInProgressExceptionExpectedIfIndexIsNotYetIndexed()
+ throws BucketNotFoundException {
+ setUpMockBucket(0);
+
+ Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
+
+ InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
+ when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
+ repoManager.getRepositories(ctx);
+ }
+
+
+ @Test
+ public void queryOnlyWhenIndexIsAvailable() throws Exception {
+ setUpMockBucket(0);
+ setUpMockBucket(1);
+
+ when(indexForPR.isIndexAvailable(0)).thenReturn(true);
+ when(indexForPR.isIndexAvailable(1)).thenReturn(true);
+
+ Set<Integer> buckets = new LinkedHashSet<>(Arrays.asList(0, 1));
+ InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
+ when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
+
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .atMost(500, TimeUnit.SECONDS).until(() -> {
+ final Collection<IndexRepository> repositories = new HashSet<>();
+ try {
+ repositories.addAll(repoManager.getRepositories(ctx));
+ } catch (BucketNotFoundException | LuceneIndexCreationInProgressException e) {
+ }
+ return repositories.size() == 2;
+ });
+
+ Iterator<IndexRepository> itr = repoManager.getRepositories(ctx).iterator();
+ IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next();
+ IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next();
+
+ assertNotNull(repo0);
+ assertNotNull(repo1);
+ assertNotEquals(repo0, repo1);
+
+ checkRepository(repo0, 0, 1);
+ checkRepository(repo1, 0, 1);
+ }
+
+ protected void checkRepository(IndexRepositoryImpl repo0, int... bucketIds) {
IndexWriter writer0 = repo0.getWriter();
RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory();
- assertEquals(new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId),
- dir0.getFileSystem().getFileAndChunkRegion());
+ boolean result = false;
+ for (int bucketId : bucketIds) {
+ BucketTargetingMap bucketTargetingMap =
+ new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId);
+ result |= bucketTargetingMap.equals(dir0.getFileSystem().getFileAndChunkRegion());
+ }
+
+ assertTrue(result);
assertEquals(serializer, repo0.getSerializer());
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
index a000d2f..6284f36 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -19,6 +19,8 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.concurrent.Executors;
+
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
@@ -56,6 +58,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
((RawLuceneRepositoryManager) repoManager).close();
}
+ @Override
protected void createIndexAndRepoManager() {
LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory();
@@ -66,7 +69,8 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
when(indexForPR.getCache()).thenReturn(cache);
when(indexForPR.getRegionPath()).thenReturn("/testRegion");
when(indexForPR.withPersistence()).thenReturn(true);
- repoManager = new RawLuceneRepositoryManager(indexForPR, serializer);
+ repoManager =
+ new RawLuceneRepositoryManager(indexForPR, serializer, Executors.newSingleThreadExecutor());
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
}
@@ -78,7 +82,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
}
@Override
- protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+ protected void checkRepository(IndexRepositoryImpl repo0, int... bucketId) {
IndexWriter writer0 = repo0.getWriter();
Directory dir0 = writer0.getDirectory();
assertTrue(dir0 instanceof NIOFSDirectory);
@@ -105,4 +109,5 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
assertNotNull(repoManager.getRepository(userRegion, 0, null));
}
+
}
--
To stop receiving notification emails like this one, please contact
jasonhuynh@apache.org.