You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/02 01:32:35 UTC

incubator-geode git commit: GEODE-11: Add lucene query function

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 6f9401e19 -> a379b0c18


GEODE-11: Add lucene query function

* Mocking a cache instance requires Hadoop dependencies, so add those jars with test scope
* Add a search function that executes a Lucene search on the local shards and sends the result to
  the aggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a379b0c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a379b0c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a379b0c1

Branch: refs/heads/feature/GEODE-11
Commit: a379b0c182d062a8ebbb45b4f6049917e95f18bd
Parents: 6f9401e
Author: Ashvin Agrawal <as...@apache.org>
Authored: Tue Sep 1 11:46:29 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Tue Sep 1 15:21:49 2015 -0700

----------------------------------------------------------------------
 gemfire-lucene/build.gradle                     |   7 +-
 .../distributed/LuceneQueryFunction.java        | 100 ++++++++++
 .../distributed/LuceneSearchFunctionArgs.java   |  38 ++++
 .../internal/repository/RepositoryManager.java  |   5 +-
 .../LuceneQueryFunctionJUnitTest.java           | 196 +++++++++++++++++++
 .../internal/mergers/TopDocsMergeJUnitTest.java |   2 +-
 6 files changed, 344 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-lucene/build.gradle b/gemfire-lucene/build.gradle
