You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/04 23:15:22 UTC
[1/3] incubator-geode git commit: GEODE-11: Add classes to manage
shard results
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-11 a379b0c18 -> a0155ef97
GEODE-11: Add classes to manage shard results
Lucene ScoreDoc and TopDocs classes are based on int docIds. This implementation
needs Object type keys. So new equivalent classes are needed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3c659bfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3c659bfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3c659bfa
Branch: refs/heads/feature/GEODE-11
Commit: 3c659bfad33a48cb4bec584452a2e5b7dce098ea
Parents: a379b0c
Author: Ashvin Agrawal <as...@apache.org>
Authored: Tue Sep 1 21:13:16 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 13:18:06 2015 -0700
----------------------------------------------------------------------
.../lucene/internal/distributed/EntryScore.java | 22 ++++
.../lucene/internal/distributed/TopEntries.java | 78 ++++++++++++++
.../LuceneQueryFunctionJUnitTest.java | 6 +-
.../distributed/TopEntriesJUnitTest.java | 106 +++++++++++++++++++
4 files changed, 209 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
new file mode 100644
index 0000000..6903dce
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
@@ -0,0 +1,22 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+/**
+ * Holds one entry matching search query and its metadata
+ */
+public class EntryScore {
+ // Key of the entry matching search query
+ final Object key;
+
+ // The score of this document for the query.
+ final float score;
+
+ public EntryScore(Object key, float score) {
+ this.key = key;
+ this.score = score;
+ }
+
+ @Override
+ public String toString() {
+ return "key=" + key + " score=" + score;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
new file mode 100644
index 0000000..3a07d3f
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
@@ -0,0 +1,78 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+
+/**
+ * Holds an ordered collection of entries matching a search query.
+ */
+public class TopEntries {
+ // ordered collection of entries
+ final private List<EntryScore> hits = new ArrayList<>();
+
+ // the maximum number of entries stored in this collection
+ final int limit;
+
+ // comparator to order entryScore instances
+ final Comparator<EntryScore> comparator = new EntryScoreComparator();
+
+ public TopEntries() {
+ this(LuceneQueryFactory.DEFAULT_LIMIT);
+ }
+
+ public TopEntries(int limit) {
+ if (limit < 0) {
+ throw new IllegalArgumentException();
+ }
+ this.limit = limit;
+ }
+
+ /**
+ * Adds an entry to the collection. The new entry must not have a higher score than the last entry added to the
+ * collection (equal scores are accepted). The new entry will be ignored if the limit is already reached.
+ *
+ * @param entry
+ */
+ public void addHit(EntryScore entry) {
+ if (hits.size() > 0) {
+ EntryScore lastEntry = hits.get(hits.size() - 1);
+ if (comparator.compare(lastEntry, entry) < 0) {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ if (hits.size() >= limit) {
+ return;
+ }
+
+ hits.add(entry);
+ }
+
+ /**
+ * @return count of entries in the collection
+ */
+ public int size() {
+ return hits.size();
+ }
+
+ /**
+ * @return The entries collection managed by this instance
+ */
+ public List<EntryScore> getHits() {
+ return hits;
+ }
+
+ /**
+ * Compares scores of two entries using natural ordering, i.e. it returns a negative value if the first entry's score
+ * is less than the second one's.
+ */
+ class EntryScoreComparator implements Comparator<EntryScore> {
+ @Override
+ public int compare(EntryScore o1, EntryScore o2) {
+ return Float.compare(o1.score, o2.score);
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index c23dffe..94e088f 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -46,7 +46,7 @@ public class LuceneQueryFunctionJUnitTest {
final AtomicReference<LuceneQueryResults> result = new AtomicReference<>();
- QueryMocks m = new QueryMocks();
+ final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
{
oneOf(m.mockContext).getDataSet();
@@ -105,7 +105,7 @@ public class LuceneQueryFunctionJUnitTest {
@Test
public void testIndexRepoQueryFails() throws Exception {
- QueryMocks m = new QueryMocks();
+ final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
{
oneOf(m.mockContext).getDataSet();
@@ -133,7 +133,7 @@ public class LuceneQueryFunctionJUnitTest {
@Test
public void testBucketNotFound() throws Exception {
- QueryMocks m = new QueryMocks();
+ final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
{
oneOf(m.mockContext).getDataSet();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
new file mode 100644
index 0000000..b799a00
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
@@ -0,0 +1,106 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.jmock.Mockery;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TopEntriesJUnitTest {
+ Mockery mockContext;
+
+ EntryScore r1_1 = new EntryScore("3", .9f);
+ EntryScore r1_2 = new EntryScore("1", .8f);
+ EntryScore r2_1 = new EntryScore("2", 0.85f);
+ EntryScore r2_2 = new EntryScore("4", 0.1f);
+
+ @Test
+ public void testPopulateTopEntries() {
+ TopEntries hits = new TopEntries();
+ hits.addHit(r1_1);
+ hits.addHit(r2_1);
+ hits.addHit(r1_2);
+ hits.addHit(r2_2);
+
+ assertEquals(4, hits.size());
+ verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2, r2_2);
+ }
+
+ @Test
+ public void putSameScoreEntries() {
+ TopEntries hits = new TopEntries();
+ EntryScore r1 = new EntryScore("1", .8f);
+ EntryScore r2 = new EntryScore("2", .8f);
+ hits.addHit(r1);
+ hits.addHit(r2);
+
+ assertEquals(2, hits.size());
+ verifyResultOrder(hits.getHits(), r1, r2);
+ }
+
+ @Test
+ public void testInitialization() {
+ TopEntries hits = new TopEntries();
+ assertEquals(LuceneQueryFactory.DEFAULT_LIMIT, hits.limit);
+
+ hits = new TopEntries(123);
+ assertEquals(123, hits.limit);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidLimit() {
+ new TopEntries(-1);
+ }
+
+ @Test
+ public void enforceLimit() throws Exception {
+ TopEntries hits = new TopEntries(3);
+ hits.addHit(r1_1);
+ hits.addHit(r2_1);
+ hits.addHit(r1_2);
+ hits.addHit(r2_2);
+
+ assertEquals(3, hits.size());
+ verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2);
+ }
+
+ public static void verifyResultOrder(Collection<EntryScore> list, EntryScore... expectedEntries) {
+ Iterator<EntryScore> iter = list.iterator();
+ for (EntryScore expectedEntry : expectedEntries) {
+ if (!iter.hasNext()) {
+ fail();
+ }
+ EntryScore toVerify = iter.next();
+ assertEquals(expectedEntry.key, toVerify.key);
+ assertEquals(expectedEntry.score, toVerify.score, .0f);
+ }
+ }
+
+ @Before
+ public void setupMock() {
+ mockContext = new Mockery() {
+ {
+ setImposteriser(ClassImposteriser.INSTANCE);
+ setThreadingPolicy(new Synchroniser());
+ }
+ };
+ }
+
+ @After
+ public void validateMock() {
+ mockContext.assertIsSatisfied();
+ mockContext = null;
+ }
+}
[2/3] incubator-geode git commit: GEODE-11: Add customizable search
result collector
Posted by as...@apache.org.
GEODE-11: Add customizable search result collector
Use Lucene's CollectorManager pattern for collecting and reducing results
from local buckets in a member. Each bucket is expected to have its own index
repository. This will be used to accept custom collectors and reducers at the
shard level.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/240b39b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/240b39b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/240b39b9
Branch: refs/heads/feature/GEODE-11
Commit: 240b39b9900f9dbcebfd77fd6a2ad571e1fff777
Parents: 3c659bf
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 4 13:24:08 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 14:07:40 2015 -0700
----------------------------------------------------------------------
.../internal/distributed/CollectorManager.java | 29 ++++++++
.../distributed/TopEntriesCollector.java | 40 +++++++++++
.../distributed/TopEntriesCollectorManager.java | 74 ++++++++++++++++++++
.../repository/IndexResultCollector.java | 13 +++-
.../TopEntriesCollectorJUnitTest.java | 62 ++++++++++++++++
.../IndexRepositoryImplJUnitTest.java | 15 ++--
6 files changed, 225 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
new file mode 100644
index 0000000..8039d14
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
@@ -0,0 +1,29 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+
+/**
+ * This class is used to aggregate search results from multiple search requests.
+ *
+ * @param <T> Type of reduce result
+ * @param <C> Type of IndexResultCollector created by this manager
+ */
+public interface CollectorManager<T, C extends IndexResultCollector> {
+ /**
+ * @param name Name/Identifier for this collector, e.g. region/bucketId.
+ * @return a new {@link IndexResultCollector}. This must return a different instance on
+ * each call. A new collector would be created for each bucket on a member node.
+ */
+ C newCollector(String name);
+
+ /**
+ * Reduce the results of individual collectors into a meaningful result. This method must be called after collection
+ * is finished on all provided collectors.
+ *
+ * @throws IOException
+ */
+ T reduce(Collection<IndexResultCollector> results) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
new file mode 100644
index 0000000..5b70499
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
@@ -0,0 +1,40 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+
+/**
+ * An implementation of {@link IndexResultCollector} to collect {@link EntryScore}. It is expected that the results will
+ * be ordered by score of the entry.
+ */
+public class TopEntriesCollector implements IndexResultCollector {
+ private final String name;
+ private final TopEntries entries;
+
+ public TopEntriesCollector(String name) {
+ this.name = name;
+ this.entries = new TopEntries();
+ }
+
+ @Override
+ public void collect(Object key, float score) {
+ entries.addHit(new EntryScore(key, score));
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public int size() {
+ TopEntries entries = getEntries();
+ return entries == null ? 0 : entries.size();
+ }
+
+ /**
+ * @return The entries collected by this collector
+ */
+ public TopEntries getEntries() {
+ return entries;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
new file mode 100644
index 0000000..4aab0af
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -0,0 +1,74 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntries.EntryScoreComparator;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
+ * collect top matching entries from local buckets
+ */
+public class TopEntriesCollectorManager implements CollectorManager<TopEntries, TopEntriesCollector> {
+ private static final Logger logger = LogService.getLogger();
+
+ @Override
+ public TopEntriesCollector newCollector(String name) {
+ return new TopEntriesCollector(name);
+ }
+
+ @Override
+ public TopEntries reduce(Collection<IndexResultCollector> collectors) throws IOException {
+ final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();
+
+ // compares two entry lists by the score of their first entry; reversed below so the highest score comes first
+ Comparator<List<EntryScore>> entryListComparator = new Comparator<List<EntryScore>>() {
+ @Override
+ public int compare(List<EntryScore> l1, List<EntryScore> l2) {
+ EntryScore o1 = l1.get(0);
+ EntryScore o2 = l2.get(0);
+ return scoreComparator.compare(o1, o2);
+ }
+ };
+
+ // The queue contains the hit lists of all bucket results. The queue puts the list whose first entry has the highest
+ // score at the head by using the reversed list comparator.
+ PriorityQueue<List<EntryScore>> entryListsPriorityQueue;
+ entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(Collections.reverseOrder(entryListComparator));
+ TopEntries mergedResult = new TopEntries();
+
+ for (IndexResultCollector collector : collectors) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Number of entries found in bucket {} is {}", collector.getName(), collector.size());
+ }
+
+ if (collector.size() > 0) {
+ entryListsPriorityQueue.add(((TopEntriesCollector)collector).getEntries().getHits());
+ }
+ }
+
+ int limit = LuceneQueryFactory.DEFAULT_LIMIT;
+ logger.debug("Only {} count of entries will be reduced. Other entries will be ignored", limit);
+ while (entryListsPriorityQueue.size() > 0 && limit-- > 0) {
+
+ List<EntryScore> list = entryListsPriorityQueue.remove();
+ EntryScore entry = list.remove(0);
+ mergedResult.addHit(entry);
+
+ if (list.size() > 0) {
+ entryListsPriorityQueue.add(list);
+ }
+ }
+
+ return mergedResult;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
index fd78b11..94931a4 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
@@ -1,16 +1,25 @@
package com.gemstone.gemfire.cache.lucene.internal.repository;
/**
- * Interface for collection results of a query on
+ * Interface for collection results of a query on
* an IndexRepository. See {@link IndexRepository#query(org.apache.lucene.search.Query, int, IndexResultCollector)}
*/
public interface IndexResultCollector {
+ /**
+ * @return Name/identifier of this collector
+ */
+ public String getName();
+
+ /**
+ * @return Number of results collected by this collector
+ */
+ public int size();
/**
* Collect a single document
+ *
* @param key the gemfire key of the object
* @param score the lucene score of this object
*/
void collect(Object key, float score);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
new file mode 100644
index 0000000..4766220
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
@@ -0,0 +1,62 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TopEntriesCollectorJUnitTest {
+
+ @Test
+ public void testReduce() throws Exception {
+ EntryScore r1_1 = new EntryScore("1-1", .9f);
+ EntryScore r1_2 = new EntryScore("1-2", .7f);
+ EntryScore r1_3 = new EntryScore("1-3", .5f);
+
+ EntryScore r2_1 = new EntryScore("2-1", .85f);
+ EntryScore r2_2 = new EntryScore("2-2", .65f);
+
+ EntryScore r3_1 = new EntryScore("3-1", .8f);
+ EntryScore r3_2 = new EntryScore("3-2", .6f);
+ EntryScore r3_3 = new EntryScore("3-3", .4f);
+
+ TopEntriesCollectorManager manager = new TopEntriesCollectorManager();
+
+ TopEntriesCollector c1 = manager.newCollector("c1");
+ c1.collect(r1_1.key, r1_1.score);
+ c1.collect(r1_2.key, r1_2.score);
+ c1.collect(r1_3.key, r1_3.score);
+
+ TopEntriesCollector c2 = manager.newCollector("c2");
+ c2.collect(r2_1.key, r2_1.score);
+ c2.collect(r2_2.key, r2_2.score);
+
+ TopEntriesCollector c3 = manager.newCollector("c3");
+ c3.collect(r3_1.key, r3_1.score);
+ c3.collect(r3_2.key, r3_2.score);
+ c3.collect(r3_3.key, r3_3.score);
+
+ List<IndexResultCollector> collectors = new ArrayList<>();
+ collectors.add(c1);
+ collectors.add(c2);
+ collectors.add(c3);
+
+ TopEntries entries = manager.reduce(collectors);
+ assertEquals(8, entries.getHits().size());
+ TopEntriesJUnitTest.verifyResultOrder(entries.getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
+ }
+
+ @Test
+ public void testInitialization() {
+ TopEntriesCollector collector = new TopEntriesCollector("name");
+ assertEquals("name", collector.getName());
+ assertEquals(0, collector.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
index f4a994b..0b4a4cd 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
@@ -11,15 +11,10 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -28,7 +23,6 @@ import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
-import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.SerializerUtil;
import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.Type2;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
@@ -134,6 +128,15 @@ public class IndexRepositoryImplJUnitTest {
public void collect(Object key, float score) {
results.add(key);
}
+
+ @Override
+ public String getName() {
+ return null;
+ }
+ @Override
+ public int size() {
+ return results.size();
+ }
}
/**
[3/3] incubator-geode git commit: GEODE-11: Use ResultCollector in
LuceneFunction
Posted by as...@apache.org.
GEODE-11: Use ResultCollector in LuceneFunction
Shards will not use LuceneQueryResults object to send result to coordinator.
Replace this dependency with EntryScore and related objects.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a0155ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a0155ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a0155ef9
Branch: refs/heads/feature/GEODE-11
Commit: a0155ef9757f0dddca4326f27ec5d76ee123032f
Parents: 240b39b
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 4 14:10:04 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 14:14:00 2015 -0700
----------------------------------------------------------------------
.../distributed/LuceneQueryFunction.java | 45 +++++++-------------
.../LuceneQueryFunctionJUnitTest.java | 36 +++++++---------
2 files changed, 32 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0155ef9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 6f59329..9b494e0 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -3,7 +3,6 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -13,10 +12,6 @@ import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.execute.ResultSender;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneQueryResultsImpl;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
-import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsResultMerger;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -39,7 +34,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
@Override
public void execute(FunctionContext context) {
RegionFunctionContext ctx = (RegionFunctionContext) context;
- ResultSender<LuceneQueryResults> resultSender = ctx.getResultSender();
+ ResultSender<TopEntries> resultSender = ctx.getResultSender();
Region region = ctx.getDataSet();
if (logger.isDebugEnabled()) {
@@ -47,16 +42,18 @@ public class LuceneQueryFunction extends FunctionAdapter {
}
LuceneSearchFunctionArgs args = (LuceneSearchFunctionArgs) ctx.getArguments();
- Set<Integer> buckets = args == null ? null : args.getBuckets();
+ Set<Integer> buckets = (args == null ? null : args.getBuckets());
- List<LuceneQueryResults> results = new ArrayList<>();
+ CollectorManager<TopEntries, TopEntriesCollector> manager = new TopEntriesCollectorManager();
+
+ Collection<IndexResultCollector> results = new ArrayList<>();
try {
Collection<IndexRepository> repositories = repoManager.getRepositories(region, buckets);
for (IndexRepository repo : repositories) {
- ShardResultCollector collector = new ShardResultCollector();
- logger.debug("Executing search on repo: " + repo);
+ TopEntriesCollector collector = manager.newCollector(repo.toString());
+ logger.debug("Executing search on repo: " + repo.toString());
repo.query(null, 0, collector);
- results.add(collector.getResult());
+ results.add(collector);
}
} catch (IOException e) {
logger.warn("", e);
@@ -68,24 +65,14 @@ public class LuceneQueryFunction extends FunctionAdapter {
return;
}
- TopDocsResultMerger merger = new TopDocsResultMerger();
- LuceneQueryResults merged = merger.mergeResults(results);
- resultSender.lastResult(merged);
- }
-
- /**
- * Collects and merges results from {@link IndexRepository}s
- */
- class ShardResultCollector implements IndexResultCollector {
- LuceneQueryResultsImpl result = new LuceneQueryResultsImpl();
-
- @Override
- public void collect(Object key, float score) {
- result.addHit(new LuceneResultStructImpl(key, score));
- }
-
- public LuceneQueryResults getResult() {
- return result;
+ TopEntries mergedResult;
+ try {
+ mergedResult = manager.reduce(results);
+ resultSender.lastResult(mergedResult);
+ } catch (IOException e) {
+ logger.warn("", e);
+ resultSender.sendException(e);
+ return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0155ef9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index 94e088f..069ca90 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -22,10 +22,6 @@ import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.execute.ResultSender;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.LuceneResultStruct;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
-import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsMergeJUnitTest;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -38,13 +34,13 @@ public class LuceneQueryFunctionJUnitTest {
@Test
public void testRepoQueryAndMerge() throws Exception {
- final LuceneResultStructImpl r1_1 = new LuceneResultStructImpl("key-1-1", .5f);
- final LuceneResultStructImpl r1_2 = new LuceneResultStructImpl("key-1-2", .4f);
- final LuceneResultStructImpl r1_3 = new LuceneResultStructImpl("key-1-3", .3f);
- final LuceneResultStructImpl r2_1 = new LuceneResultStructImpl("key-2-1", .45f);
- final LuceneResultStructImpl r2_2 = new LuceneResultStructImpl("key-2-2", .35f);
+ final EntryScore r1_1 = new EntryScore("key-1-1", .5f);
+ final EntryScore r1_2 = new EntryScore("key-1-2", .4f);
+ final EntryScore r1_3 = new EntryScore("key-1-3", .3f);
+ final EntryScore r2_1 = new EntryScore("key-2-1", .45f);
+ final EntryScore r2_2 = new EntryScore("key-2-2", .35f);
- final AtomicReference<LuceneQueryResults> result = new AtomicReference<>();
+ final AtomicReference<TopEntries> result = new AtomicReference<>();
final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
@@ -65,9 +61,9 @@ public class LuceneQueryFunctionJUnitTest {
@Override
public Object invoke(Invocation invocation) throws Throwable {
IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
- collector.collect(r1_1.getKey(), r1_1.getScore());
- collector.collect(r1_2.getKey(), r1_2.getScore());
- collector.collect(r1_3.getKey(), r1_3.getScore());
+ collector.collect(r1_1.key, r1_1.score);
+ collector.collect(r1_2.key, r1_2.score);
+ collector.collect(r1_3.key, r1_3.score);
return null;
}
});
@@ -77,17 +73,17 @@ public class LuceneQueryFunctionJUnitTest {
@Override
public Object invoke(Invocation invocation) throws Throwable {
IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
- collector.collect(r2_1.getKey(), r2_1.getScore());
- collector.collect(r2_2.getKey(), r2_2.getScore());
+ collector.collect(r2_1.key, r2_1.score);
+ collector.collect(r2_2.key, r2_2.score);
return null;
}
});
- oneOf(m.mockResultSender).lastResult(with(any(LuceneQueryResults.class)));
+ oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
will(new CustomAction("collectResult") {
@Override
public Object invoke(Invocation invocation) throws Throwable {
- result.set((LuceneQueryResults) invocation.getParameter(0));
+ result.set((TopEntries) invocation.getParameter(0));
return null;
}
});
@@ -98,9 +94,9 @@ public class LuceneQueryFunctionJUnitTest {
function.setRepositoryManager(m.mockRepoManager);
function.execute(m.mockContext);
- List<LuceneResultStruct> hits = result.get().getHits();
+ List<EntryScore> hits = result.get().getHits();
assertEquals(5, hits.size());
- TopDocsMergeJUnitTest.verifyResultOrder(result.get(), r1_1, r2_1, r1_2, r2_2, r1_3);
+ TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
}
@Test
@@ -164,7 +160,7 @@ public class LuceneQueryFunctionJUnitTest {
class QueryMocks {
RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
- ResultSender<LuceneQueryResults> mockResultSender = mocker.mock(ResultSender.class);
+ ResultSender<TopEntries> mockResultSender = mocker.mock(ResultSender.class);
Region<Object, Object> mockRegion = mocker.mock(Region.class);
RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);