You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/03/29 22:22:33 UTC
[geode] 01/02: GEODE-3926 Initial commit to add
LuceneIndexCreationInProgressException
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3926_3
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6ea3f648e7abf2bf77a53e8df1bf761ed17e8d5e
Author: Udo <uk...@pivotal.io>
AuthorDate: Mon Mar 26 10:51:16 2018 -0700
GEODE-3926 Initial commit to add LuceneIndexCreationInProgressException
---
.../sanctioned-geode-core-serializables.txt | 1 +
.../cache/lucene/internal/InternalLuceneIndex.java | 2 +
...=> LuceneIndexCreationInProgressException.java} | 9 +-
.../internal/LuceneIndexForPartitionedRegion.java | 15 +-
.../cache/lucene/internal/LuceneRawIndex.java | 15 +-
.../lucene/internal/LuceneRawIndexFactory.java | 8 +-
.../cache/lucene/internal/LuceneServiceImpl.java | 6 +-
.../internal/PartitionedRepositoryManager.java | 35 ++++-
.../lucene/internal/RawIndexRepositoryFactory.java | 11 +-
.../internal/RawLuceneRepositoryManager.java | 9 +-
.../sanctioned-geode-lucene-serializables.txt | 1 +
.../lucene/LuceneIndexCreationIntegrationTest.java | 9 +-
.../LuceneIndexRecoveryHAIntegrationTest.java | 6 +-
.../internal/LuceneServiceImplJUnitTest.java | 5 +
.../PartitionedRepositoryManagerJUnitTest.java | 151 ++++++++++++++-------
.../RawLuceneRepositoryManagerJUnitTest.java | 21 +--
16 files changed, 219 insertions(+), 85 deletions(-)
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index d296f77..c2d1aa4 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -803,3 +803,4 @@ org/apache/geode/security/AuthenticationFailedException,true,-820286647227908887
org/apache/geode/security/AuthenticationRequiredException,true,4675976651103154919
org/apache/geode/security/GemFireSecurityException,true,3814254578203076926,cause:java/lang/Throwable
org/apache/geode/security/NotAuthorizedException,true,419215768216387745,principal:java/security/Principal
+org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException,false
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
index 74e4ac8..1e95a46 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java
@@ -39,4 +39,6 @@ public interface InternalLuceneIndex extends LuceneIndex {
void initialize();
+ boolean isIndexAvailable(int id);
+
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
old mode 100755
new mode 100644
similarity index 74%
copy from geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
copy to geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
index 4a92049..9c58ba5
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java
@@ -14,11 +14,10 @@
*/
package org.apache.geode.cache.lucene.internal;
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.GemFireException;
-public class LuceneRawIndexFactory extends LuceneIndexImplFactory {
- @Override
- public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) {
- return new LuceneRawIndex(indexName, regionPath, cache);
+public class LuceneIndexCreationInProgressException extends GemFireException {
+ public LuceneIndexCreationInProgressException(String message) {
+ super(message);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 577bdef..032f1b7 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -16,6 +16,7 @@
package org.apache.geode.cache.lucene.internal;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
@@ -49,8 +50,11 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
public static final String FILES_REGION_SUFFIX = ".files";
+ private ExecutorService waitingThreadPoolFromDM;
+
public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) {
super(indexName, regionPath, cache);
+ this.waitingThreadPoolFromDM = cache.getDistributionManager().getWaitingThreadPool();
final String statsName = indexName + "-" + regionPath;
this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName);
@@ -62,7 +66,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
mapper = new HeterogeneousLuceneSerializer();
}
PartitionedRepositoryManager partitionedRepositoryManager =
- new PartitionedRepositoryManager(this, mapper);
+ new PartitionedRepositoryManager(this, mapper, this.waitingThreadPoolFromDM);
return partitionedRepositoryManager;
}
@@ -202,6 +206,15 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
}
}
+ @Override
+ public boolean isIndexAvailable(int id) {
+ PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion();
+ if (fileAndChunkRegion != null) {
+ return fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null;
+ }
+ return false;
+ }
+
private void destroyOnRemoteMembers() {
PartitionedRegion pr = (PartitionedRegion) getDataRegion();
DistributionManager dm = pr.getDistributionManager();
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
index d4168bd..6bf7a23 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java
@@ -22,8 +22,12 @@ import org.apache.geode.internal.cache.PartitionedRegion;
public class LuceneRawIndex extends LuceneIndexImpl {
- protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache) {
+ private String luceneFolderPath;
+
+ protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache,
+ String luceneFolderPath) {
super(indexName, regionPath, cache);
+ this.luceneFolderPath = luceneFolderPath;
}
@Override
@@ -32,8 +36,8 @@ public class LuceneRawIndex extends LuceneIndexImpl {
if (mapper == null) {
mapper = new HeterogeneousLuceneSerializer();
}
- RawLuceneRepositoryManager rawLuceneRepositoryManager =
- new RawLuceneRepositoryManager(this, mapper);
+ RawLuceneRepositoryManager rawLuceneRepositoryManager = new RawLuceneRepositoryManager(this,
+ mapper, cache.getDistributionManager().getWaitingThreadPool(), luceneFolderPath);
return rawLuceneRepositoryManager;
}
@@ -50,4 +54,9 @@ public class LuceneRawIndex extends LuceneIndexImpl {
@Override
public void destroy(boolean initiator) {}
+
+ @Override
+ public boolean isIndexAvailable(int id) {
+ return true;
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
index 4a92049..1f91af8 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java
@@ -17,8 +17,14 @@ package org.apache.geode.cache.lucene.internal;
import org.apache.geode.internal.cache.InternalCache;
public class LuceneRawIndexFactory extends LuceneIndexImplFactory {
+ private String luceneFolderPath;
+
+ public LuceneRawIndexFactory(String luceneFolderPath) {
+ this.luceneFolderPath = luceneFolderPath;
+ }
+
@Override
public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) {
- return new LuceneRawIndex(indexName, regionPath, cache);
+ return new LuceneRawIndex(indexName, regionPath, cache, luceneFolderPath);
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 01bc5c6..3e08abd 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -166,8 +166,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
if (!regionPath.startsWith("/")) {
regionPath = "/" + regionPath;
}
- String name = indexName + "#" + regionPath.replace('/', '_');
- return name;
+ return indexName + "#" + regionPath.replace('/', '_');
}
public static String getUniqueIndexRegionName(String indexName, String regionPath,
@@ -182,7 +181,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
}
Analyzer analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer(), fieldAnalyzers);
Set<String> fieldsSet = fieldAnalyzers.keySet();
- String[] fields = fieldsSet.toArray(new String[fieldsSet.size()]);
+ String[] fields = fieldsSet.toArray(new String[0]);
createIndex(indexName, regionPath, analyzer, fieldAnalyzers, serializer, allowOnExistingRegion,
fields);
@@ -209,6 +208,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
return;
}
+
if (!allowOnExistingRegion) {
definedIndexMap.remove(LuceneServiceImpl.getUniqueIndexName(indexName, regionPath));
throw new IllegalStateException("The lucene index must be created before region");
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index f60f83b..92b4c08 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -20,6 +20,9 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Region;
@@ -32,8 +35,10 @@ import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.logging.LogService;
public class PartitionedRepositoryManager implements RepositoryManager {
+ private final Logger logger = LogService.getLogger();
public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
/**
* map of the parent bucket region to the index repository
@@ -47,17 +52,23 @@ public class PartitionedRepositoryManager implements RepositoryManager {
protected final ConcurrentHashMap<Integer, IndexRepository> indexRepositories =
new ConcurrentHashMap<Integer, IndexRepository>();
- /** The user region for this index */
+ /**
+ * The user region for this index
+ */
protected PartitionedRegion userRegion = null;
protected final LuceneSerializer serializer;
protected final InternalLuceneIndex index;
protected volatile boolean closed;
private final CountDownLatch isDataRegionReady = new CountDownLatch(1);
- public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer) {
+ private final ExecutorService waitingThreadPoolFromDM;
+
+ public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer,
+ ExecutorService waitingThreadPool) {
this.index = index;
this.serializer = serializer;
this.closed = false;
+ this.waitingThreadPoolFromDM = waitingThreadPool;
}
public void setUserRegionForRepositoryManager(PartitionedRegion userRegion) {
@@ -70,13 +81,28 @@ public class PartitionedRepositoryManager implements RepositoryManager {
Region<Object, Object> region = ctx.getDataSet();
Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
+
for (Integer bucketId : buckets) {
BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
if (userBucket == null) {
throw new BucketNotFoundException(
"User bucket was not found for region " + region + "bucket id " + bucketId);
} else {
- repos.add(getRepository(userBucket.getId()));
+ if (index.isIndexAvailable(userBucket.getId())) {
+ repos.add(getRepository(userBucket.getId()));
+ } else {
+ waitingThreadPoolFromDM.execute(() -> {
+ try {
+ IndexRepository repository = getRepository(userBucket.getId());
+ repos.add(repository);
+ } catch (BucketNotFoundException e) {
+ logger.debug(
+ "Lucene Index creation in progress. Catching BucketNotFoundException");
+ }
+ });
+ throw new LuceneIndexCreationInProgressException(
+ "Lucene Index creation in progress for bucket: " + userBucket.getId());
+ }
}
}
@@ -155,7 +181,8 @@ public class PartitionedRepositoryManager implements RepositoryManager {
try {
computeRepository(bucketId);
} catch (LuceneIndexDestroyedException e) {
- /* expected exception */}
+ /* expected exception */
+ }
}
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
index 984d3eb..ea598bb 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
@@ -30,6 +30,9 @@ import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
+
+ private String luceneFolderPath = "";
+
public RawIndexRepositoryFactory() {}
@Override
@@ -46,7 +49,9 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
if (indexForRaw.withPersistence()) {
String bucketLocation = LuceneServiceImpl.getUniqueIndexName(index.getName(),
index.getRegionPath() + "_" + bucketId);
- File location = new File(index.getName(), bucketLocation);
+ String filePath = luceneFolderPath.isEmpty() ? index.getName()
+ : luceneFolderPath + File.separator + index.getName();
+ File location = new File(filePath, bucketLocation);
if (!location.exists()) {
location.mkdirs();
}
@@ -60,4 +65,8 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
return new IndexRepositoryImpl(null, writer, serializer, indexForRaw.getIndexStats(),
dataBucket, null, "", indexForRaw);
}
+
+ public void setLuceneFolderPath(String luceneFolderPath) {
+ this.luceneFolderPath = luceneFolderPath;
+ }
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index 0b38c45..c297159 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -15,6 +15,7 @@
package org.apache.geode.cache.lucene.internal;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
@@ -22,10 +23,12 @@ import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.PartitionedRegion;
public class RawLuceneRepositoryManager extends PartitionedRepositoryManager {
- public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory();
+ public static RawIndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory();
- public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) {
- super(index, serializer);
+ public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer,
+ ExecutorService waitingThreadPool, String luceneFolderPath) {
+ super(index, serializer, waitingThreadPool);
+ indexRepositoryFactory.setLuceneFolderPath(luceneFolderPath);
}
@Override
diff --git a/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt b/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
index a13c06b..3a9117a 100755
--- a/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
+++ b/geode-lucene/src/main/resources/org/apache/geode/internal/sanctioned-geode-lucene-serializables.txt
@@ -17,3 +17,4 @@ org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFiles,true,1
org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction,true,1
org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction,true,1
org/apache/geode/cache/lucene/internal/results/LuceneGetPageFunction,true,1
+org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException,false
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java
index 55c2222..80310ab 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java
@@ -69,6 +69,7 @@ import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.test.junit.categories.IntegrationTest;
import org.apache.geode.test.junit.categories.LuceneTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
/**
* Tests of creating lucene indexes on regions. All tests of index creation use cases should be in
@@ -85,6 +86,9 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
@Test
public void shouldCreateIndexWriterWithAnalyzersWhenSettingPerFieldAnalyzers()
@@ -162,14 +166,15 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest {
@Test
public void shouldCreateRawIndexIfSpecifiedItsFactory()
- throws BucketNotFoundException, InterruptedException {
+ throws BucketNotFoundException, InterruptedException, IOException {
Map<String, Analyzer> analyzers = new HashMap<>();
final RecordingAnalyzer field1Analyzer = new RecordingAnalyzer();
final RecordingAnalyzer field2Analyzer = new RecordingAnalyzer();
analyzers.put("field1", field1Analyzer);
analyzers.put("field2", field2Analyzer);
- LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory();
+ LuceneServiceImpl.luceneIndexFactory =
+ new LuceneRawIndexFactory(temporaryFolder.newFolder("lucene").getPath());
try {
luceneService.createIndexFactory().setFields(analyzers).create(INDEX_NAME, REGION_NAME);
Region region = createRegion();
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 0f67cb6..599d0a1 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -91,7 +92,8 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion.put("rebalance", "test");
service.waitUntilFlushed("index1", "userRegion", 30000, TimeUnit.MILLISECONDS);
- RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ RepositoryManager manager =
+ new PartitionedRepositoryManager(index, mapper, Executors.newSingleThreadExecutor());
IndexRepository repo = manager.getRepository(userRegion, 0, null);
assertNotNull(repo);
@@ -106,7 +108,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
userRegion = (PartitionedRegion) regionfactory.create("userRegion");
userRegion.put("rebalance", "test");
- manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
+ manager = new PartitionedRepositoryManager(index, mapper, Executors.newSingleThreadExecutor());
IndexRepository newRepo = manager.getRepository(userRegion, 0, null);
Assert.assertNotEquals(newRepo, repo);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
index 07c2d6c..8b09ff5 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
@@ -52,6 +53,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
import org.apache.geode.cache.lucene.LuceneIndexFactory;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -123,6 +125,9 @@ public class LuceneServiceImplJUnitTest {
when(((StatisticsFactory) ds).createAtomicStatistics(any(), anyString()))
.thenReturn(luceneIndexStats);
when(cache.getRegion(anyString())).thenReturn(region);
+ when(cache.getDistributionManager()).thenReturn(mock(DistributionManager.class));
+ when(cache.getDistributionManager().getWaitingThreadPool())
+ .thenReturn(Executors.newSingleThreadExecutor());
RegionAttributes ratts = mock(RegionAttributes.class);
when(region.getAttributes()).thenReturn(ratts);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index b2c30b1..38319b6 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -29,20 +29,24 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -64,13 +68,13 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionRegionConfig;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.LuceneTest;
import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
@Category({UnitTest.class, LuceneTest.class})
@RunWith(PowerMockRunner.class)
@@ -83,20 +87,24 @@ public class PartitionedRepositoryManagerJUnitTest {
protected LuceneSerializer serializer;
protected PartitionedRegionDataStore userDataStore;
protected PartitionedRegionDataStore fileDataStore;
- protected PartitionedRegionHelper prHelper;
protected PartitionRegionConfig prConfig;
protected LocalRegion prRoot;
- protected Map<Integer, BucketRegion> fileAndChunkBuckets = new HashMap<Integer, BucketRegion>();
- protected Map<Integer, BucketRegion> dataBuckets = new HashMap<Integer, BucketRegion>();
+ protected Map<Integer, BucketRegion> fileAndChunkBuckets = new HashMap<>();
+ protected Map<Integer, BucketRegion> dataBuckets = new HashMap<>();
protected LuceneIndexStats indexStats;
protected FileSystemStats fileSystemStats;
protected LuceneIndexImpl indexForPR;
protected PartitionedRepositoryManager repoManager;
protected GemFireCacheImpl cache;
+ private final Map<Integer, Boolean> isIndexAvailableMap = new HashMap<>();
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
@Before
- public void setUp() {
+ public void setUp() throws IOException {
cache = Fakes.cache();
userRegion = Mockito.mock(PartitionedRegion.class);
userDataStore = Mockito.mock(PartitionedRegionDataStore.class);
@@ -117,7 +125,10 @@ public class PartitionedRepositoryManagerJUnitTest {
DLockService.removeLockServiceForTests(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
}
- protected void createIndexAndRepoManager() {
+ protected void createIndexAndRepoManager() throws IOException {
+ String luceneFolderPath = temporaryFolder.newFolder("lucene").getPath();
+ LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(luceneFolderPath);
+
fileAndChunkRegion = Mockito.mock(PartitionedRegion.class);
fileDataStore = Mockito.mock(PartitionedRegionDataStore.class);
when(fileAndChunkRegion.getDataStore()).thenReturn(fileDataStore);
@@ -142,13 +153,14 @@ public class PartitionedRepositoryManagerJUnitTest {
when(prRoot.get("rid")).thenReturn(prConfig);
PowerMockito.mockStatic(PartitionedRegionHelper.class);
PowerMockito.when(PartitionedRegionHelper.getPRRoot(cache)).thenReturn(prRoot);
- repoManager = new PartitionedRepositoryManager(indexForPR, serializer);
+ repoManager = new PartitionedRepositoryManager(indexForPR, serializer,
+ Executors.newSingleThreadExecutor());
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
}
@Test
- public void getByKey() throws BucketNotFoundException, IOException {
+ public void getByKey() throws BucketNotFoundException {
setUpMockBucket(0);
setUpMockBucket(1);
@@ -165,23 +177,22 @@ public class PartitionedRepositoryManagerJUnitTest {
assertEquals(repo0, repo113);
assertNotEquals(repo0, repo1);
- checkRepository(repo0, 0);
- checkRepository(repo1, 1);
+ checkRepositoryContainsBucket(repo0, 0);
+ checkRepositoryContainsBucket(repo1, 1);
}
/**
* Test what happens when a bucket is destroyed.
*/
@Test
- public void destroyBucketShouldCreateNewIndexRepository()
- throws BucketNotFoundException, IOException {
+ public void destroyBucketShouldCreateNewIndexRepository() throws BucketNotFoundException {
setUpMockBucket(0);
IndexRepositoryImpl repo0 =
(IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null);
assertNotNull(repo0);
- checkRepository(repo0, 0);
+ checkRepositoryContainsBucket(repo0, 0);
BucketRegion fileBucket0 = fileAndChunkBuckets.get(0);
BucketRegion dataBucket0 = dataBuckets.get(0);
@@ -194,7 +205,7 @@ public class PartitionedRepositoryManagerJUnitTest {
IndexRepositoryImpl newRepo0 =
(IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null);
assertNotEquals(repo0, newRepo0);
- checkRepository(newRepo0, 0);
+ checkRepositoryContainsBucket(newRepo0, 0);
assertTrue(repo0.isClosed());
assertFalse(repo0.getWriter().isOpen());
}
@@ -213,45 +224,19 @@ public class PartitionedRepositoryManagerJUnitTest {
when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null);
- when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), (RetryTimeKeeper) any()))
- .then(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0));
- return null;
- }
+ when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), any()))
+ .then((Answer) invocation -> {
+ when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0));
+ return null;
});
assertNotNull(repoManager.getRepository(userRegion, 0, null));
}
- @Test
- public void getByRegion() throws BucketNotFoundException {
- setUpMockBucket(0);
- setUpMockBucket(1);
-
- Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
- InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
- when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
- Collection<IndexRepository> repos = repoManager.getRepositories(ctx);
- assertEquals(2, repos.size());
-
- Iterator<IndexRepository> itr = repos.iterator();
- IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next();
- IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next();
-
- assertNotNull(repo0);
- assertNotNull(repo1);
- assertNotEquals(repo0, repo1);
-
- checkRepository(repo0, 0);
- checkRepository(repo1, 1);
- }
-
/**
* Test that we get the expected exception when a user bucket is missing
*/
- @Test(expected = BucketNotFoundException.class)
+ @Test(expected = LuceneIndexCreationInProgressException.class)
public void getMissingBucketByRegion() throws BucketNotFoundException {
setUpMockBucket(0);
@@ -262,15 +247,21 @@ public class PartitionedRepositoryManagerJUnitTest {
repoManager.getRepositories(ctx);
}
- protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+ protected void checkRepositoryContainsBucket(IndexRepositoryImpl repo0, int... bucketIds) {
IndexWriter writer0 = repo0.getWriter();
RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory();
- assertEquals(new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId),
- dir0.getFileSystem().getFileAndChunkRegion());
+ boolean result = false;
+ for (int bucketId : bucketIds) {
+ BucketTargetingMap bucketTargetingMap =
+ new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId);
+ result |= bucketTargetingMap.equals(dir0.getFileSystem().getFileAndChunkRegion());
+ }
+
+ assertTrue(result);
assertEquals(serializer, repo0.getSerializer());
}
- protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
+ protected void setUpMockBucket(int id) {
BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
BucketRegion fileAndChunkBucket = Mockito.mock(BucketRegion.class);
// Allowing the fileAndChunkBucket to behave like a map so that the IndexWriter operations don't
@@ -290,6 +281,64 @@ public class PartitionedRepositoryManagerJUnitTest {
BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class);
when(fileAndChunkBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor);
when(mockBucketAdvisor.isPrimary()).thenReturn(true);
- return mockBucket;
}
+
+ @Test
+ public void queryByRegionWaitingForRepoToBeCreated() {
+ setUpMockBucket(0);
+ setUpMockBucket(1);
+
+ setupIsIndexAvailable();
+
+ Set<Integer> buckets = new LinkedHashSet<>(Arrays.asList(0, 1));
+ InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class);
+ when(ctx.getLocalBucketSet((any()))).thenReturn(buckets);
+ final Collection<IndexRepository> repositories = new HashSet<>();
+
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .atMost(500, TimeUnit.SECONDS).until(() -> {
+ try {
+ repositories.addAll(repoManager.getRepositories(ctx));
+ } catch (BucketNotFoundException | LuceneIndexCreationInProgressException e) {
+ }
+ return repositories.size() == 2;
+ });
+
+ Iterator<IndexRepository> itr = repositories.iterator();
+ IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next();
+ IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next();
+
+ assertNotNull(repo0);
+ assertNotNull(repo1);
+ assertNotEquals(repo0, repo1);
+
+ checkRepositoryContainsBucket(repo0, 0, 1);
+ checkRepositoryContainsBucket(repo1, 0, 1);
+ }
+
+ private void setupIsIndexAvailable() {
+ when(indexForPR.isIndexAvailable(1)).then((Answer) invocation -> {
+ boolean result;
+ Boolean isAvailable = isIndexAvailableMap.get(1);
+ if (isAvailable == null || !isAvailable) {
+ isIndexAvailableMap.put(1, true);
+ result = false;
+ } else {
+ result = true;
+ }
+ return result;
+ });
+ when(indexForPR.isIndexAvailable(0)).then((Answer) invocation -> {
+ boolean result;
+ Boolean isAvailable = isIndexAvailableMap.get(0);
+ if (isAvailable == null || !isAvailable) {
+ isIndexAvailableMap.put(0, true);
+ result = false;
+ } else {
+ result = true;
+ }
+ return result;
+ });
+ }
+
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
index a000d2f..ffe24ce 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -19,6 +19,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
@@ -35,12 +38,11 @@ import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.test.fake.Fakes;
-import org.apache.geode.test.junit.categories.LuceneTest;
public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryManagerJUnitTest {
@Before
- public void setUp() {
+ public void setUp() throws IOException {
cache = Fakes.cache();
userRegion = Mockito.mock(PartitionedRegion.class);
@@ -53,11 +55,12 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
@After
public void tearDown() {
- ((RawLuceneRepositoryManager) repoManager).close();
+ repoManager.close();
}
- protected void createIndexAndRepoManager() {
- LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory();
+ protected void createIndexAndRepoManager() throws IOException {
+ String luceneFolderPath = temporaryFolder.newFolder("lucene").getPath();
+ LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(luceneFolderPath);
indexStats = Mockito.mock(LuceneIndexStats.class);
indexForPR = Mockito.mock(LuceneRawIndex.class);
@@ -66,7 +69,8 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
when(indexForPR.getCache()).thenReturn(cache);
when(indexForPR.getRegionPath()).thenReturn("/testRegion");
when(indexForPR.withPersistence()).thenReturn(true);
- repoManager = new RawLuceneRepositoryManager(indexForPR, serializer);
+ repoManager = new RawLuceneRepositoryManager(indexForPR, serializer,
+ Executors.newSingleThreadExecutor(), luceneFolderPath);
repoManager.setUserRegionForRepositoryManager(userRegion);
repoManager.allowRepositoryComputation();
}
@@ -78,14 +82,14 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
}
@Override
- protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+ protected void checkRepositoryContainsBucket(IndexRepositoryImpl repo0, int... bucketIds) {
IndexWriter writer0 = repo0.getWriter();
Directory dir0 = writer0.getDirectory();
assertTrue(dir0 instanceof NIOFSDirectory);
}
@Override
- protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
+ protected void setUpMockBucket(int id) {
BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
when(mockBucket.getId()).thenReturn(id);
when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket);
@@ -95,7 +99,6 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
dataBuckets.put(id, mockBucket);
repoManager.computeRepository(mockBucket.getId());
- return mockBucket;
}
@Test
--
To stop receiving notification emails like this one, please contact
udo@apache.org.