index f8acd53..53d4730 100644
--- a/gemfire-lucene/build.gradle
+++ b/gemfire-lucene/build.gradle
@@ -14,7 +14,12 @@ dependencies {
     //the test framework somehow? We've disabled them globally in the parent
     //build.gadle.
     testCompile 'com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.1.6'
-    
+
+    // the following test dependencies are needed for mocking cache instance
+    testRuntime 'org.apache.hadoop:hadoop-common:2.4.1'
+    testRuntime 'org.apache.hadoop:hadoop-hdfs:2.4.1'
+    testRuntime 'com.google.guava:guava:11.0.2'
+    testRuntime 'commons-collections:commons-collections:3.2.1'
 }
 
 //The lucene integration tests don't have any issues that requiring forking

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
new file mode 100644
index 0000000..6f59329
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -0,0 +1,100 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneQueryResultsImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
+import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsResultMerger;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * {@link LuceneQueryFunction} coordinates text search on a member. It receives a text search query from the
+ * coordinator along with arguments such as the region and buckets, runs the search against the local index
+ * repositories, and sends the locally merged results back to the search coordinator.
+ */
+public class LuceneQueryFunction extends FunctionAdapter {
+  private static final long serialVersionUID = 1L;
+  public static final String ID = LuceneQueryFunction.class.getName();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private RepositoryManager repoManager;
+
+  /**
+   * Queries every local {@link IndexRepository} selected by the optional {@link LuceneSearchFunctionArgs} and sends
+   * the merged hits as the last result. Lookup or query failures are logged and forwarded via sendException instead.
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext ctx = (RegionFunctionContext) context;
+    ResultSender<LuceneQueryResults> resultSender = ctx.getResultSender();
+
+    Region region = ctx.getDataSet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Executing lucene query on region:" + region.getFullPath());
+    }
+
+    LuceneSearchFunctionArgs args = (LuceneSearchFunctionArgs) ctx.getArguments();
+    Set<Integer> buckets = args == null ? null : args.getBuckets(); // null: all primary buckets on this member
+
+    List<LuceneQueryResults> results = new ArrayList<>();
+    try {
+      Collection<IndexRepository> repositories = repoManager.getRepositories(region, buckets);
+      for (IndexRepository repo : repositories) {
+        ShardResultCollector collector = new ShardResultCollector();
+        logger.debug("Executing search on repo: {}", repo);
+        // TODO(review): null query and 0 (presumably a result limit) are placeholders; args don't carry them yet
+        repo.query(null, 0, collector);
+        results.add(collector.getResult());
+      }
+    } catch (IOException | BucketNotFoundException e) {
+      logger.warn("Failed to execute lucene query on local index repositories", e);
+      resultSender.sendException(e);
+      return;
+    }
+
+    TopDocsResultMerger merger = new TopDocsResultMerger();
+    LuceneQueryResults merged = merger.mergeResults(results);
+    resultSender.lastResult(merged);
+  }
+
+  /** Collects the hits streamed by one {@link IndexRepository}; static as it needs no enclosing instance. */
+  static class ShardResultCollector implements IndexResultCollector {
+    LuceneQueryResultsImpl result = new LuceneQueryResultsImpl();
+
+    @Override
+    public void collect(Object key, float score) {
+      result.addHit(new LuceneResultStructImpl(key, score));
+    }
+
+    public LuceneQueryResults getResult() {
+      return result;
+    }
+  }
+
+  /** Sets the manager used to look up local index repositories; must be set before {@link #execute}. */
+  void setRepositoryManager(RepositoryManager manager) {
+    this.repoManager = manager;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
new file mode 100644
index 0000000..f68e373
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
@@ -0,0 +1,38 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+
+/**
+ * Function arguments for a distributed text (Lucene) search. No argument fields or serialized state exist yet,
+ * so every instance is currently empty.
+ */
+public class LuceneSearchFunctionArgs implements VersionedDataSerializable {
+  private static final long serialVersionUID = 1L;
+
+  /** Buckets to search; always null for now, and null asks RepositoryManager for all primary buckets. */
+  public Set<Integer> getBuckets() {
+    return null;
+  }
+
+  /** Returns null: no version-specific serialization methods are declared. */
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  /** Writes nothing, as the class holds no state yet. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+  }
+
+  /** Reads nothing, mirroring the empty {@link #toData(DataOutput)}. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
index 418941a..e8ea3e6 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
@@ -1,6 +1,7 @@
 package com.gemstone.gemfire.cache.lucene.internal.repository;
 
 import java.util.Collection;
+import java.util.Set;
 
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
@@ -18,10 +19,10 @@ public interface RepositoryManager {
    * bucket needs to be present on this member.
    * 
    * @param region
-   * @param bucketIds buckets of a Partitioned region for which {@link IndexRepository}s needs to be discovered. null
+   * @param buckets buckets of a Partitioned region for which {@link IndexRepository}s needs to be discovered. null
    *          for all primary buckets on this member or if the region is Replicated.
    * @return a collection of {@link IndexRepository} instances
    * @throws BucketNotFoundException if any of the requested buckets is not found on this member
    */
-  Collection<IndexRepository> getRepositories(Region region, int[] bucketIds) throws BucketNotFoundException;
+  Collection<IndexRepository> getRepositories(Region region, Set<Integer> buckets) throws BucketNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
new file mode 100644
index 0000000..c23dffe
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -0,0 +1,196 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.search.Query;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.api.Invocation;
+import org.jmock.lib.action.CustomAction;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.LuceneResultStruct;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
+import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsMergeJUnitTest;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class LuceneQueryFunctionJUnitTest { // unit tests for LuceneQueryFunction; collaborators are jMock mocks
+  Mockery mocker; // recreated in setupMock() and verified in validateMock() for every test
+  /** Two repositories stream hits; the function must send one result with all hits in descending score order. */
+  @Test
+  public void testRepoQueryAndMerge() throws Exception {
+    final LuceneResultStructImpl r1_1 = new LuceneResultStructImpl("key-1-1", .5f);
+    final LuceneResultStructImpl r1_2 = new LuceneResultStructImpl("key-1-2", .4f);
+    final LuceneResultStructImpl r1_3 = new LuceneResultStructImpl("key-1-3", .3f);
+    final LuceneResultStructImpl r2_1 = new LuceneResultStructImpl("key-2-1", .45f);
+    final LuceneResultStructImpl r2_2 = new LuceneResultStructImpl("key-2-2", .35f);
+
+    final AtomicReference<LuceneQueryResults> result = new AtomicReference<>(); // captured by the lastResult stub
+
+    QueryMocks m = new QueryMocks();
+    mocker.checking(new Expectations() {
+      {
+        oneOf(m.mockContext).getDataSet();
+        will(returnValue(m.mockRegion));
+        oneOf(m.mockContext).getArguments();
+        will(returnValue(null)); // no arguments => the function asks for a null bucket set
+
+        oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+        will(returnValue(m.repos));
+
+        oneOf(m.mockContext).getResultSender();
+        will(returnValue(m.mockResultSender));
+
+        oneOf(m.mockRepository1).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+        will(new CustomAction("streamSearchResults") { // feeds repo1's hits into the function's collector
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
+            collector.collect(r1_1.getKey(), r1_1.getScore());
+            collector.collect(r1_2.getKey(), r1_2.getScore());
+            collector.collect(r1_3.getKey(), r1_3.getScore());
+            return null;
+          }
+        });
+        
+        oneOf(m.mockRepository2).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+        will(new CustomAction("streamSearchResults") { // feeds repo2's hits into a separate collector
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
+            collector.collect(r2_1.getKey(), r2_1.getScore());
+            collector.collect(r2_2.getKey(), r2_2.getScore());
+            return null;
+          }
+        });
+
+        oneOf(m.mockResultSender).lastResult(with(any(LuceneQueryResults.class)));
+        will(new CustomAction("collectResult") { // stores the merged result for the assertions below
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            result.set((LuceneQueryResults) invocation.getParameter(0));
+            return null;
+          }
+        });
+      }
+    });
+
+    LuceneQueryFunction function = new LuceneQueryFunction();
+    function.setRepositoryManager(m.mockRepoManager);
+
+    function.execute(m.mockContext);
+    List<LuceneResultStruct> hits = result.get().getHits();
+    assertEquals(5, hits.size());
+    TopDocsMergeJUnitTest.verifyResultOrder(result.get(), r1_1, r2_1, r1_2, r2_2, r1_3); // scores .5 .45 .4 .35 .3
+  }
+  /** An IOException from a repository query must be forwarded through sendException. */
+  @Test
+  public void testIndexRepoQueryFails() throws Exception {
+    QueryMocks m = new QueryMocks();
+    mocker.checking(new Expectations() {
+      {
+        oneOf(m.mockContext).getDataSet();
+        will(returnValue(m.mockRegion));
+        oneOf(m.mockContext).getArguments();
+        will(returnValue(null));
+
+        oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+        will(returnValue(m.repos));
+
+        oneOf(m.mockContext).getResultSender();
+        will(returnValue(m.mockResultSender));
+        oneOf(m.mockResultSender).sendException(with(any(IOException.class))); // and no lastResult call is expected
+
+        oneOf(m.mockRepository1).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+        will(throwException(new IOException())); // no expectation on repo2: it must not be queried after this
+      }
+    });
+
+    LuceneQueryFunction function = new LuceneQueryFunction();
+    function.setRepositoryManager(m.mockRepoManager);
+
+    function.execute(m.mockContext);
+  }
+  /** A BucketNotFoundException from the repository manager must be forwarded through sendException. */
+  @Test
+  public void testBucketNotFound() throws Exception {
+    QueryMocks m = new QueryMocks();
+    mocker.checking(new Expectations() {
+      {
+        oneOf(m.mockContext).getDataSet();
+        will(returnValue(m.mockRegion));
+        oneOf(m.mockContext).getArguments();
+        will(returnValue(null));
+
+        oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+        will(throwException(new BucketNotFoundException("")));
+
+        oneOf(m.mockContext).getResultSender();
+        will(returnValue(m.mockResultSender));
+        oneOf(m.mockResultSender).sendException(with(any(BucketNotFoundException.class)));
+      }
+    });
+
+    LuceneQueryFunction function = new LuceneQueryFunction();
+    function.setRepositoryManager(m.mockRepoManager);
+
+    function.execute(m.mockContext);
+  }
+  /** The function id is the fully qualified class name. */
+  @Test
+  public void testQueryFunctionId() {
+    String id = new LuceneQueryFunction().getId();
+    assertEquals(LuceneQueryFunction.class.getName(), id);
+  }
+  /** Mock collaborators for a single test; repos holds repo1 then repo2. */
+  class QueryMocks {
+    RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
+    ResultSender<LuceneQueryResults> mockResultSender = mocker.mock(ResultSender.class); // unchecked: raw class token
+    Region<Object, Object> mockRegion = mocker.mock(Region.class);
+
+    RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);
+    ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>();
+    IndexRepository mockRepository1 = mocker.mock(IndexRepository.class, "repo1"); // names distinguish the two mocks
+    IndexRepository mockRepository2 = mocker.mock(IndexRepository.class, "repo2");
+
+    QueryMocks() {
+      repos.add(mockRepository1);
+      repos.add(mockRepository2);
+    }
+  }
+  /** Creates a fresh Mockery using ClassImposteriser (class mocking) and a Synchroniser threading policy. */
+  @Before
+  public void setupMock() {
+    mocker = new Mockery() {
+      {
+        setImposteriser(ClassImposteriser.INSTANCE);
+        setThreadingPolicy(new Synchroniser());
+      }
+    };
+  }
+  /** Fails the test if any expectation was not met, then discards the mockery. */
+  @After
+  public void validateMock() {
+    mocker.assertIsSatisfied();
+    mocker = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
index 65574cd..8c6899f 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
@@ -170,7 +170,7 @@ public class TopDocsMergeJUnitTest {
     verifyResultOrder(merged, r1_1, r2_1, r1_2, r1_3, r2_2);
   }
 
-  private void verifyResultOrder(LuceneQueryResults merged, LuceneResultStruct... results) {
+  public static void verifyResultOrder(LuceneQueryResults merged, LuceneResultStruct... results) {
     int i = 0;
     for (LuceneResultStruct result : results) {
       LuceneResultStruct doc = merged.getHits().get(i++);