You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/11 23:56:39 UTC

[3/4] incubator-geode git commit: GEODE-11: Refactor CollectorManager interface

GEODE-11: Refactor CollectorManager interface

CollectorManager creates new collectors and merges the results of those collectors.
Previously, the merge result type could differ from the collector type.
CollectorManager can instead use a collector itself to merge the results.
That way the actions on the members and on the search coordinator are the same.

https://reviews.apache.org/r/38320/


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/01c4bc9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/01c4bc9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/01c4bc9f

Branch: refs/heads/feature/GEODE-11
Commit: 01c4bc9f1ad72b0f18dac2cfc171b6dfff171aec
Parents: 54bc45e
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 11 09:22:12 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 11 14:55:10 2015 -0700

----------------------------------------------------------------------
 .../internal/distributed/CollectorManager.java  | 13 +++++---
 .../distributed/LuceneQueryFunction.java        |  8 ++---
 .../distributed/LuceneSearchFunctionArgs.java   |  3 +-
 .../distributed/TopEntriesCollector.java        |  6 +++-
 .../distributed/TopEntriesCollectorManager.java | 33 ++++++++++--------
 .../LuceneQueryFunctionJUnitTest.java           | 35 ++++++++++----------
 .../TopEntriesCollectorJUnitTest.java           |  9 +++--
 7 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
index 8039d14..41c3f5f 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
@@ -3,15 +3,20 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
 import java.io.IOException;
 import java.util.Collection;
 
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
 
 /**
- * This class is used to aggregate search results from multiple search requests.
+ * {@link CollectorManager}s create instances of {@link IndexResultCollector} and provide utility methods to aggregate
+ * results collected by individual collectors. The collectors created by this class are primarily used for collecting
+ * results from {@link IndexRepository}s. The collectors can also be used for aggregating results of other collectors of
+ * the same type. Typically search result aggregation is completed in two phases: first at the member level, merging
+ * results from local buckets, and then at the search coordinator level, merging results from members. Using the same
+ * collector in both phases is expected to produce a deterministic merge result regardless of bucket distribution.
  * 
- * @param <T> Type of reduce result
  * @param <C> Type of IndexResultCollector created by this manager
  */
