You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/05/03 17:17:08 UTC
[geode] branch develop updated: GEODE-5055: Handle index creation
in progress. (#1847)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new be29133 GEODE-5055: Handle index creation in progress. (#1847)
be29133 is described below
commit be2913392c0edf3ef468729fb09346a4b1d41dff
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Thu May 3 10:16:59 2018 -0700
GEODE-5055: Handle index creation in progress. (#1847)
* If all the servers are above the 1.6.0 version then the LuceneIndexCreationInProgressException will be propogated up to the servers
* If there are servers which are lower than 1.6.0 then get repository will be re-executed but it will wait till the indexes are available.
---
.../internal/PartitionedRepositoryManager.java | 9 ++++-
.../internal/distributed/LuceneQueryFunction.java | 41 +++++++++++++++++++++-
.../internal/repository/RepositoryManager.java | 13 +++++++
.../LuceneSearchWithRollingUpgradeDUnit.java | 36 +++++++++++++++----
.../distributed/LuceneQueryFunctionJUnitTest.java | 11 +++---
5 files changed, 97 insertions(+), 13 deletions(-)
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index bc8450e..50cad28 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -77,6 +77,12 @@ public class PartitionedRepositoryManager implements RepositoryManager {
@Override
public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx)
throws BucketNotFoundException {
+ return getRepositories(ctx, false);
+ }
+
+ @Override
+ public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx,
+ boolean waitForRepository) throws BucketNotFoundException {
Region<Object, Object> region = ctx.getDataSet();
Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
@@ -86,7 +92,8 @@ public class PartitionedRepositoryManager implements RepositoryManager {
throw new BucketNotFoundException(
"User bucket was not found for region " + region + "bucket id " + bucketId);
} else {
- if (index.isIndexAvailable(userBucket.getId()) || userBucket.isEmpty()) {
+ if (index.isIndexAvailable(userBucket.getId()) || userBucket.isEmpty()
+ || waitForRepository) {
repos.add(getRepository(userBucket.getId()));
} else {
waitingThreadPoolFromDM.execute(() -> {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 135abec..716838d 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.lucene.internal.distributed;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Query;
@@ -35,12 +36,16 @@ import org.apache.geode.cache.lucene.LuceneQueryProvider;
import org.apache.geode.cache.lucene.LuceneService;
import org.apache.geode.cache.lucene.LuceneServiceProvider;
import org.apache.geode.cache.lucene.internal.InternalLuceneIndex;
+import org.apache.geode.cache.lucene.internal.LuceneIndexCreationInProgressException;
import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -60,6 +65,33 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
@Override
public void execute(FunctionContext<LuceneFunctionContext> context) {
+ execute(context, false);
+ }
+
+ private void handleException(LuceneIndexCreationInProgressException ex,
+ RegionFunctionContext ctx) {
+ PartitionedRegion userDataRegion = (PartitionedRegion) ctx.getDataSet();
+
+ // get the remote members
+ Set<InternalDistributedMember> remoteMembers =
+ userDataRegion.getRegionAdvisor().adviseAllPRNodes();
+ // Old members with version numbers 1.6 or lower cannot handle IndexCreationInProgressException
+ // Hence the query waits for the repositories to be ready instead of throwing the exception
+ if (!remoteMembers.isEmpty()) {
+ for (InternalDistributedMember remoteMember : remoteMembers) {
+ if (remoteMember.getVersionObject().ordinal() <= Version.GEODE_160.ordinal()) {
+ // re-execute but wait till indexing is complete
+ execute(ctx, true);
+ return;
+ }
+ }
+ }
+ // Will return the LuceneIndexCreationInProgressException as the new servers can handle this
+ // exception
+ throw ex;
+ }
+
+ public void execute(FunctionContext<LuceneFunctionContext> context, boolean waitForRepository) {
RegionFunctionContext ctx = (RegionFunctionContext) context;
ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender();
@@ -102,7 +134,7 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
Collection<IndexRepository> repositories = null;
try {
- repositories = repoManager.getRepositories(ctx);
+ repositories = repoManager.getRepositories(ctx, waitForRepository);
for (IndexRepository repo : repositories) {
IndexResultCollector collector = manager.newCollector(repo.toString());
@@ -122,6 +154,13 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
| PrimaryBucketException e) {
logger.debug("Exception during lucene query function", e);
throw new InternalFunctionInvocationTargetException(e);
+ } catch (LuceneIndexCreationInProgressException ex) {
+ if (!waitForRepository) {
+ handleException(ex, ctx);
+ } else {
+ logger.debug("The lucene query should have waited for the index to be created");
+ throw ex;
+ }
}
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
index cf69b43..3f7d292 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
@@ -41,6 +41,19 @@ public interface RepositoryManager {
throws BucketNotFoundException;
/**
+ * Returns a collection of {@link IndexRepository} instances hosting index data of the input list
+ * of bucket ids. The bucket needs to be present on this member.
+ *
+ * The method will wait till the index is ready to be used if the waitOnRetry boolean is set to
+ * true
+ *
+ * @return a collection of {@link IndexRepository} instances
+ * @throws BucketNotFoundException if any of the requested buckets is not found on this member
+ */
+ Collection<IndexRepository> getRepositories(RegionFunctionContext context, boolean waitOnRetry)
+ throws BucketNotFoundException;
+
+ /**
* Closes this {@link RepositoryManager}
*/
void close();
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index dc999eb..b1017ae 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -274,10 +275,14 @@ public class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCa
shortcut.name(), regionName, locatorPorts);
expectedRegionSize += 10;
putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
- 25, server1, server2);
+ 25, server2);
+ putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
+ 25, server1);
expectedRegionSize += 5;
putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20,
- 30, server1, server2);
+ 30, server2);
+ putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20,
+ 30, server1);
server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
shortcut.name(), regionName, locatorPorts);
@@ -735,20 +740,39 @@ public class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCa
.getMethod("create", String.class, String.class, String.class, String.class)
.invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
- Collection resultsActive =
- (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+ Collection resultsActive = executeLuceneQuery(luceneQuery);
luceneQuery = luceneQueryFactory.getClass()
.getMethod("create", String.class, String.class, String.class, String.class)
.invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
- Collection resultsInactive =
- (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+ Collection resultsInactive = executeLuceneQuery(luceneQuery);
assertEquals("Result size not as expected ", expectedRegionSize,
resultsActive.size() + resultsInactive.size());
}
+ private Collection executeLuceneQuery(Object luceneQuery)
+ throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ Collection results = null;
+ int retryCount = 10;
+ while (true) {
+ try {
+ results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+ break;
+ } catch (Exception ex) {
+ if (!ex.getCause().getMessage().contains("currently indexing")) {
+ throw ex;
+ }
+ if (--retryCount == 0) {
+ throw ex;
+ }
+ }
+ }
+ return results;
+
+ }
+
private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
VM... vms) {
for (VM vm : vms) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index da82cf3..4dce809 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -94,7 +94,7 @@ public class LuceneQueryFunctionJUnitTest {
when(mockContext.getDataSet()).thenReturn(mockRegion);
when(mockContext.getArguments()).thenReturn(searchArgs);
when(mockContext.getResultSender()).thenReturn(mockResultSender);
- when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+ when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
doAnswer(invocation -> {
IndexResultCollector collector = invocation.getArgument(2);
collector.collect(r1_1.getKey(), r1_1.getScore());
@@ -136,7 +136,7 @@ public class LuceneQueryFunctionJUnitTest {
when(mockContext.getDataSet()).thenReturn(mockRegion);
when(mockContext.getArguments()).thenReturn(searchArgs);
when(mockContext.getResultSender()).thenReturn(mockResultSender);
- when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+ when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
doAnswer(invocation -> {
IndexResultCollector collector = invocation.getArgument(2);
@@ -177,7 +177,7 @@ public class LuceneQueryFunctionJUnitTest {
when(mockContext.getArguments()).thenReturn(searchArgs);
when(mockContext.getResultSender()).thenReturn(mockResultSender);
repos.remove(0);
- when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+ when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
when(mockManager.newCollector(eq("repo2"))).thenReturn(mockCollector);
when(mockManager.reduce(any(Collection.class))).thenAnswer(invocation -> {
Collection<IndexResultCollector> collectors = invocation.getArgument(0);
@@ -208,7 +208,7 @@ public class LuceneQueryFunctionJUnitTest {
when(mockContext.getDataSet()).thenReturn(mockRegion);
when(mockContext.getArguments()).thenReturn(searchArgs);
when(mockContext.getResultSender()).thenReturn(mockResultSender);
- when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+ when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
doThrow(IOException.class).when(mockRepository1).query(eq(query),
eq(LuceneQueryFactory.DEFAULT_LIMIT), any(IndexResultCollector.class));
@@ -285,7 +285,8 @@ public class LuceneQueryFunctionJUnitTest {
when(mockContext.getDataSet()).thenReturn(mockRegion);
when(mockContext.getArguments()).thenReturn(searchArgs);
LuceneQueryFunction function = new LuceneQueryFunction();
- when(mockRepoManager.getRepositories(eq(mockContext))).thenThrow(new CacheClosedException());
+ when(mockRepoManager.getRepositories(eq(mockContext), eq(false)))
+ .thenThrow(new CacheClosedException());
function.execute(mockContext);
}
--
To stop receiving notification emails like this one, please contact
nnag@apache.org.