You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/11 23:56:39 UTC
[3/4] incubator-geode git commit: GEODE-11: Refactor CollectorManager
interface
GEODE-11: Refactor CollectorManager interface
CollectorManager creates new collectors and merges the results of those collectors.
Previously, the merge result type could differ from the collector type.
CollectorManager can instead use a collector itself to merge the results.
That way, the actions on members and on the search coordinator are the same.
https://reviews.apache.org/r/38320/
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/01c4bc9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/01c4bc9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/01c4bc9f
Branch: refs/heads/feature/GEODE-11
Commit: 01c4bc9f1ad72b0f18dac2cfc171b6dfff171aec
Parents: 54bc45e
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 11 09:22:12 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 11 14:55:10 2015 -0700
----------------------------------------------------------------------
.../internal/distributed/CollectorManager.java | 13 +++++---
.../distributed/LuceneQueryFunction.java | 8 ++---
.../distributed/LuceneSearchFunctionArgs.java | 3 +-
.../distributed/TopEntriesCollector.java | 6 +++-
.../distributed/TopEntriesCollectorManager.java | 33 ++++++++++--------
.../LuceneQueryFunctionJUnitTest.java | 35 ++++++++++----------
.../TopEntriesCollectorJUnitTest.java | 9 +++--
7 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
index 8039d14..41c3f5f 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
@@ -3,15 +3,20 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
import java.io.IOException;
import java.util.Collection;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
/**
- * This class is used to aggregate search results from multiple search requests.
+ * {@link CollectorManager}s create instances of {@link IndexResultCollector} and provide utility methods to aggregate
+ * results collected by individual collectors. The collectors created by a manager are primarily used for collecting
+ * results from {@link IndexRepository}s. They can also be used for aggregating results of other collectors of the same
+ * type. Typically, search result aggregation is completed in two phases: first at the member level, merging results
+ * from local buckets, and then at the search coordinator level, merging results from members. Using the same collector
+ * in both phases is expected to produce a deterministic merge result irrespective of how buckets are distributed.
*
- * @param <T> Type of reduce result
* @param <C> Type of IndexResultCollector created by this manager
*/
-public interface CollectorManager<T, C extends IndexResultCollector> {
+public interface CollectorManager<C extends IndexResultCollector> {
/**
* @param name Name/Identifier for this collector. For e.g. region/bucketId.
* @return a new {@link IndexResultCollector}. This must return a different instance on
@@ -25,5 +30,5 @@ public interface CollectorManager<T, C extends IndexResultCollector> {
*
* @throws IOException
*/
- T reduce(Collection<IndexResultCollector> results) throws IOException;
+ C reduce(Collection<C> results) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 369ceb8..6e1d217 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -34,7 +34,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
@Override
public void execute(FunctionContext context) {
RegionFunctionContext ctx = (RegionFunctionContext) context;
- ResultSender<TopEntries> resultSender = ctx.getResultSender();
+ ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender();
Region region = ctx.getDataSet();
if (logger.isDebugEnabled()) {
@@ -46,7 +46,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
CollectorManager manager = (args == null) ? null : args.getCollectorManager();
if (manager == null) {
int resultLimit = (args == null ? LuceneQueryFactory.DEFAULT_LIMIT : args.getLimit());
- manager = new TopEntriesCollectorManager(resultLimit);
+ manager = new TopEntriesCollectorManager(null, resultLimit);
}
Collection<IndexResultCollector> results = new ArrayList<>();
@@ -68,9 +68,9 @@ public class LuceneQueryFunction extends FunctionAdapter {
return;
}
- TopEntries mergedResult;
+ TopEntriesCollector mergedResult;
try {
- mergedResult = (TopEntries) manager.reduce(results);
+ mergedResult = (TopEntriesCollector) manager.reduce(results);
resultSender.lastResult(mergedResult);
} catch (IOException e) {
logger.warn("", e);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
index 2a5a5b6..52f3ce9 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
@@ -3,7 +3,6 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Set;
import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
@@ -48,7 +47,7 @@ public class LuceneSearchFunctionArgs implements VersionedDataSerializable {
*
* @return {@link CollectorManager} instance to be used by function
*/
- public <T, C extends IndexResultCollector> CollectorManager<T, C> getCollectorManager() {
+ public <C extends IndexResultCollector> CollectorManager<C> getCollectorManager() {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
index 6b02391..a4b5144 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
@@ -23,7 +23,11 @@ public class TopEntriesCollector implements IndexResultCollector {
@Override
public void collect(Object key, float score) {
- entries.addHit(new EntryScore(key, score));
+ collect(new EntryScore(key, score));
+ }
+
+ public void collect(EntryScore entry) {
+ entries.addHit(entry);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
index 7360c3c..21e11ab 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -18,18 +18,24 @@ import com.gemstone.gemfire.internal.logging.LogService;
* An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
* collect top matching entries from local buckets
*/
-public class TopEntriesCollectorManager implements CollectorManager<TopEntries, TopEntriesCollector> {
+public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCollector> {
private static final Logger logger = LogService.getLogger();
final int limit;
-
+ final String id;
+
public TopEntriesCollectorManager() {
- this(LuceneQueryFactory.DEFAULT_LIMIT);
+ this(null, 0);
}
-
- public TopEntriesCollectorManager(int resultLimit) {
- this.limit = resultLimit;
- logger.debug("Max count of entries to be returned: " + limit);
+
+ public TopEntriesCollectorManager(String id) {
+ this(id, 0);
+ }
+
+ public TopEntriesCollectorManager(String id, int resultLimit) {
+ this.limit = resultLimit < 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
+ this.id = id == null ? String.valueOf(this.hashCode()) : id;
+ logger.debug("Max count of entries to be produced by {} is {}", id, limit);
}
@Override
@@ -38,7 +44,7 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
}
@Override
- public TopEntries reduce(Collection<IndexResultCollector> collectors) throws IOException {
+ public TopEntriesCollector reduce(Collection<TopEntriesCollector> collectors) throws IOException {
final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();
// orders a entry with higher score above a doc with lower score
@@ -55,15 +61,13 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
// using score comparator.
PriorityQueue<List<EntryScore>> entryListsPriorityQueue;
entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(Collections.reverseOrder(entryListComparator));
- TopEntries mergedResult = new TopEntries();
+ TopEntriesCollector mergedResult = new TopEntriesCollector(id, limit);
for (IndexResultCollector collector : collectors) {
- if (logger.isDebugEnabled()) {
- logger.debug("Number of entries found in bucket {} is {}", collector.getName(), collector.size());
- }
+ logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size());
if (collector.size() > 0) {
- entryListsPriorityQueue.add(((TopEntriesCollector)collector).getEntries().getHits());
+ entryListsPriorityQueue.add(((TopEntriesCollector) collector).getEntries().getHits());
}
}
@@ -72,13 +76,14 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
List<EntryScore> list = entryListsPriorityQueue.remove();
EntryScore entry = list.remove(0);
- mergedResult.addHit(entry);
+ mergedResult.collect(entry);
if (list.size() > 0) {
entryListsPriorityQueue.add(list);
}
}
+ logger.debug("Reduced size of {} is {}", mergedResult.name, mergedResult.size());
return mergedResult;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index b7b3f1d..ee2847e 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -42,7 +42,7 @@ public class LuceneQueryFunctionJUnitTest {
@Test
public void testRepoQueryAndMerge() throws Exception {
- final AtomicReference<TopEntries> result = new AtomicReference<>();
+ final AtomicReference<TopEntriesCollector> result = new AtomicReference<>();
final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
@@ -81,11 +81,11 @@ public class LuceneQueryFunctionJUnitTest {
}
});
- oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+ oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
will(new CustomAction("collectResult") {
@Override
public Object invoke(Invocation invocation) throws Throwable {
- result.set((TopEntries) invocation.getParameter(0));
+ result.set((TopEntriesCollector) invocation.getParameter(0));
return null;
}
});
@@ -96,14 +96,14 @@ public class LuceneQueryFunctionJUnitTest {
function.setRepositoryManager(m.mockRepoManager);
function.execute(m.mockContext);
- List<EntryScore> hits = result.get().getHits();
+ List<EntryScore> hits = result.get().getEntries().getHits();
assertEquals(5, hits.size());
- TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
+ TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
}
@Test
public void testResultLimitClause() throws Exception {
- final AtomicReference<TopEntries> result = new AtomicReference<>();
+ final AtomicReference<TopEntriesCollector> result = new AtomicReference<>();
final QueryMocks m = new QueryMocks();
mocker.checking(new Expectations() {
@@ -147,11 +147,11 @@ public class LuceneQueryFunctionJUnitTest {
}
});
- oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+ oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
will(new CustomAction("collectResult") {
@Override
public Object invoke(Invocation invocation) throws Throwable {
- result.set((TopEntries) invocation.getParameter(0));
+ result.set((TopEntriesCollector) invocation.getParameter(0));
return null;
}
});
@@ -162,9 +162,9 @@ public class LuceneQueryFunctionJUnitTest {
function.setRepositoryManager(m.mockRepoManager);
function.execute(m.mockContext);
- List<EntryScore> hits = result.get().getHits();
+ List<EntryScore> hits = result.get().getEntries().getHits();
assertEquals(3, hits.size());
- TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2);
+ TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2);
}
@Test
@@ -197,7 +197,7 @@ public class LuceneQueryFunctionJUnitTest {
Collection<IndexResultCollector> collectors = (Collection<IndexResultCollector>) invocation.getParameter(0);
assertEquals(1, collectors.size());
assertEquals(m.mockCollector, collectors.iterator().next());
- return new TopEntries();
+ return new TopEntriesCollector(null);
}
});
@@ -213,7 +213,7 @@ public class LuceneQueryFunctionJUnitTest {
}
});
- oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+ oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
}
});
@@ -294,7 +294,7 @@ public class LuceneQueryFunctionJUnitTest {
will(returnValue(m.mockCollector));
oneOf(m.mockManager).reduce(with(any(Collection.class)));
will(throwException(new IOException()));
-
+
oneOf(m.mockRepoManager).getRepositories(m.mockRegion, m.mockContext);
m.repos.remove(1);
will(returnValue(m.repos));
@@ -303,13 +303,13 @@ public class LuceneQueryFunctionJUnitTest {
oneOf(m.mockResultSender).sendException(with(any(IOException.class)));
}
});
-
+
LuceneQueryFunction function = new LuceneQueryFunction();
function.setRepositoryManager(m.mockRepoManager);
-
+
function.execute(m.mockContext);
}
-
+
@Test
public void testQueryFunctionId() {
String id = new LuceneQueryFunction().getId();
@@ -318,7 +318,7 @@ public class LuceneQueryFunctionJUnitTest {
class QueryMocks {
RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
- ResultSender<TopEntries> mockResultSender = mocker.mock(ResultSender.class);
+ ResultSender<TopEntriesCollector> mockResultSender = mocker.mock(ResultSender.class);
Region<Object, Object> mockRegion = mocker.mock(Region.class);
RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);
@@ -329,7 +329,6 @@ public class LuceneQueryFunctionJUnitTest {
CollectorManager mockManager = mocker.mock(CollectorManager.class);
IndexResultCollector mockCollector = mocker.mock(IndexResultCollector.class);
-
QueryMocks() {
repos.add(mockRepository1);
repos.add(mockRepository2);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
index 4766220..3cf622c 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
@@ -8,7 +8,6 @@ import java.util.List;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -43,14 +42,14 @@ public class TopEntriesCollectorJUnitTest {
c3.collect(r3_2.key, r3_2.score);
c3.collect(r3_3.key, r3_3.score);
- List<IndexResultCollector> collectors = new ArrayList<>();
+ List<TopEntriesCollector> collectors = new ArrayList<>();
collectors.add(c1);
collectors.add(c2);
collectors.add(c3);
- TopEntries entries = manager.reduce(collectors);
- assertEquals(8, entries.getHits().size());
- TopEntriesJUnitTest.verifyResultOrder(entries.getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
+ TopEntriesCollector entries = manager.reduce(collectors);
+ assertEquals(8, entries.getEntries().getHits().size());
+ TopEntriesJUnitTest.verifyResultOrder(entries.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
}
@Test