-public interface CollectorManager<T, C extends IndexResultCollector> {
+public interface CollectorManager<C extends IndexResultCollector> {
   /**
    * @param name Name/Identifier for this collector. For e.g. region/bucketId.
    * @return a new {@link IndexResultCollector}. This must return a different instance on
@@ -25,5 +30,5 @@ public interface CollectorManager<T, C extends IndexResultCollector> {
    * 
    * @throws IOException
    */
-  T reduce(Collection<IndexResultCollector> results) throws IOException;
+  C reduce(Collection<C> results) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 369ceb8..6e1d217 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -34,7 +34,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
   @Override
   public void execute(FunctionContext context) {
     RegionFunctionContext ctx = (RegionFunctionContext) context;
-    ResultSender<TopEntries> resultSender = ctx.getResultSender();
+    ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender();
 
     Region region = ctx.getDataSet();
     if (logger.isDebugEnabled()) {
@@ -46,7 +46,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
     CollectorManager manager = (args == null) ? null : args.getCollectorManager();
     if (manager == null) {
       int resultLimit = (args == null ? LuceneQueryFactory.DEFAULT_LIMIT : args.getLimit());
-      manager = new TopEntriesCollectorManager(resultLimit);
+      manager = new TopEntriesCollectorManager(null, resultLimit);
     }
 
     Collection<IndexResultCollector> results = new ArrayList<>();
@@ -68,9 +68,9 @@ public class LuceneQueryFunction extends FunctionAdapter {
       return;
     }
 
-    TopEntries mergedResult;
+    TopEntriesCollector mergedResult;
     try {
-      mergedResult = (TopEntries) manager.reduce(results);
+      mergedResult = (TopEntriesCollector) manager.reduce(results);
       resultSender.lastResult(mergedResult);
     } catch (IOException e) {
       logger.warn("", e);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
index 2a5a5b6..52f3ce9 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneSearchFunctionArgs.java
@@ -3,7 +3,6 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Set;
 
 import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
@@ -48,7 +47,7 @@ public class LuceneSearchFunctionArgs implements VersionedDataSerializable {
    * 
    * @return {@link CollectorManager} instance to be used by function
    */
-  public <T, C extends IndexResultCollector> CollectorManager<T, C> getCollectorManager() {
+  public <C extends IndexResultCollector> CollectorManager<C> getCollectorManager() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
index 6b02391..a4b5144 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
@@ -23,7 +23,11 @@ public class TopEntriesCollector implements IndexResultCollector {
 
   @Override
   public void collect(Object key, float score) {
-    entries.addHit(new EntryScore(key, score));
+    collect(new EntryScore(key, score));
+  }
+  
+  public void collect(EntryScore entry) {
+    entries.addHit(entry);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
index 7360c3c..21e11ab 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -18,18 +18,24 @@ import com.gemstone.gemfire.internal.logging.LogService;
  * An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
  * collect top matching entries from local buckets
  */
-public class TopEntriesCollectorManager implements CollectorManager<TopEntries, TopEntriesCollector> {
+public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCollector> {
   private static final Logger logger = LogService.getLogger();
 
   final int limit;
-  
+  final String id;
+
   public TopEntriesCollectorManager() {
-    this(LuceneQueryFactory.DEFAULT_LIMIT);
+    this(null, 0);
   }
-  
-  public TopEntriesCollectorManager(int resultLimit) {
-    this.limit = resultLimit;
-    logger.debug("Max count of entries to be returned: " + limit);
+
+  public TopEntriesCollectorManager(String id) {
+    this(id, 0);
+  }
+
+  public TopEntriesCollectorManager(String id, int resultLimit) {
+    this.limit = resultLimit < 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
+    this.id = id == null ? String.valueOf(this.hashCode()) : id;
+    logger.debug("Max count of entries to be produced by {} is {}", id, limit);
   }
 
   @Override
@@ -38,7 +44,7 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
   }
 
   @Override
-  public TopEntries reduce(Collection<IndexResultCollector> collectors) throws IOException {
+  public TopEntriesCollector reduce(Collection<TopEntriesCollector> collectors) throws IOException {
     final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();
 
     // orders a entry with higher score above a doc with lower score
@@ -55,15 +61,13 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
     // using score comparator.
     PriorityQueue<List<EntryScore>> entryListsPriorityQueue;
     entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(Collections.reverseOrder(entryListComparator));
-    TopEntries mergedResult = new TopEntries();
+    TopEntriesCollector mergedResult = new TopEntriesCollector(id, limit);
 
     for (IndexResultCollector collector : collectors) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Number of entries found in bucket {} is {}", collector.getName(), collector.size());
-      }
+      logger.debug("Number of entries found in collector {} is {}", collector.getName(), collector.size());
 
       if (collector.size() > 0) {
-        entryListsPriorityQueue.add(((TopEntriesCollector)collector).getEntries().getHits());
+        entryListsPriorityQueue.add(((TopEntriesCollector) collector).getEntries().getHits());
       }
     }
 
@@ -72,13 +76,14 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntries,
 
       List<EntryScore> list = entryListsPriorityQueue.remove();
       EntryScore entry = list.remove(0);
-      mergedResult.addHit(entry);
+      mergedResult.collect(entry);
 
       if (list.size() > 0) {
         entryListsPriorityQueue.add(list);
       }
     }
 
+    logger.debug("Reduced size of {} is {}", mergedResult.name, mergedResult.size());
     return mergedResult;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index b7b3f1d..ee2847e 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -42,7 +42,7 @@ public class LuceneQueryFunctionJUnitTest {
 
   @Test
   public void testRepoQueryAndMerge() throws Exception {
-    final AtomicReference<TopEntries> result = new AtomicReference<>();
+    final AtomicReference<TopEntriesCollector> result = new AtomicReference<>();
 
     final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
@@ -81,11 +81,11 @@ public class LuceneQueryFunctionJUnitTest {
           }
         });
 
-        oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+        oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
         will(new CustomAction("collectResult") {
           @Override
           public Object invoke(Invocation invocation) throws Throwable {
-            result.set((TopEntries) invocation.getParameter(0));
+            result.set((TopEntriesCollector) invocation.getParameter(0));
             return null;
           }
         });
@@ -96,14 +96,14 @@ public class LuceneQueryFunctionJUnitTest {
     function.setRepositoryManager(m.mockRepoManager);
 
     function.execute(m.mockContext);
-    List<EntryScore> hits = result.get().getHits();
+    List<EntryScore> hits = result.get().getEntries().getHits();
     assertEquals(5, hits.size());
-    TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
+    TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
   }
 
   @Test
   public void testResultLimitClause() throws Exception {
-    final AtomicReference<TopEntries> result = new AtomicReference<>();
+    final AtomicReference<TopEntriesCollector> result = new AtomicReference<>();
 
     final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
@@ -147,11 +147,11 @@ public class LuceneQueryFunctionJUnitTest {
           }
         });
 
-        oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+        oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
         will(new CustomAction("collectResult") {
           @Override
           public Object invoke(Invocation invocation) throws Throwable {
-            result.set((TopEntries) invocation.getParameter(0));
+            result.set((TopEntriesCollector) invocation.getParameter(0));
             return null;
           }
         });
@@ -162,9 +162,9 @@ public class LuceneQueryFunctionJUnitTest {
     function.setRepositoryManager(m.mockRepoManager);
 
     function.execute(m.mockContext);
-    List<EntryScore> hits = result.get().getHits();
+    List<EntryScore> hits = result.get().getEntries().getHits();
     assertEquals(3, hits.size());
-    TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2);
+    TopEntriesJUnitTest.verifyResultOrder(result.get().getEntries().getHits(), r1_1, r2_1, r1_2);
   }
 
   @Test
@@ -197,7 +197,7 @@ public class LuceneQueryFunctionJUnitTest {
             Collection<IndexResultCollector> collectors = (Collection<IndexResultCollector>) invocation.getParameter(0);
             assertEquals(1, collectors.size());
             assertEquals(m.mockCollector, collectors.iterator().next());
-            return new TopEntries();
+            return new TopEntriesCollector(null);
           }
         });
 
@@ -213,7 +213,7 @@ public class LuceneQueryFunctionJUnitTest {
           }
         });
 
-        oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
+        oneOf(m.mockResultSender).lastResult(with(any(TopEntriesCollector.class)));
       }
     });
 
@@ -294,7 +294,7 @@ public class LuceneQueryFunctionJUnitTest {
         will(returnValue(m.mockCollector));
         oneOf(m.mockManager).reduce(with(any(Collection.class)));
         will(throwException(new IOException()));
-        
+
         oneOf(m.mockRepoManager).getRepositories(m.mockRegion, m.mockContext);
         m.repos.remove(1);
         will(returnValue(m.repos));
@@ -303,13 +303,13 @@ public class LuceneQueryFunctionJUnitTest {
         oneOf(m.mockResultSender).sendException(with(any(IOException.class)));
       }
     });
