You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/02 01:32:35 UTC
incubator-geode git commit: GEODE-11: Add lucene query function
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-11 6f9401e19 -> a379b0c18
GEODE-11: Add lucene query function
* Mocking a cache instance requires Hadoop dependencies, so add those jars with test scope
* Add a search function that executes a Lucene search on the shards and sends the results to the
aggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a379b0c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a379b0c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a379b0c1
Branch: refs/heads/feature/GEODE-11
Commit: a379b0c182d062a8ebbb45b4f6049917e95f18bd
Parents: 6f9401e
Author: Ashvin Agrawal <as...@apache.org>
Authored: Tue Sep 1 11:46:29 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Tue Sep 1 15:21:49 2015 -0700
----------------------------------------------------------------------
gemfire-lucene/build.gradle | 7 +-
.../distributed/LuceneQueryFunction.java | 100 ++++++++++
.../distributed/LuceneSearchFunctionArgs.java | 38 ++++
.../internal/repository/RepositoryManager.java | 5 +-
.../LuceneQueryFunctionJUnitTest.java | 196 +++++++++++++++++++
.../internal/mergers/TopDocsMergeJUnitTest.java | 2 +-
6 files changed, 344 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-lucene/build.gradle b/gemfire-lucene/build.gradle
index f8acd53..53d4730 100644
--- a/gemfire-lucene/build.gradle
+++ b/gemfire-lucene/build.gradle
@@ -14,7 +14,12 @@ dependencies {
//the test framework somehow? We've disabled them globally in the parent
//build.gadle.
testCompile 'com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.1.6'
-
+
+ // the following test dependencies are needed for mocking cache instance
+ testRuntime 'org.apache.hadoop:hadoop-common:2.4.1'
+ testRuntime 'org.apache.hadoop:hadoop-hdfs:2.4.1'
+ testRuntime 'com.google.guava:guava:11.0.2'
+ testRuntime 'commons-collections:commons-collections:3.2.1'
}
//The lucene integration tests don't have any issues that requiring forking
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
new file mode 100644
index 0000000..6f59329
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -0,0 +1,100 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneQueryResultsImpl;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
+import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsResultMerger;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * {@link LuceneQueryFunction} coordinates text search on a member. It receives a text search query from the
+ * coordinator, along with arguments such as the region and buckets, invokes the search on the local index
+ * repositories using a per-repository result collector, merges the collected results and sends the merged
+ * result back to the search coordinator.
+ */
+public class LuceneQueryFunction extends FunctionAdapter {
+  private static final long serialVersionUID = 1L;
+  public static final String ID = LuceneQueryFunction.class.getName();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private RepositoryManager repoManager;
+
+  /**
+   * Queries every {@link IndexRepository} returned by the {@link RepositoryManager} for the function's data set
+   * and sends a single merged {@link LuceneQueryResults} as the last result. If looking up the repositories or
+   * querying one of them fails, the exception is logged and sent to the caller via
+   * {@link ResultSender#sendException(Throwable)} instead, and no result is sent.
+   *
+   * @param context must be a {@link RegionFunctionContext}; its arguments, when non-null, must be a
+   *          {@link LuceneSearchFunctionArgs}
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext ctx = (RegionFunctionContext) context;
+    ResultSender<LuceneQueryResults> resultSender = ctx.getResultSender();
+
+    Region region = ctx.getDataSet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Executing lucene query on region:" + region.getFullPath());
+    }
+
+    LuceneSearchFunctionArgs args = (LuceneSearchFunctionArgs) ctx.getArguments();
+    // null buckets are passed through to the RepositoryManager, which treats null as
+    // "all primary buckets on this member, or the whole region if it is replicated"
+    Set<Integer> buckets = args == null ? null : args.getBuckets();
+
+    List<LuceneQueryResults> results = new ArrayList<>();
+    try {
+      Collection<IndexRepository> repositories = repoManager.getRepositories(region, buckets);
+      for (IndexRepository repo : repositories) {
+        ShardResultCollector collector = new ShardResultCollector();
+        if (logger.isDebugEnabled()) {
+          logger.debug("Executing search on repo: " + repo);
+        }
+        // NOTE(review): query and limit are placeholders (null, 0); presumably they will come from
+        // LuceneSearchFunctionArgs once it carries them -- confirm
+        repo.query(null, 0, collector);
+        results.add(collector.getResult());
+      }
+    } catch (IOException | BucketNotFoundException e) {
+      // Log without touching the region: the failure is reported to the caller through the result sender.
+      logger.warn("Exception while executing lucene query", e);
+      resultSender.sendException(e);
+      return;
+    }
+
+    TopDocsResultMerger merger = new TopDocsResultMerger();
+    LuceneQueryResults merged = merger.mergeResults(results);
+    resultSender.lastResult(merged);
+  }
+
+  /**
+   * Collects the hits reported by a single {@link IndexRepository} into a {@link LuceneQueryResultsImpl}.
+   * Declared static so it does not hold a hidden reference to the enclosing (serializable) function.
+   */
+  static class ShardResultCollector implements IndexResultCollector {
+    final LuceneQueryResultsImpl result = new LuceneQueryResultsImpl();
+
+    /** Records one hit as a {@link LuceneResultStructImpl} with the given key and score. */
+    @Override
+    public void collect(Object key, float score) {
+      result.addHit(new LuceneResultStructImpl(key, score));
+    }
+
+    /** @return the hits collected so far from this repository */
+    public LuceneQueryResults getResult() {
+      return result;
+    }
+  }
+
+  /** Sets the manager used to locate the local {@link IndexRepository} instances; used by tests. */
+  void setRepositoryManager(RepositoryManager manager) {
+    this.repoManager = manager;
+  }
+
+  /** @return {@link #ID}, the fully-qualified class name of this function */
+  @Override
+  public String getId() {
+    return ID;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
new file mode 100644
index 0000000..f68e373
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
@@ -0,0 +1,38 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+
+/**
+ * Arguments passed to {@link LuceneQueryFunction} for a text / lucene search.
+ * <p>
+ * Currently a placeholder: it holds no fields, writes and reads nothing during serialization, and reports no
+ * bucket restriction.
+ */
+public class LuceneSearchFunctionArgs implements VersionedDataSerializable {
+  private static final long serialVersionUID = 1L;
+
+  /** Writes this instance's state; there is none yet, so nothing is written. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    // intentionally empty: no fields to serialize
+  }
+
+  /** Restores this instance's state; mirrors {@link #toData(DataOutput)}, so nothing is read. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    // intentionally empty: no fields to deserialize
+  }
+
+  /**
+   * @return always {@code null}; presumably meaning no version-specific serialization forms are declared
+   *         (NOTE(review): confirm against VersionedDataSerializable's contract)
+   */
+  @Override
+  public Version[] getSerializationVersions() {
+    final Version[] noVersions = null;
+    return noVersions;
+  }
+
+  /**
+   * @return the buckets to search; always {@code null} for now. {@link LuceneQueryFunction} forwards this value
+   *         to the RepositoryManager, whose documented meaning for {@code null} is all primary buckets on this
+   *         member (or the whole region if it is replicated)
+   */
+  public Set<Integer> getBuckets() {
+    final Set<Integer> allBuckets = null;
+    return allBuckets;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
index 418941a..e8ea3e6 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
@@ -1,6 +1,7 @@
package com.gemstone.gemfire.cache.lucene.internal.repository;
import java.util.Collection;
+import java.util.Set;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
@@ -18,10 +19,10 @@ public interface RepositoryManager {
* bucket needs to be present on this member.
*
* @param region
- * @param bucketIds buckets of a Partitioned region for which {@link IndexRepository}s needs to be discovered. null
+ * @param buckets buckets of a Partitioned region for which {@link IndexRepository}s needs to be discovered. null
* for all primary buckets on this member or if the region is Replicated.
* @return a collection of {@link IndexRepository} instances
* @throws BucketNotFoundException if any of the requested buckets is not found on this member
*/
- Collection<IndexRepository> getRepositories(Region region, int[] bucketIds) throws BucketNotFoundException;
+ Collection<IndexRepository> getRepositories(Region region, Set<Integer> buckets) throws BucketNotFoundException;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
new file mode 100644
index 0000000..c23dffe
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -0,0 +1,196 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.search.Query;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.api.Invocation;
+import org.jmock.lib.action.CustomAction;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.LuceneResultStruct;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
+import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsMergeJUnitTest;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for {@link LuceneQueryFunction} using jMock mocks for the function context, region, result sender,
+ * repository manager and index repositories.
+ */
+@Category(UnitTest.class)
+public class LuceneQueryFunctionJUnitTest {
+ // Recreated before each test by setupMock() and verified/cleared after each test by validateMock().
+ Mockery mocker;
+
+ /**
+ * Two repositories stream hits into the function's collectors. Verifies that the function sends one merged
+ * result as the last result, containing all 5 hits in descending score order.
+ */
+ @Test
+ public void testRepoQueryAndMerge() throws Exception {
+ final LuceneResultStructImpl r1_1 = new LuceneResultStructImpl("key-1-1", .5f);
+ final LuceneResultStructImpl r1_2 = new LuceneResultStructImpl("key-1-2", .4f);
+ final LuceneResultStructImpl r1_3 = new LuceneResultStructImpl("key-1-3", .3f);
+ final LuceneResultStructImpl r2_1 = new LuceneResultStructImpl("key-2-1", .45f);
+ final LuceneResultStructImpl r2_2 = new LuceneResultStructImpl("key-2-2", .35f);
+
+ // Captures whatever the function passes to ResultSender.lastResult().
+ final AtomicReference<LuceneQueryResults> result = new AtomicReference<>();
+
+ QueryMocks m = new QueryMocks();
+ mocker.checking(new Expectations() {
+ {
+ oneOf(m.mockContext).getDataSet();
+ will(returnValue(m.mockRegion));
+ oneOf(m.mockContext).getArguments();
+ will(returnValue(null));
+
+ // null arguments => null buckets are requested from the repository manager
+ oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+ will(returnValue(m.repos));
+
+ oneOf(m.mockContext).getResultSender();
+ will(returnValue(m.mockResultSender));
+
+ // Each repository feeds its hits into the collector passed as the third query() argument.
+ oneOf(m.mockRepository1).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+ will(new CustomAction("streamSearchResults") {
+ @Override
+ public Object invoke(Invocation invocation) throws Throwable {
+ IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
+ collector.collect(r1_1.getKey(), r1_1.getScore());
+ collector.collect(r1_2.getKey(), r1_2.getScore());
+ collector.collect(r1_3.getKey(), r1_3.getScore());
+ return null;
+ }
+ });
+
+ oneOf(m.mockRepository2).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+ will(new CustomAction("streamSearchResults") {
+ @Override
+ public Object invoke(Invocation invocation) throws Throwable {
+ IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
+ collector.collect(r2_1.getKey(), r2_1.getScore());
+ collector.collect(r2_2.getKey(), r2_2.getScore());
+ return null;
+ }
+ });
+
+ oneOf(m.mockResultSender).lastResult(with(any(LuceneQueryResults.class)));
+ will(new CustomAction("collectResult") {
+ @Override
+ public Object invoke(Invocation invocation) throws Throwable {
+ result.set((LuceneQueryResults) invocation.getParameter(0));
+ return null;
+ }
+ });
+ }
+ });
+
+ LuceneQueryFunction function = new LuceneQueryFunction();
+ function.setRepositoryManager(m.mockRepoManager);
+
+ function.execute(m.mockContext);
+ List<LuceneResultStruct> hits = result.get().getHits();
+ assertEquals(5, hits.size());
+ // Expected order interleaves both repositories by score: .5, .45, .4, .35, .3
+ TopDocsMergeJUnitTest.verifyResultOrder(result.get(), r1_1, r2_1, r1_2, r2_2, r1_3);
+ }
+
+ /**
+ * An IOException from the first repository's query must be forwarded through sendException(). No expectation
+ * is set for repo2 or lastResult(), so jMock fails the test if the function keeps going after the failure.
+ */
+ @Test
+ public void testIndexRepoQueryFails() throws Exception {
+ QueryMocks m = new QueryMocks();
+ mocker.checking(new Expectations() {
+ {
+ oneOf(m.mockContext).getDataSet();
+ will(returnValue(m.mockRegion));
+ oneOf(m.mockContext).getArguments();
+ will(returnValue(null));
+
+ oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+ will(returnValue(m.repos));
+
+ oneOf(m.mockContext).getResultSender();
+ will(returnValue(m.mockResultSender));
+ oneOf(m.mockResultSender).sendException(with(any(IOException.class)));
+
+ oneOf(m.mockRepository1).query(with(aNull(Query.class)), with(equal(0)), with(any(IndexResultCollector.class)));
+ will(throwException(new IOException()));
+ }
+ });
+
+ LuceneQueryFunction function = new LuceneQueryFunction();
+ function.setRepositoryManager(m.mockRepoManager);
+
+ function.execute(m.mockContext);
+ }
+
+ /**
+ * A BucketNotFoundException from the repository manager must be forwarded through sendException(); no
+ * repository is queried and no last result is sent (neither has an expectation).
+ */
+ @Test
+ public void testBucketNotFound() throws Exception {
+ QueryMocks m = new QueryMocks();
+ mocker.checking(new Expectations() {
+ {
+ oneOf(m.mockContext).getDataSet();
+ will(returnValue(m.mockRegion));
+ oneOf(m.mockContext).getArguments();
+ will(returnValue(null));
+
+ oneOf(m.mockRepoManager).getRepositories(m.mockRegion, null);
+ will(throwException(new BucketNotFoundException("")));
+
+ oneOf(m.mockContext).getResultSender();
+ will(returnValue(m.mockResultSender));
+ oneOf(m.mockResultSender).sendException(with(any(BucketNotFoundException.class)));
+ }
+ });
+
+ LuceneQueryFunction function = new LuceneQueryFunction();
+ function.setRepositoryManager(m.mockRepoManager);
+
+ function.execute(m.mockContext);
+ }
+
+ /** The function id is the fully-qualified class name. */
+ @Test
+ public void testQueryFunctionId() {
+ String id = new LuceneQueryFunction().getId();
+ assertEquals(LuceneQueryFunction.class.getName(), id);
+ }
+
+ /**
+ * Bundle of mocks shared by the tests, created from the current {@link #mocker}. {@code repos} holds repo1
+ * followed by repo2; that list is what the mocked repository manager returns.
+ */
+ class QueryMocks {
+ RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
+ ResultSender<LuceneQueryResults> mockResultSender = mocker.mock(ResultSender.class);
+ Region<Object, Object> mockRegion = mocker.mock(Region.class);
+
+ RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);
+ ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>();
+ // Two mocks of the same type need distinct names in jMock.
+ IndexRepository mockRepository1 = mocker.mock(IndexRepository.class, "repo1");
+ IndexRepository mockRepository2 = mocker.mock(IndexRepository.class, "repo2");
+
+ QueryMocks() {
+ repos.add(mockRepository1);
+ repos.add(mockRepository2);
+ }
+ }
+
+ /**
+ * Creates a fresh Mockery per test. ClassImposteriser allows mocking concrete classes as well as interfaces;
+ * the Synchroniser threading policy makes the mockery safe to use from multiple threads.
+ */
+ @Before
+ public void setupMock() {
+ mocker = new Mockery() {
+ {
+ setImposteriser(ClassImposteriser.INSTANCE);
+ setThreadingPolicy(new Synchroniser());
+ }
+ };
+ }
+
+ /** Fails the test if any expected invocation did not happen, then discards the mockery. */
+ @After
+ public void validateMock() {
+ mocker.assertIsSatisfied();
+ mocker = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a379b0c1/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
index 65574cd..8c6899f 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/mergers/TopDocsMergeJUnitTest.java
@@ -170,7 +170,7 @@ public class TopDocsMergeJUnitTest {
verifyResultOrder(merged, r1_1, r2_1, r1_2, r1_3, r2_2);
}
- private void verifyResultOrder(LuceneQueryResults merged, LuceneResultStruct... results) {
+ public static void verifyResultOrder(LuceneQueryResults merged, LuceneResultStruct... results) {
int i = 0;
for (LuceneResultStruct result : results) {
LuceneResultStruct doc = merged.getHits().get(i++);