You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/05/03 17:17:08 UTC

[geode] branch develop updated: GEODE-5055: Handle index creation in progress. (#1847)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new be29133  GEODE-5055: Handle index creation in progress. (#1847)
be29133 is described below

commit be2913392c0edf3ef468729fb09346a4b1d41dff
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Thu May 3 10:16:59 2018 -0700

    GEODE-5055: Handle index creation in progress. (#1847)
    
    * If all the servers are above the 1.6.0 version then the LuceneIndexCreationInProgressException will be propagated up to the servers
    * If there are servers which are lower than 1.6.0 then get repository will be re-executed but it will wait till the indexes are available.
---
 .../internal/PartitionedRepositoryManager.java     |  9 ++++-
 .../internal/distributed/LuceneQueryFunction.java  | 41 +++++++++++++++++++++-
 .../internal/repository/RepositoryManager.java     | 13 +++++++
 .../LuceneSearchWithRollingUpgradeDUnit.java       | 36 +++++++++++++++----
 .../distributed/LuceneQueryFunctionJUnitTest.java  | 11 +++---
 5 files changed, 97 insertions(+), 13 deletions(-)

diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
index bc8450e..50cad28 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -77,6 +77,12 @@ public class PartitionedRepositoryManager implements RepositoryManager {
   @Override
   public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx)
       throws BucketNotFoundException {
+    return getRepositories(ctx, false);
+  }
+
+  @Override
+  public Collection<IndexRepository> getRepositories(RegionFunctionContext ctx,
+      boolean waitForRepository) throws BucketNotFoundException {
     Region<Object, Object> region = ctx.getDataSet();
     Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region);
     ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
@@ -86,7 +92,8 @@ public class PartitionedRepositoryManager implements RepositoryManager {
         throw new BucketNotFoundException(
             "User bucket was not found for region " + region + "bucket id " + bucketId);
       } else {
-        if (index.isIndexAvailable(userBucket.getId()) || userBucket.isEmpty()) {
+        if (index.isIndexAvailable(userBucket.getId()) || userBucket.isEmpty()
+            || waitForRepository) {
           repos.add(getRepository(userBucket.getId()));
         } else {
           waitingThreadPoolFromDM.execute(() -> {
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 135abec..716838d 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.lucene.internal.distributed;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.Query;
@@ -35,12 +36,16 @@ import org.apache.geode.cache.lucene.LuceneQueryProvider;
 import org.apache.geode.cache.lucene.LuceneService;
 import org.apache.geode.cache.lucene.LuceneServiceProvider;
 import org.apache.geode.cache.lucene.internal.InternalLuceneIndex;
+import org.apache.geode.cache.lucene.internal.LuceneIndexCreationInProgressException;
 import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PrimaryBucketException;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -60,6 +65,33 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
 
   @Override
   public void execute(FunctionContext<LuceneFunctionContext> context) {
+    execute(context, false);
+  }
+
+  private void handleException(LuceneIndexCreationInProgressException ex,
+      RegionFunctionContext ctx) {
+    PartitionedRegion userDataRegion = (PartitionedRegion) ctx.getDataSet();
+
+    // get the remote members
+    Set<InternalDistributedMember> remoteMembers =
+        userDataRegion.getRegionAdvisor().adviseAllPRNodes();
+    // Old members with version numbers 1.6 or lower cannot handle IndexCreationInProgressException
+    // Hence the query waits for the repositories to be ready instead of throwing the exception
+    if (!remoteMembers.isEmpty()) {
+      for (InternalDistributedMember remoteMember : remoteMembers) {
+        if (remoteMember.getVersionObject().ordinal() <= Version.GEODE_160.ordinal()) {
+          // re-execute but wait till indexing is complete
+          execute(ctx, true);
+          return;
+        }
+      }
+    }
+    // Will return the LuceneIndexCreationInProgressException as the new servers can handle this
+    // exception
+    throw ex;
+  }
+
+  public void execute(FunctionContext<LuceneFunctionContext> context, boolean waitForRepository) {
     RegionFunctionContext ctx = (RegionFunctionContext) context;
     ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender();
 
@@ -102,7 +134,7 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
       Collection<IndexRepository> repositories = null;
 
       try {
-        repositories = repoManager.getRepositories(ctx);
+        repositories = repoManager.getRepositories(ctx, waitForRepository);
 
         for (IndexRepository repo : repositories) {
           IndexResultCollector collector = manager.newCollector(repo.toString());
@@ -122,6 +154,13 @@ public class LuceneQueryFunction implements InternalFunction<LuceneFunctionConte
         | PrimaryBucketException e) {
       logger.debug("Exception during lucene query function", e);
       throw new InternalFunctionInvocationTargetException(e);
+    } catch (LuceneIndexCreationInProgressException ex) {
+      if (!waitForRepository) {
+        handleException(ex, ctx);
+      } else {
+        logger.debug("The lucene query should have waited for the index to be created");
+        throw ex;
+      }
     }
   }
 
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
index cf69b43..3f7d292 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
@@ -41,6 +41,19 @@ public interface RepositoryManager {
       throws BucketNotFoundException;
 
   /**
+   * Returns a collection of {@link IndexRepository} instances hosting index data of the input list
+   * of bucket ids. The bucket needs to be present on this member.
+   *
+   * The method will wait till the index is ready to be used if the waitOnRetry boolean is set to
+   * true
+   *
+   * @return a collection of {@link IndexRepository} instances
+   * @throws BucketNotFoundException if any of the requested buckets is not found on this member
+   */
+  Collection<IndexRepository> getRepositories(RegionFunctionContext context, boolean waitOnRetry)
+      throws BucketNotFoundException;
+
+  /**
    * Closes this {@link RepositoryManager}
    */
   void close();
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index dc999eb..b1017ae 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -274,10 +275,14 @@ public class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCa
           shortcut.name(), regionName, locatorPorts);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
-          25, server1, server2);
+          25, server2);
+      putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
+          25, server1);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20,
-          30, server1, server2);
+          30, server2);
+      putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 20,
+          30, server1);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
           shortcut.name(), regionName, locatorPorts);
@@ -735,20 +740,39 @@ public class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCa
         .getMethod("create", String.class, String.class, String.class, String.class)
         .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
 
-    Collection resultsActive =
-        (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+    Collection resultsActive = executeLuceneQuery(luceneQuery);
 
     luceneQuery = luceneQueryFactory.getClass()
         .getMethod("create", String.class, String.class, String.class, String.class)
         .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
 
-    Collection resultsInactive =
-        (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+    Collection resultsInactive = executeLuceneQuery(luceneQuery);
 
     assertEquals("Result size not as expected ", expectedRegionSize,
         resultsActive.size() + resultsInactive.size());
   }
 
+  private Collection executeLuceneQuery(Object luceneQuery)
+      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    Collection results = null;
+    int retryCount = 10;
+    while (true) {
+      try {
+        results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+        break;
+      } catch (Exception ex) {
+        if (!ex.getCause().getMessage().contains("currently indexing")) {
+          throw ex;
+        }
+        if (--retryCount == 0) {
+          throw ex;
+        }
+      }
+    }
+    return results;
+
+  }
+
   private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
       VM... vms) {
     for (VM vm : vms) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index da82cf3..4dce809 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -94,7 +94,7 @@ public class LuceneQueryFunctionJUnitTest {
     when(mockContext.getDataSet()).thenReturn(mockRegion);
     when(mockContext.getArguments()).thenReturn(searchArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
-    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
     doAnswer(invocation -> {
       IndexResultCollector collector = invocation.getArgument(2);
       collector.collect(r1_1.getKey(), r1_1.getScore());
@@ -136,7 +136,7 @@ public class LuceneQueryFunctionJUnitTest {
     when(mockContext.getDataSet()).thenReturn(mockRegion);
     when(mockContext.getArguments()).thenReturn(searchArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
-    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
 
     doAnswer(invocation -> {
       IndexResultCollector collector = invocation.getArgument(2);
@@ -177,7 +177,7 @@ public class LuceneQueryFunctionJUnitTest {
     when(mockContext.getArguments()).thenReturn(searchArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
     repos.remove(0);
-    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
     when(mockManager.newCollector(eq("repo2"))).thenReturn(mockCollector);
     when(mockManager.reduce(any(Collection.class))).thenAnswer(invocation -> {
       Collection<IndexResultCollector> collectors = invocation.getArgument(0);
@@ -208,7 +208,7 @@ public class LuceneQueryFunctionJUnitTest {
     when(mockContext.getDataSet()).thenReturn(mockRegion);
     when(mockContext.getArguments()).thenReturn(searchArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
-    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockRepoManager.getRepositories(eq(mockContext), eq(false))).thenReturn(repos);
     doThrow(IOException.class).when(mockRepository1).query(eq(query),
         eq(LuceneQueryFactory.DEFAULT_LIMIT), any(IndexResultCollector.class));
 
@@ -285,7 +285,8 @@ public class LuceneQueryFunctionJUnitTest {
     when(mockContext.getDataSet()).thenReturn(mockRegion);
     when(mockContext.getArguments()).thenReturn(searchArgs);
     LuceneQueryFunction function = new LuceneQueryFunction();
-    when(mockRepoManager.getRepositories(eq(mockContext))).thenThrow(new CacheClosedException());
+    when(mockRepoManager.getRepositories(eq(mockContext), eq(false)))
+        .thenThrow(new CacheClosedException());
     function.execute(mockContext);
   }
 

-- 
To stop receiving notification emails like this one, please contact
nnag@apache.org.