You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/03/21 18:56:55 UTC
[geode] 01/01: Added LuceneIndexCreationInProgressException to
break when a bug is still being created
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3926_2
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 69a2b18907fa3a12d2f1efd41bebe0d687adba44
Author: Udo <uk...@pivotal.io>
AuthorDate: Tue Mar 13 15:46:05 2018 -0700
Added LuceneIndexCreationInProgressException to break when a bug is still being created
---
.../lucene/internal/IndexRepositoryFactory.java | 155 +-------------
.../cache/lucene/internal/InternalLuceneIndex.java | 1 +
.../LuceneIndexCreationInProgressException.java | 9 +
.../lucene/internal/LuceneIndexFactoryImpl.java | 1 +
.../internal/LuceneIndexForPartitionedRegion.java | 237 +++++++++++++++++++--
.../cache/lucene/internal/LuceneIndexImpl.java | 47 ++--
.../lucene/internal/LuceneIndexImplFactory.java | 15 +-
.../cache/lucene/internal/LuceneRawIndex.java | 22 +-
.../lucene/internal/LuceneRawIndexFactory.java | 16 +-
.../lucene/internal/LuceneRegionListener.java | 2 +
.../lucene/internal/LuceneResultStructImpl.java | 6 +-
.../cache/lucene/internal/LuceneServiceImpl.java | 36 ++--
.../internal/PartitionedRepositoryManager.java | 38 +++-
.../internal/RawLuceneRepositoryManager.java | 3 +-
.../lucene/internal/LuceneIndexFactorySpy.java | 26 ++-
.../LuceneIndexForPartitionedRegionTest.java | 98 +++++----
.../lucene/internal/LuceneIndexImplJUnitTest.java | 47 ----
.../LuceneIndexRecoveryHAIntegrationTest.java | 9 +-
.../internal/LuceneServiceImplJUnitTest.java | 9 +
.../PartitionedRepositoryManagerJUnitTest.java | 91 ++++++--
.../RawLuceneRepositoryManagerJUnitTest.java | 5 +-
gradle.properties | 4 +
22 files changed, 543 insertions(+), 334 deletions(-)
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index 618aa29..a44b365 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -42,158 +42,21 @@ import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.logging.LogService;
public class IndexRepositoryFactory {
-
private static final Logger logger = LogService.getLogger();
- public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
- public IndexRepositoryFactory() {}
-
- public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
- InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository,
- PartitionedRepositoryManager partitionedRepositoryManager) throws IOException {
- LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
- final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
-
- // We need to ensure that all members have created the fileAndChunk region before continuing
- Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
- PartitionRegionConfig prConfig =
- (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
- LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener =
- new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId);
- fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener);
- IndexRepository repo = null;
- if (prConfig.isColocationComplete()) {
- repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository, index);
- }
- return repo;
- }
-
- /*
- * NOTE: The method finishComputingRepository must be called through computeIndexRepository.
- * Executing finishComputingRepository outside of computeIndexRepository may result in race
- * conditions.
- * This is a util function just to not let computeIndexRepository be a huge chunk of code.
- */
- private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
- PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
- throws IOException {
- LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
- final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
- BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
- BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
- boolean success = false;
- if (fileAndChunkBucket == null) {
- if (oldRepository != null) {
- oldRepository.cleanup();
- }
- return null;
- }
- if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) {
- if (oldRepository != null) {
- oldRepository.cleanup();
- }
- return null;
- }
-
- if (oldRepository != null && !oldRepository.isClosed()) {
- return oldRepository;
- }
-
- if (oldRepository != null) {
- oldRepository.cleanup();
- }
- DistributedLockService lockService = getLockService();
- String lockName = getLockName(fileAndChunkBucket);
- while (!lockService.lock(lockName, 100, -1)) {
- if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) {
- return null;
- }
- }
-
- final IndexRepository repo;
- InternalCache cache = (InternalCache) userRegion.getRegionService();
- boolean initialPdxReadSerializedFlag = cache.getPdxReadSerializedOverride();
- cache.setPdxReadSerializedOverride(true);
- try {
- // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
- Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
- RegionDirectory dir =
- new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
- IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
- IndexWriter writer = new IndexWriter(dir, config);
- repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer,
- indexForPR.getIndexStats(), dataBucket, lockService, lockName, indexForPR);
- success = false;
- // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
- if (null != fileRegion.get(APACHE_GEODE_INDEX_COMPLETE, bucketId)) {
- success = true;
- return repo;
- } else {
- success = reindexUserDataRegion(bucketId, userRegion, fileRegion, dataBucket, repo);
- }
- return repo;
- } catch (IOException e) {
- logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId
- + " for file region:" + fileAndChunkBucket.getFullPath());
- throw e;
- } catch (CacheClosedException e) {
- logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
- + bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
- throw e;
- } finally {
- if (!success) {
- lockService.unlock(lockName);
- }
- cache.setPdxReadSerializedOverride(initialPdxReadSerializedFlag);
- }
+ public IndexRepositoryFactory() {
}
- private boolean reindexUserDataRegion(Integer bucketId, PartitionedRegion userRegion,
- PartitionedRegion fileRegion, BucketRegion dataBucket, IndexRepository repo)
+ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
+ InternalLuceneIndex index,
+ PartitionedRegion userRegion,
+ final IndexRepository oldRepository,
+ PartitionedRepositoryManager partitionedRepositoryManager)
throws IOException {
- Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
-
- for (Object key : dataBucket.keySet()) {
- Object value = getValue(userRegion.getEntry(key));
- if (value != null) {
- repo.update(key, value);
- } else {
- repo.delete(key);
- }
- affectedRepos.add(repo);
- }
-
- for (IndexRepository affectedRepo : affectedRepos) {
- affectedRepo.commit();
- }
- // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
- fileRegion.put(APACHE_GEODE_INDEX_COMPLETE, APACHE_GEODE_INDEX_COMPLETE, bucketId);
- return true;
- }
-
- private Object getValue(Region.Entry entry) {
- final EntrySnapshot es = (EntrySnapshot) entry;
- Object value;
- try {
- value = es == null ? null : es.getRawValue(true);
- } catch (EntryDestroyedException e) {
- value = null;
- }
- return value;
- }
-
- private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
- return new BucketTargetingMap(region, bucketId);
- }
-
- private String getLockName(final BucketRegion fileAndChunkBucket) {
- return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath();
- }
-
- private DistributedLockService getLockService() {
- return DistributedLockService
- .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+ System.out.println(this + ".computeIndexRepository bucketId: " + bucketId);
+ return ((LuceneIndexForPartitionedRegion) index).computeIndexRepository(bucketId, serializer,
+ userRegion, oldRepository, partitionedRepositoryManager);
}
/**
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
index 74e4ac8..d308a16 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
@@ -39,4 +39,5 @@ public interface InternalLuceneIndex extends LuceneIndex {
void initialize();
+ boolean isIndexAvailable(int id);
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
new file mode 100644
index 0000000..7077246
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
@@ -0,0 +1,9 @@
+package org.apache.geode.cache.lucene.internal;
+
+import org.apache.geode.GemFireException;
+
+public class LuceneIndexCreationInProgressException extends GemFireException {
+ public LuceneIndexCreationInProgressException(String message) {
+ super(message);
+ }
+}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java
index 45d4fe1..a326e39 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene.internal;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 577bdef..4246531 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -15,10 +15,23 @@
package org.apache.geode.cache.lucene.internal;
+import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
@@ -30,30 +43,60 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
+import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
+import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
import org.apache.geode.cache.partition.PartitionListener;
+import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionRegionConfig;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.logging.LogService;
public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
- protected Region fileAndChunkRegion;
- protected final FileSystemStats fileSystemStats;
+ private Region fileAndChunkRegion;
+ private final FileSystemStats fileSystemStats;
public static final String FILES_REGION_SUFFIX = ".files";
+ private static final Logger logger = LogService.getLogger();
+ private static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
- public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
- super(indexName, regionPath, cache);
+ private ExecutorService waitingThreadPool;
+
+ // For Mocking only
+ LuceneIndexForPartitionedRegion() {
+ this.fileSystemStats = null;
+ }
+ public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache,
+ Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers,
+ LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+ super(indexName, regionPath, cache, serializer, fieldAnalyzers);
final String statsName = indexName + "-" + regionPath;
- this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
+ this.fileSystemStats = new FileSystemStats(getCache().getDistributedSystem(), statsName);
+ this.waitingThreadPool = waitingThreadPool;
+ this.setSearchableFields(fields);
+ this.setAnalyzer(analyzer);
+ if (aeqId == null) {
+ this.createAEQ(attributes);
+ } else {
+ this.createAEQ(attributes, aeqId);
+ }
}
protected RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer) {
@@ -61,9 +104,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
if (mapper == null) {
mapper = new HeterogeneousLuceneSerializer();
}
- PartitionedRepositoryManager partitionedRepositoryManager =
- new PartitionedRepositoryManager(this, mapper);
- return partitionedRepositoryManager;
+ return new PartitionedRepositoryManager(this, mapper, waitingThreadPool);
}
protected void createLuceneListenersAndFileChunkRegions(
@@ -97,6 +138,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
if (!fileRegionExists(fileRegionName)) {
fileAndChunkRegion = createRegion(fileRegionName, regionShortCut, this.regionPath,
partitionAttributes, regionAttributes, lucenePrimaryBucketListener);
+ } else {
+ fileAndChunkRegion = this.cache.getRegion(fileRegionName);
}
fileSystemStats
@@ -140,9 +183,11 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
protected <K, V> Region<K, V> createRegion(final String regionName,
- final RegionShortcut regionShortCut, final String colocatedWithRegionName,
- final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes,
- PartitionListener lucenePrimaryBucketListener) {
+ final RegionShortcut regionShortCut,
+ final String colocatedWithRegionName,
+ final PartitionAttributes partitionAttributes,
+ final RegionAttributes regionAttributes,
+ PartitionListener lucenePrimaryBucketListener) {
PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
if (lucenePrimaryBucketListener != null) {
partitionAttributesFactory.addPartitionListener(lucenePrimaryBucketListener);
@@ -162,12 +207,13 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
return createRegion(regionName, attributes);
}
- public void close() {}
+ public void close() {
+ }
@Override
public void dumpFiles(final String directory) {
ResultCollector results = FunctionService.onRegion(getDataRegion())
- .setArguments(new String[] {directory, indexName}).execute(DumpDirectoryFiles.ID);
+ .setArguments(new String[]{directory, indexName}).execute(DumpDirectoryFiles.ID);
results.getResult();
}
@@ -202,6 +248,15 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
}
+ @Override
+ public boolean isIndexAvailable(int id) {
+ PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion();
+ if (fileAndChunkRegion != null) {
+ return fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null;
+ }
+ return false;
+ }
+
private void destroyOnRemoteMembers() {
PartitionedRegion pr = (PartitionedRegion) getDataRegion();
DistributionManager dm = pr.getDistributionManager();
@@ -230,4 +285,160 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
}
}
+
+ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer,
+ PartitionedRegion userRegion,
+ final IndexRepository oldRepository,
+ PartitionedRepositoryManager partitionedRepositoryManager)
+ throws IOException {
+
+ final PartitionedRegion fileRegion = this.getFileAndChunkRegion();
+
+ // We need to ensure that all members have created the fileAndChunk region before continuing
+ Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
+ PartitionRegionConfig prConfig =
+ (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+ LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener =
+ new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId);
+ fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener);
+ IndexRepository repo = null;
+ if (prConfig.isColocationComplete()) {
+ repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository);
+ }
+ return repo;
+ }
+
+ /*
+ * NOTE: The method finishComputingRepository must be called through computeIndexRepository.
+ * Executing finishComputingRepository outside of computeIndexRepository may result in race
+ * conditions.
+ * This is a util function just to not let computeIndexRepository be a huge chunk of code.
+ */
+ private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
+ PartitionedRegion userRegion,
+ IndexRepository oldRepository)
+ throws IOException {
+ final PartitionedRegion fileRegion = this.getFileAndChunkRegion();
+ BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
+ BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
+ boolean success = false;
+ if (fileAndChunkBucket == null) {
+ if (oldRepository != null) {
+ oldRepository.cleanup();
+ }
+ return null;
+ }
+ if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) {
+ if (oldRepository != null) {
+ oldRepository.cleanup();
+ }
+ return null;
+ }
+
+ if (oldRepository != null && !oldRepository.isClosed()) {
+ return oldRepository;
+ }
+
+ if (oldRepository != null) {
+ oldRepository.cleanup();
+ }
+ DistributedLockService lockService = getLockService();
+ String lockName = getLockName(fileAndChunkBucket);
+ while (!lockService.lock(lockName, 100, -1)) {
+ if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) {
+ return null;
+ }
+ }
+ final IndexRepository repo;
+ InternalCache cache = (InternalCache) userRegion.getRegionService();
+ boolean initialPdxReadSerializedFlag = cache.getPdxReadSerializedOverride();
+ cache.setPdxReadSerializedOverride(true);
+ try {
+ // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+ Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+ RegionDirectory dir =
+ new RegionDirectory(bucketTargetingMap, this.getFileSystemStats());
+ IndexWriterConfig config = new IndexWriterConfig(this.getAnalyzer());
+ IndexWriter writer = new IndexWriter(dir, config);
+ repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer,
+ this.getIndexStats(), dataBucket, lockService, lockName, this);
+ success = false;
+ // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+ if (null != fileRegion.get(APACHE_GEODE_INDEX_COMPLETE, bucketId)) {
+ success = true;
+ return repo;
+ } else {
+ success = reindexUserDataRegion(bucketId, userRegion, fileRegion, dataBucket, repo);
+ }
+ return repo;
+ } catch (IOException e) {
+ logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId
+ + " for file region:" + fileAndChunkBucket.getFullPath());
+ throw e;
+ } catch (CacheClosedException e) {
+ logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
+ + bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
+ throw e;
+ } finally {
+ if (!success) {
+ lockService.unlock(lockName);
+ }
+ cache.setPdxReadSerializedOverride(initialPdxReadSerializedFlag);
+ }
+ }
+
+ private boolean reindexUserDataRegion(Integer bucketId, PartitionedRegion userRegion,
+ PartitionedRegion fileRegion, BucketRegion dataBucket,
+ IndexRepository repo)
+ throws IOException {
+ Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
+
+ for (Object key : dataBucket.keySet()) {
+ Object value = getValue(userRegion.getEntry(key));
+ if (value != null) {
+ repo.update(key, value);
+ } else {
+ repo.delete(key);
+ }
+ affectedRepos.add(repo);
+ }
+
+ for (IndexRepository affectedRepo : affectedRepos) {
+ affectedRepo.commit();
+ }
+ // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+ fileRegion.put(APACHE_GEODE_INDEX_COMPLETE, APACHE_GEODE_INDEX_COMPLETE, bucketId);
+ return true;
+ }
+
+ private Object getValue(Region.Entry entry) {
+ final EntrySnapshot es = (EntrySnapshot) entry;
+ Object value;
+ try {
+ value = es == null ? null : es.getRawValue(true);
+ } catch (EntryDestroyedException e) {
+ value = null;
+ }
+ return value;
+ }
+
+ private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
+ return new BucketTargetingMap(region, bucketId);
+ }
+
+ private String getLockName(final BucketRegion fileAndChunkBucket) {
+ return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath();
+ }
+
+ private DistributedLockService getLockService() {
+ return DistributedLockService
+ .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+ }
+
+ protected BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) {
+ // Force the bucket to be created if it is not already
+ region.getOrCreateNodeForBucketWrite(bucketId, null);
+
+ return region.getDataStore().getLocalBucketById(bucketId);
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index e58c21f..7d3aa5d 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.cache.lucene.internal;
-import java.util.Collections;
import java.util.Map;
import org.apache.logging.log4j.Logger;
@@ -48,20 +47,33 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
protected final InternalCache cache;
protected final LuceneIndexStats indexStats;
- protected Map<String, Analyzer> fieldAnalyzers;
+ protected final Map<String, Analyzer> fieldAnalyzers;
protected String[] searchableFieldNames;
- protected RepositoryManager repositoryManager;
+ protected final RepositoryManager repositoryManager;
protected Analyzer analyzer;
protected LuceneSerializer luceneSerializer;
protected LocalRegion dataRegion;
- protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache) {
+ LuceneIndexImpl() {
+ this(null, null, null, null, null);
+ }
+
+ protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache,
+ LuceneSerializer serializer, Map<String, Analyzer> fieldAnalyzers) {
this.indexName = indexName;
this.regionPath = regionPath;
this.cache = cache;
final String statsName = indexName + "-" + regionPath;
- this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName);
+ if (getCache() != null) {
+ this.indexStats = new LuceneIndexStats(getCache().getDistributedSystem(), statsName);
+ } else {
+ this.indexStats = null;
+ }
+ luceneSerializer = serializer;
+ repositoryManager = createRepositoryManager(luceneSerializer);
+ this.fieldAnalyzers = fieldAnalyzers;
+
}
@Override
@@ -123,19 +135,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
return this.luceneSerializer;
}
- public void setLuceneSerializer(LuceneSerializer serializer) {
- this.luceneSerializer = serializer;
- }
-
public Cache getCache() {
return this.cache;
}
- public void setFieldAnalyzers(Map<String, Analyzer> fieldAnalyzers) {
- this.fieldAnalyzers =
- fieldAnalyzers == null ? null : Collections.unmodifiableMap(fieldAnalyzers);
- }
-
public LuceneIndexStats getIndexStats() {
return indexStats;
}
@@ -143,32 +146,30 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
public void initialize() {
/* create index region */
dataRegion = assignDataRegion();
+ System.err.println(this + ".initialize DataRegion: " + dataRegion + " repositoryManager: "
+ + repositoryManager);
createLuceneListenersAndFileChunkRegions((PartitionedRepositoryManager) repositoryManager);
addExtension(dataRegion);
}
- protected void setupRepositoryManager(LuceneSerializer luceneSerializer) {
- repositoryManager = createRepositoryManager(luceneSerializer);
- }
-
protected abstract RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer);
protected abstract void createLuceneListenersAndFileChunkRegions(
PartitionedRepositoryManager partitionedRepositoryManager);
- protected AsyncEventQueue createAEQ(Region dataRegion) {
- String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath);
- return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId);
+ protected AsyncEventQueue createAEQ(RegionAttributes attributes) {
+ return createAEQ(attributes, LuceneServiceImpl.getUniqueIndexName(getName(), regionPath));
}
protected AsyncEventQueue createAEQ(RegionAttributes attributes, String aeqId) {
- if (attributes.getPartitionAttributes() != null) {
+ if (attributes != null && attributes.getPartitionAttributes() != null) {
if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) {
// accessor will not create AEQ
return null;
}
+ return createAEQ(createAEQFactory(attributes), aeqId);
}
- return createAEQ(createAEQFactory(attributes), aeqId);
+ return null;
}
private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String aeqId) {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java
index c960daa..94819e5 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java
@@ -14,13 +14,24 @@
*/
package org.apache.geode.cache.lucene.internal;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.lucene.analysis.Analyzer;
+
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.internal.cache.InternalCache;
public class LuceneIndexImplFactory {
public LuceneIndexImplFactory() {}
- public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) {
- return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache);
+ public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache,
+ Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+ return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache, analyzer,
+ fieldAnalyzers, serializer, attributes, aeqId, fields, waitingThreadPool);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
index d4168bd..eb41eeb 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
@@ -14,6 +14,12 @@
*/
package org.apache.geode.cache.lucene.internal;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.lucene.analysis.Analyzer;
+
+import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
@@ -21,9 +27,14 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
public class LuceneRawIndex extends LuceneIndexImpl {
-
- protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache) {
- super(indexName, regionPath, cache);
+ public LuceneRawIndex(String indexName, String regionPath, InternalCache cache, Analyzer analyzer,
+ Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+ super(indexName, regionPath, cache, serializer, fieldAnalyzers);
+ this.setSearchableFields(fields);
+ this.setAnalyzer(analyzer);
+ this.createAEQ(attributes, aeqId);
}
@Override
@@ -50,4 +61,9 @@ public class LuceneRawIndex extends LuceneIndexImpl {
@Override
public void destroy(boolean initiator) {}
+
+ @Override
+ public boolean isIndexAvailable(int id) {
+ return true;
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
index 4a92049..0e290b6 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
@@ -14,11 +14,21 @@
*/
package org.apache.geode.cache.lucene.internal;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.lucene.analysis.Analyzer;
+
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.internal.cache.InternalCache;
public class LuceneRawIndexFactory extends LuceneIndexImplFactory {
- @Override
- public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) {
- return new LuceneRawIndex(indexName, regionPath, cache);
+ public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache,
+ Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+ return new LuceneRawIndex(indexName, regionPath, cache, analyzer, fieldAnalyzers, serializer,
+ attributes, aeqId, fields, waitingThreadPool);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
index 7313a82..06371c6 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java
@@ -107,6 +107,8 @@ public class LuceneRegionListener implements RegionListener {
public void afterCreate(Region region) {
if (region.getFullPath().equals(this.regionPath)
&& this.afterCreateInvoked.compareAndSet(false, true)) {
+ System.err
+ .println(this + ".afterCreate Service: " + service + " LuceneIndex: " + luceneIndex);
this.service.afterDataRegionCreated(this.luceneIndex);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
index 25a6a78..30654ef 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java
@@ -26,9 +26,9 @@ import org.apache.geode.internal.Version;
public class LuceneResultStructImpl<K, V>
implements LuceneResultStruct<K, V>, DataSerializableFixedID {
- K key;
- V value;
- float score;
+ private K key;
+ private V value;
+ private float score;
public LuceneResultStructImpl() {}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 5d0ea48..14705da 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -24,8 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
@@ -66,6 +64,7 @@ import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
import org.apache.geode.cache.lucene.internal.results.PageResults;
import org.apache.geode.cache.lucene.internal.xml.LuceneServiceXmlGenerator;
import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.cache.BucketNotFoundException;
@@ -84,7 +83,6 @@ import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
/**
* Implementation of LuceneService to create lucene index and query.
*
- *
* @since GemFire 8.5
*/
public class LuceneServiceImpl implements InternalLuceneService {
@@ -97,6 +95,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
private IndexListener managementListener;
public static boolean LUCENE_REINDEX =
Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "luceneReindex");
+ private DistributionManager dm;
public LuceneServiceImpl() {}
@@ -117,6 +116,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
cache.getCancelCriterion().checkCancelInProgress(null);
this.cache = (InternalCache) cache;
+ this.dm = ((InternalCache) cache).getDistributionManager();
FunctionService.registerFunction(new LuceneQueryFunction());
FunctionService.registerFunction(new LuceneGetPageFunction());
@@ -166,8 +166,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
if (!regionPath.startsWith("/")) {
regionPath = "/" + regionPath;
}
- String name = indexName + "#" + regionPath.replace('/', '_');
- return name;
+ return indexName + "#" + regionPath.replace('/', '_');
}
public static String getUniqueIndexRegionName(String indexName, String regionPath,
@@ -232,6 +231,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
LuceneIndexImpl luceneIndex = beforeDataRegionCreated(indexName, regionPath,
region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields);
+ System.out.println(this + ".createIndexOnExistingRegion index: " + luceneIndex);
afterDataRegionCreated(luceneIndex);
region.addAsyncEventQueueId(aeqId, true);
@@ -324,19 +324,20 @@ public class LuceneServiceImpl implements InternalLuceneService {
RegionAttributes attributes, final Analyzer analyzer,
final Map<String, Analyzer> fieldAnalyzers, String aeqId, final LuceneSerializer serializer,
final String... fields) {
- LuceneIndexImpl index = createIndexObject(indexName, regionPath);
- index.setSearchableFields(fields);
- index.setAnalyzer(analyzer);
- index.setFieldAnalyzers(fieldAnalyzers);
- index.setLuceneSerializer(serializer);
- index.setupRepositoryManager(serializer);
- index.createAEQ(attributes, aeqId);
+ LuceneIndexImpl index = createIndexObject(indexName, regionPath, fields, analyzer,
+ fieldAnalyzers, serializer, attributes, aeqId);
+ System.err.println(this + ".beforeDataRegionCreated Index: " + index);
return index;
-
}
- private LuceneIndexImpl createIndexObject(String indexName, String regionPath) {
- return luceneIndexFactory.create(indexName, regionPath, cache);
+ private LuceneIndexImpl createIndexObject(String indexName, String regionPath, String[] fields,
+ Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId) {
+ System.err.println(this + ".createIndexObject LuceneIndexFactory: " + luceneIndexFactory);
+ LuceneIndexImpl index = luceneIndexFactory.create(indexName, regionPath, cache, analyzer,
+ fieldAnalyzers, serializer, attributes, aeqId, fields, dm.getWaitingThreadPool());
+ System.err.println(this + ".createIndexObject Index: " + index);
+ return index;
}
private void registerDefinedIndex(final String indexName, final String regionPath,
@@ -530,11 +531,12 @@ public class LuceneServiceImpl implements InternalLuceneService {
}
public void unregisterIndex(final String region) {
- if (indexMap.containsKey(region))
+ if (indexMap.containsKey(region)) {
indexMap.remove(region);
+ }
}
- /** Public for test purposes */
+ // Public for test purposes
public static void registerDataSerializables() {
DSFIDFactory.registerDSFID(DataSerializableFixedID.LUCENE_CHUNK_KEY, ChunkKey.class);
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index f60f83b..b945226 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -20,6 +20,9 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Region;
@@ -32,8 +35,10 @@ import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.logging.LogService;
public class PartitionedRepositoryManager implements RepositoryManager {
+ private final Logger logger = LogService.getLogger();
public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
/**
* map of the parent bucket region to the index repository
@@ -47,17 +52,23 @@ public class PartitionedRepositoryManager implements RepositoryManager {
protected final ConcurrentHashMap<Integer, IndexRepository> indexRepositories =
new ConcurrentHashMap<Integer, IndexRepository>();
- /** The user region for this index */
+ /**
+ * The user region for this index
+ */
protected PartitionedRegion userRegion = null;
protected final LuceneSerializer serializer;
protected final InternalLuceneIndex index;
protected volatile boolean closed;
private final CountDownLatch isDataRegionReady = new CountDownLatch(1);
- public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer) {
+ private final ExecutorService waitingThreadPoolFromDM;
+
+ public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer,
+ ExecutorService waitingThreadPool) {
this.index = index;
this.serializer = serializer;
this.closed = false;
+ this.waitingThreadPoolFromDM = waitingThreadPool;
}
public void setUserRegionForRepositoryManager(PartitionedRegion userRegion) {
@@ -66,7 +77,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
@Override
public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx)
- throws BucketNotFoundException {
+ throws BucketNotFoundException, LuceneIndexCreationInProgressException {
Region<Object, Object> region = ctx.getDataSet();
Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
@@ -76,7 +87,21 @@ public class PartitionedRepositoryManager implements RepositoryManager {
throw new BucketNotFoundException(
"User bucket was not found for region " + region + "bucket id " + bucketId);
} else {
- repos.add(getRepository(userBucket.getId()));
+ if (index.isIndexAvailable(userBucket.getId())) {
+ repos.add(getRepository(userBucket.getId()));
+ } else {
+ waitingThreadPoolFromDM.execute(() -> {
+ try {
+ IndexRepository repository = getRepository(userBucket.getId());
+ repos.add(repository);
+ } catch (BucketNotFoundException e) {
+ logger.debug(
+ "Lucene Index creation still in progress. Catching BucketNotFoundException");
+ }
+ });
+ throw new LuceneIndexCreationInProgressException(
+ "Lucene Index creation still in progress for bucket: " + userBucket.getId());
+ }
}
}
@@ -137,7 +162,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
}
return computeRepository(bucketId, serializer, index, userRegion, oldRepository);
} catch (IOException e) {
- throw new InternalGemFireError("Unable to create index repository", e);
+ throw new InternalGemFireError("Unable to create index repository for bucket "+bucketId, e);
}
});
return repo;
@@ -155,7 +180,8 @@ public class PartitionedRepositoryManager implements RepositoryManager {
try {
computeRepository(bucketId);
} catch (LuceneIndexDestroyedException e) {
- /* expected exception */}
+ /* expected exception */
+ }
}
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index 0b38c45..f845898 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -15,6 +15,7 @@
package org.apache.geode.cache.lucene.internal;
import java.io.IOException;
+import java.util.concurrent.Executors;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
@@ -25,7 +26,7 @@ public class RawLuceneRepositoryManager extends PartitionedRepositoryManager {
public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory();
public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) {
- super(index, serializer);
+ super(index, serializer, Executors.newSingleThreadExecutor());
}
@Override
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
index 4985e90..90d3b26 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
@@ -14,10 +14,15 @@
*/
package org.apache.geode.cache.lucene.internal;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import org.apache.lucene.analysis.Analyzer;
import org.mockito.Mockito;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.internal.cache.InternalCache;
public class LuceneIndexFactorySpy extends LuceneIndexImplFactory {
@@ -36,9 +41,15 @@ public class LuceneIndexFactorySpy extends LuceneIndexImplFactory {
};
@Override
- public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) {
+ public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache,
+ Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers, LuceneSerializer serializer,
+ RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+
LuceneIndexForPartitionedRegion index =
- Mockito.spy(new ExtendedLuceneIndexForPartitionedRegion(indexName, regionPath, cache));
+ Mockito.spy(new ExtendedLuceneIndexForPartitionedRegion(indexName, regionPath, cache,
+ analyzer, fieldAnalyzers, serializer, attributes, aeqId, fields, waitingThreadPool));
+ System.err.println(this + ".create " + index);
return index;
}
@@ -49,9 +60,14 @@ public class LuceneIndexFactorySpy extends LuceneIndexImplFactory {
private static class ExtendedLuceneIndexForPartitionedRegion
extends LuceneIndexForPartitionedRegion {
- public ExtendedLuceneIndexForPartitionedRegion(final String indexName, final String regionPath,
- final InternalCache cache) {
- super(indexName, regionPath, cache);
+
+
+ public ExtendedLuceneIndexForPartitionedRegion(String indexName, String regionPath,
+ InternalCache cache, Analyzer analyzer, Map<String, Analyzer> fieldAnalyzers,
+ LuceneSerializer serializer, RegionAttributes attributes, String aeqId, String[] fields,
+ ExecutorService waitingThreadPool) {
+ super(indexName, regionPath, cache, analyzer, fieldAnalyzers, serializer, attributes, aeqId,
+ fields, waitingThreadPool);
}
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
index 9ebde2c..99b6064 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java
@@ -18,7 +18,12 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -37,11 +42,13 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
+import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
@@ -55,14 +62,25 @@ public class LuceneIndexForPartitionedRegionTest {
@Rule
public ExpectedException expectedExceptions = ExpectedException.none();
+ private ExecutorService executorService;
+
+ @Before
+ public void setup() {
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void teardown() {
+ executorService.shutdownNow();
+ }
@Test
public void getIndexNameReturnsCorrectName() {
String name = "indexName";
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, null, executorService);
assertEquals(name, index.getName());
}
@@ -71,8 +89,8 @@ public class LuceneIndexForPartitionedRegionTest {
String name = "indexName";
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, null, executorService);
assertEquals(regionPath, index.getRegionPath());
}
@@ -82,8 +100,8 @@ public class LuceneIndexForPartitionedRegionTest {
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
PartitionedRegion region = mock(PartitionedRegion.class);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, region.getAttributes(), null, null, executorService);
String fileRegionName = index.createFileRegionName();
when(cache.getRegion(fileRegionName)).thenReturn(region);
@@ -95,8 +113,8 @@ public class LuceneIndexForPartitionedRegionTest {
String name = "indexName";
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, null, executorService);
String fileRegionName = index.createFileRegionName();
when(cache.getRegion(fileRegionName)).thenReturn(null);
@@ -104,6 +122,7 @@ public class LuceneIndexForPartitionedRegionTest {
}
@Test
+ @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor")
public void createAEQWithPersistenceCallsCreateOnAEQFactory() {
String name = "indexName";
String regionPath = "regionName";
@@ -114,28 +133,27 @@ public class LuceneIndexForPartitionedRegionTest {
AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class);
when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- index.createAEQ(region);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, region.getAttributes(), null, null, executorService);
verify(aeqFactory).setPersistent(eq(true));
verify(aeqFactory).create(any(), any());
}
@Test
+ @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor")
public void createRepositoryManagerWithNotNullSerializer() {
String name = "indexName";
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
LuceneSerializer serializer = mock(LuceneSerializer.class);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- index = spy(index);
- index.setupRepositoryManager(serializer);
+ LuceneIndexForPartitionedRegion index = spy(new LuceneIndexForPartitionedRegion(name,
+ regionPath, cache, null, null, serializer, null, null, null, executorService));
verify(index).createRepositoryManager(eq(serializer));
}
@Test
+ @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor")
public void createRepositoryManagerWithNullSerializer() {
String name = "indexName";
String regionPath = "regionName";
@@ -143,17 +161,16 @@ public class LuceneIndexForPartitionedRegionTest {
InternalCache cache = Fakes.cache();
ArgumentCaptor<LuceneSerializer> serializerCaptor =
ArgumentCaptor.forClass(LuceneSerializer.class);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, fields, executorService);
index = spy(index);
- when(index.getFieldNames()).thenReturn(fields);
- index.setupRepositoryManager(null);
verify(index).createRepositoryManager(serializerCaptor.capture());
LuceneSerializer serializer = serializerCaptor.getValue();
assertNull(serializer);
}
@Test
+ @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor")
public void createAEQCallsCreateOnAEQFactory() {
String name = "indexName";
String regionPath = "regionName";
@@ -162,9 +179,8 @@ public class LuceneIndexForPartitionedRegionTest {
AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class);
when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- index.createAEQ(region);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, region.getAttributes(), null, null, executorService);
verify(aeqFactory, never()).setPersistent(eq(true));
verify(aeqFactory).create(any(), any());
@@ -228,8 +244,10 @@ public class LuceneIndexForPartitionedRegionTest {
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
Region region = initializeScenario(withPersistence, regionPath, cache, 0);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+
+ RegionAttributes regionAttributes = region.getAttributes();
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, regionAttributes, "aeq", null, executorService);
LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
spy.initialize();
}
@@ -240,13 +258,12 @@ public class LuceneIndexForPartitionedRegionTest {
String name = "indexName";
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
+ when(cache.createAsyncEventQueueFactory()).thenReturn(mock(AsyncEventQueueFactoryImpl.class));
Region region = initializeScenario(withPersistence, regionPath, cache);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
- LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
-
- verify(spy).createAEQ(eq(region.getAttributes()), eq("aeq"));
+ RegionAttributes regionAttributes = region.getAttributes();
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, regionAttributes, "aeq", null, executorService);
}
protected LuceneIndexForPartitionedRegion setupSpy(final Region region,
@@ -255,8 +272,6 @@ public class LuceneIndexForPartitionedRegionTest {
LuceneIndexForPartitionedRegion spy = spy(index);
doReturn(null).when(spy).createRegion(any(), any(), any(), any(), any(), any());
doReturn(null).when(spy).createAEQ(any(), any());
- spy.setupRepositoryManager(null);
- spy.createAEQ(region.getAttributes(), aeq);
spy.initialize();
return spy;
}
@@ -268,9 +283,10 @@ public class LuceneIndexForPartitionedRegionTest {
String regionPath = "regionName";
InternalCache cache = Fakes.cache();
Region region = initializeScenario(withPersistence, regionPath, cache);
-
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class);
+ when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, region.getAttributes(), "aeq", null, executorService);
LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq");
verify(spy).createRegion(eq(index.createFileRegionName()), eq(RegionShortcut.PARTITION), any(),
@@ -286,8 +302,8 @@ public class LuceneIndexForPartitionedRegionTest {
RegionAttributes regionAttributes = mock(RegionAttributes.class);
when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
PartitionAttributes partitionAttributes = initializeAttributes(cache);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, regionAttributes, null, null, executorService);
LuceneIndexForPartitionedRegion indexSpy = spy(index);
indexSpy.createRegion(index.createFileRegionName(), RegionShortcut.PARTITION, regionPath,
partitionAttributes, regionAttributes, null);
@@ -305,13 +321,12 @@ public class LuceneIndexForPartitionedRegionTest {
InternalCache cache = Fakes.cache();
initializeScenario(withPersistence, regionPath, cache);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, null, executorService);
index.setSearchableFields(new String[] {"field"});
LuceneIndexForPartitionedRegion spy = spy(index);
doReturn(null).when(spy).createRegion(any(), any(), any(), any(), any(), any());
doReturn(null).when(spy).createAEQ((RegionAttributes) any(), any());
- spy.setupRepositoryManager(null);
spy.createAEQ(any(), any());
spy.initialize();
@@ -331,12 +346,11 @@ public class LuceneIndexForPartitionedRegionTest {
AsyncEventQueue aeq = mock(AsyncEventQueue.class);
DumpDirectoryFiles function = new DumpDirectoryFiles();
FunctionService.registerFunction(function);
- LuceneIndexForPartitionedRegion index =
- new LuceneIndexForPartitionedRegion(name, regionPath, cache);
+ LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath,
+ cache, null, null, null, null, null, null, executorService);
index = spy(index);
when(index.getFieldNames()).thenReturn(fields);
doReturn(aeq).when(index).createAEQ(any(), any());
- index.setupRepositoryManager(null);
index.createAEQ(cache.getRegionAttributes(regionPath), aeq.getId());
index.initialize();
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
deleted file mode 100644
index 5286bae..0000000
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.cache.lucene.internal;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-
-import org.apache.geode.cache.lucene.LuceneIndex;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.fake.Fakes;
-import org.apache.geode.test.junit.categories.LuceneTest;
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category({UnitTest.class, LuceneTest.class})
-public class LuceneIndexImplJUnitTest {
-
- public static final String REGION = "region";
- public static final String INDEX = "index";
- public static final int MAX_WAIT = 30000;
-
- private InternalCache cache;
- private LuceneIndex index;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void createLuceneIndex() {
- cache = Fakes.cache();
- index = new LuceneIndexForPartitionedRegion(INDEX, REGION, cache);
- }
-
-}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 0f67cb6..fb739e7 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -91,7 +92,8 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion.put("rebalance", "test");
service.waitUntilFlushed("index1", "userRegion", 30000, TimeUnit.MILLISECONDS);
- RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper,
+ Executors.newSingleThreadExecutor());
IndexRepository repo = manager.getRepository(userRegion, 0, null);
assertNotNull(repo);
@@ -106,14 +108,13 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion = (PartitionedRegion) regionfactory.create("userRegion");
userRegion.put("rebalance", "test");
- manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper,
+ Executors.newSingleThreadExecutor());
IndexRepository newRepo = manager.getRepository(userRegion, 0, null);
Assert.assertNotEquals(newRepo, repo);
}
-
-
private void verifyIndexFinishFlushing(String indexName, String regionName)
throws InterruptedException {
LuceneService service = LuceneServiceProvider.get(cache);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 07c2d6c..d1e530f 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -52,6 +53,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.lucene.LuceneIndexFactory;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -110,16 +112,23 @@ public class LuceneServiceImplJUnitTest {
@Test
public void userRegionShouldNotBeSetBeforeIndexInitialized() throws Exception {
+ DistributionManager dm = mock(DistributionManager.class);
TestLuceneServiceImpl testService = new TestLuceneServiceImpl();
Field f = LuceneServiceImpl.class.getDeclaredField("cache");
f.setAccessible(true);
f.set(testService, cache);
+ f = LuceneServiceImpl.class.getDeclaredField("dm");
+ f.setAccessible(true);
+ f.set(testService, dm);
AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class);
when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory);
DistributedSystem ds = mock(DistributedSystem.class);
+
Statistics luceneIndexStats = mock(Statistics.class);
when(cache.getDistributedSystem()).thenReturn(ds);
+ when(cache.getDistributionManager()).thenReturn(dm);
+ when(dm.getWaitingThreadPool()).thenReturn(Executors.newSingleThreadExecutor());
when(((StatisticsFactory) ds).createAtomicStatistics(any(), anyString()))
.thenReturn(luceneIndexStats);
when(cache.getRegion(anyString())).thenReturn(region);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index b2c30b1..ed7305d 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -26,16 +26,23 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -83,7 +90,6 @@ public class PartitionedRepositoryManagerJUnitTest {
protected LuceneSerializer serializer;
protected PartitionedRegionDataStore userDataStore;
protected PartitionedRegionDataStore fileDataStore;
- protected PartitionedRegionHelper prHelper;
protected PartitionRegionConfig prConfig;
protected LocalRegion prRoot;
@@ -94,6 +100,8 @@ public class PartitionedRepositoryManagerJUnitTest {
protected LuceneIndexImpl indexForPR;
protected PartitionedRepositoryManager repoManager;
protected GemFireCacheImpl cache;
+ private final Map<Integer, Boolean> isIndexAvailableMap = new HashedMap();
+
@Before
public void setUp() {
@@ -127,7 +135,7 @@ public class PartitionedRepositoryManagerJUnitTest {
when(fileAndChunkRegion.getRegionIdentifier()).thenReturn("rid");
indexStats = Mockito.mock(LuceneIndexStats.class);
fileSystemStats = Mockito.mock(FileSystemStats.class);
- indexForPR = Mockito.mock(LuceneIndexForPartitionedRegion.class);
+ indexForPR = Mockito.spy(LuceneIndexForPartitionedRegion.class);
when(((LuceneIndexForPartitionedRegion) indexForPR).getFileAndChunkRegion())
.thenReturn(fileAndChunkRegion);
when(((LuceneIndexForPartitionedRegion) indexForPR).getFileSystemStats())
@@ -142,7 +150,8 @@ public class PartitionedRepositoryManagerJUnitTest {
when(prRoot.get("rid")).thenReturn(prConfig);
PowerMockito.mockStatic(PartitionedRegionHelper.class);
PowerMockito.when(PartitionedRegionHelper.getPRRoot(cache)).thenReturn(prRoot);
- repoManager = new PartitionedRepositoryManager(indexForPR, serializer);
+ repoManager = new PartitionedRepositoryManager(indexForPR, serializer,
+ Executors.newSingleThreadExecutor());
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
}
@@ -213,30 +222,50 @@ public class PartitionedRepositoryManagerJUnitTest {
when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null);
- when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), (RetryTimeKeeper) any()))
- .then(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0));
- return null;
- }
+ when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), any()))
+ .then((Answer) invocation -> {
+ when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0));
+ return null;
});
assertNotNull(repoManager.getRepository(userRegion, 0, null));
}
+ @Test(expected = LuceneIndexCreationInProgressException.class)
+ public void queryByRegionFailingWithInProgressException()
+ throws LuceneIndexCreationInProgressException, BucketNotFoundException {
+ setUpMockBucket(0);
+ setUpMockBucket(1);
+
+ Set<Integer> buckets = new LinkedHashSet<>(Arrays.asList(0, 1));
+ InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
+ when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
+ repoManager.getRepositories(ctx);
+ }
+
@Test
- public void getByRegion() throws BucketNotFoundException {
+ public void queryByRegionWaitingForRepoToBeCreated()
+ throws LuceneIndexCreationInProgressException {
setUpMockBucket(0);
setUpMockBucket(1);
+ setupIsIndexAvailable();
+
Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
- Collection<IndexRepository> repos = repoManager.getRepositories(ctx);
- assertEquals(2, repos.size());
+ final Collection<IndexRepository> repositories = new HashSet<>();
- Iterator<IndexRepository> itr = repos.iterator();
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .atMost(500, TimeUnit.SECONDS).until(() -> {
+ try {
+ repositories.addAll(repoManager.getRepositories(ctx));
+ } catch (BucketNotFoundException | LuceneIndexCreationInProgressException e) {
+ }
+ return repositories.size() == 2;
+ });
+
+ Iterator<IndexRepository> itr = repositories.iterator();
IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next();
IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next();
@@ -246,13 +275,40 @@ public class PartitionedRepositoryManagerJUnitTest {
checkRepository(repo0, 0);
checkRepository(repo1, 1);
+
+ }
+
+ private void setupIsIndexAvailable() {
+ when(indexForPR.isIndexAvailable(1)).then((Answer) invocation -> {
+ boolean result;
+ Boolean isAvailable = isIndexAvailableMap.get(1);
+ if (isAvailable == null || !isAvailable) {
+ isIndexAvailableMap.put(1, true);
+ result = false;
+ } else {
+ result = true;
+ }
+ return result;
+ });
+ when(indexForPR.isIndexAvailable(0)).then((Answer) invocation -> {
+ boolean result;
+ Boolean isAvailable = isIndexAvailableMap.get(0);
+ if (isAvailable == null || !isAvailable) {
+ isIndexAvailableMap.put(0, true);
+ result = false;
+ } else {
+ result = true;
+ }
+ return result;
+ });
}
/**
* Test that we get the expected exception when a user bucket is missing
*/
- @Test(expected = BucketNotFoundException.class)
- public void getMissingBucketByRegion() throws BucketNotFoundException {
+ @Test(expected = LuceneIndexCreationInProgressException.class)
+ public void getMissingBucketByRegion()
+ throws LuceneIndexCreationInProgressException, BucketNotFoundException {
setUpMockBucket(0);
Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
@@ -270,7 +326,7 @@ public class PartitionedRepositoryManagerJUnitTest {
assertEquals(serializer, repo0.getSerializer());
}
- protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
+ protected BucketRegion setUpMockBucket(int id) {
BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
BucketRegion fileAndChunkBucket = Mockito.mock(BucketRegion.class);
// Allowing the fileAndChunkBucket to behave like a map so that the IndexWriter operations don't
@@ -290,6 +346,7 @@ public class PartitionedRepositoryManagerJUnitTest {
BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class);
when(fileAndChunkBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor);
when(mockBucketAdvisor.isPrimary()).thenReturn(true);
+
return mockBucket;
}
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
index a000d2f..73bbd76 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -53,7 +53,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
@After
public void tearDown() {
- ((RawLuceneRepositoryManager) repoManager).close();
+ repoManager.close();
}
protected void createIndexAndRepoManager() {
@@ -66,6 +66,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
when(indexForPR.getCache()).thenReturn(cache);
when(indexForPR.getRegionPath()).thenReturn("/testRegion");
when(indexForPR.withPersistence()).thenReturn(true);
+ when(indexForPR.getName()).thenReturn("rawLuceneTest");
repoManager = new RawLuceneRepositoryManager(indexForPR, serializer);
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
@@ -85,7 +86,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
}
@Override
- protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
+ protected BucketRegion setUpMockBucket(int id) {
BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
when(mockBucket.getId()).thenReturn(id);
when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket);
diff --git a/gradle.properties b/gradle.properties
index ad3dbce..fc53156 100755
--- a/gradle.properties
+++ b/gradle.properties
@@ -42,6 +42,10 @@ productOrg = Apache Software Foundation (ASF)
org.gradle.daemon = true
org.gradle.jvmargs = -Xmx2048m
+org.gradle.parallel=true
+org.gradle.configureondemand=true
+org.gradle.workers.max=4
+
minimumGradleVersion = 3.5.1
# Set this on the command line with -P or in ~/.gradle/gradle.properties
# to change the buildDir location. Use an absolute path.
--
To stop receiving notification emails like this one, please contact
udo@apache.org.