-    
+
     LuceneQueryFunction function = new LuceneQueryFunction();
     function.setRepositoryManager(m.mockRepoManager);
-    
+
     function.execute(m.mockContext);
   }
-  
+
   @Test
   public void testQueryFunctionId() {
     String id = new LuceneQueryFunction().getId();
@@ -318,7 +318,7 @@ public class LuceneQueryFunctionJUnitTest {
 
   class QueryMocks {
     RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
-    ResultSender<TopEntries> mockResultSender = mocker.mock(ResultSender.class);
+    ResultSender<TopEntriesCollector> mockResultSender = mocker.mock(ResultSender.class);
     Region<Object, Object> mockRegion = mocker.mock(Region.class);
 
     RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);
@@ -329,7 +329,6 @@ public class LuceneQueryFunctionJUnitTest {
     CollectorManager mockManager = mocker.mock(CollectorManager.class);
     IndexResultCollector mockCollector = mocker.mock(IndexResultCollector.class);
 
-
     QueryMocks() {
       repos.add(mockRepository1);
       repos.add(mockRepository2);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/01c4bc9f/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
index 4766220..3cf622c 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
@@ -8,7 +8,6 @@ import java.util.List;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -43,14 +42,14 @@ public class TopEntriesCollectorJUnitTest {
     c3.collect(r3_2.key, r3_2.score);
     c3.collect(r3_3.key, r3_3.score);
 
-    List<IndexResultCollector> collectors = new ArrayList<>();
+    List<TopEntriesCollector> collectors = new ArrayList<>();
     collectors.add(c1);
     collectors.add(c2);
     collectors.add(c3);
 
-    TopEntries entries = manager.reduce(collectors);
-    assertEquals(8, entries.getHits().size());
-    TopEntriesJUnitTest.verifyResultOrder(entries.getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
+    TopEntriesCollector entries = manager.reduce(collectors);
+    assertEquals(8, entries.getEntries().getHits().size());
+    TopEntriesJUnitTest.verifyResultOrder(entries.getEntries().getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
   }
   
   @Test