You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/09/04 23:15:22 UTC

[1/3] incubator-geode git commit: GEODE-11: Add classes to manage shard results

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 a379b0c18 -> a0155ef97


GEODE-11: Add classes to manage shard results

Lucene ScoreDoc and TopDocs classes are based on int docIds. This implementation
needs Object type keys. So new equivalent classes are needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3c659bfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3c659bfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3c659bfa

Branch: refs/heads/feature/GEODE-11
Commit: 3c659bfad33a48cb4bec584452a2e5b7dce098ea
Parents: a379b0c
Author: Ashvin Agrawal <as...@apache.org>
Authored: Tue Sep 1 21:13:16 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 13:18:06 2015 -0700

----------------------------------------------------------------------
 .../lucene/internal/distributed/EntryScore.java |  22 ++++
 .../lucene/internal/distributed/TopEntries.java |  78 ++++++++++++++
 .../LuceneQueryFunctionJUnitTest.java           |   6 +-
 .../distributed/TopEntriesJUnitTest.java        | 106 +++++++++++++++++++
 4 files changed, 209 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
new file mode 100644
index 0000000..6903dce
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/EntryScore.java
@@ -0,0 +1,22 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+/**
+ * Holds the key of one entry matching a search query, together with its score
+ */
+public class EntryScore {
+  // Key of the entry matching the search query
+  final Object key;
+
+  // The score of this entry for the query.
+  final float score;
+
+  public EntryScore(Object key, float score) {
+    this.key = key;
+    this.score = score;
+  }
+
+  @Override
+  public String toString() {
+    return "key=" + key + " score=" + score;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
new file mode 100644
index 0000000..3a07d3f
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntries.java
@@ -0,0 +1,78 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+
+/**
+ * Holds an ordered collection of entries matching a search query, capped at a fixed limit.
+ */
+public class TopEntries {
+  // entries in insertion order; addHit rejects an entry that scores higher than the last one
+  final private List<EntryScore> hits = new ArrayList<>();
+
+  // the maximum number of entries stored in this collection
+  final int limit;
+
+  // comparator to order EntryScore instances by score
+  final Comparator<EntryScore> comparator = new EntryScoreComparator();
+
+  public TopEntries() {
+    this(LuceneQueryFactory.DEFAULT_LIMIT);
+  }
+
+  public TopEntries(int limit) {
+    if (limit < 0) {
+      throw new IllegalArgumentException();
+    }
+    this.limit = limit;
+  }
+
+  /**
+   * Adds an entry to the collection. The new entry must not score higher than the last entry added (equal scores
+   * are accepted); otherwise an IllegalArgumentException is thrown. This order check runs before the limit check.
+   * The new entry is silently ignored if the limit is already reached.
+   * @param entry the entry to append
+   */
+  public void addHit(EntryScore entry) {
+    if (hits.size() > 0) {
+      EntryScore lastEntry = hits.get(hits.size() - 1);
+      if (comparator.compare(lastEntry, entry) < 0) {
+        throw new IllegalArgumentException();
+      }
+    }
+
+    if (hits.size() >= limit) {
+      return;
+    }
+
+    hits.add(entry);
+  }
+
+  /**
+   * @return count of entries in the collection
+   */
+  public int size() {
+    return hits.size();
+  }
+
+  /**
+   * @return The entries list managed by this instance; this is the internal list itself, not a copy
+   */
+  public List<EntryScore> getHits() {
+    return hits;
+  }
+
+  /**
+   * Compares scores of two entries using the ordering of Float.compare. I.e. it returns a negative value if the
+   * first entry's score is less than the second one's, zero if they are equal and a positive value otherwise.
+   */
+  class EntryScoreComparator implements Comparator<EntryScore> {
+    @Override
+    public int compare(EntryScore o1, EntryScore o2) {
+      return Float.compare(o1.score, o2.score);
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index c23dffe..94e088f 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -46,7 +46,7 @@ public class LuceneQueryFunctionJUnitTest {
 
     final AtomicReference<LuceneQueryResults> result = new AtomicReference<>();
 
-    QueryMocks m = new QueryMocks();
+    final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
       {
         oneOf(m.mockContext).getDataSet();
@@ -105,7 +105,7 @@ public class LuceneQueryFunctionJUnitTest {
 
   @Test
   public void testIndexRepoQueryFails() throws Exception {
-    QueryMocks m = new QueryMocks();
+    final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
       {
         oneOf(m.mockContext).getDataSet();
@@ -133,7 +133,7 @@ public class LuceneQueryFunctionJUnitTest {
 
   @Test
   public void testBucketNotFound() throws Exception {
-    QueryMocks m = new QueryMocks();
+    final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
       {
         oneOf(m.mockContext).getDataSet();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3c659bfa/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
new file mode 100644
index 0000000..b799a00
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesJUnitTest.java
@@ -0,0 +1,106 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.jmock.Mockery;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TopEntriesJUnitTest {
+  Mockery mockContext;
+
+  EntryScore r1_1 = new EntryScore("3", .9f);
+  EntryScore r1_2 = new EntryScore("1", .8f);
+  EntryScore r2_1 = new EntryScore("2", 0.85f);
+  EntryScore r2_2 = new EntryScore("4", 0.1f);
+
+  @Test
+  public void testPopulateTopEntries() {
+    TopEntries hits = new TopEntries();
+    hits.addHit(r1_1);
+    hits.addHit(r2_1);
+    hits.addHit(r1_2);
+    hits.addHit(r2_2);
+    
+    assertEquals(4, hits.size());
+    verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2, r2_2);
+  }
+
+  @Test
+  public void putSameScoreEntries() {
+    TopEntries hits = new TopEntries();
+    EntryScore r1 = new EntryScore("1", .8f);
+    EntryScore r2 = new EntryScore("2", .8f);
+    hits.addHit(r1);
+    hits.addHit(r2);
+    
+    assertEquals(2, hits.size());
+    verifyResultOrder(hits.getHits(), r1, r2);
+  }
+  
+  @Test
+  public void testInitialization() {
+    TopEntries hits = new TopEntries();
+    assertEquals(LuceneQueryFactory.DEFAULT_LIMIT, hits.limit);
+
+    hits = new TopEntries(123);
+    assertEquals(123, hits.limit);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidLimit() {
+    new TopEntries(-1);
+  }
+  
+  @Test
+  public void enforceLimit() throws Exception {
+    TopEntries hits = new TopEntries(3);
+    hits.addHit(r1_1);
+    hits.addHit(r2_1);
+    hits.addHit(r1_2);
+    hits.addHit(r2_2);
+
+    assertEquals(3, hits.size());
+    verifyResultOrder(hits.getHits(), r1_1, r2_1, r1_2);
+  }
+
+  public static void verifyResultOrder(Collection<EntryScore> list, EntryScore... expectedEntries) {
+    Iterator<EntryScore> iter = list.iterator();
+    for (EntryScore expectedEntry : expectedEntries) {
+      if (!iter.hasNext()) {
+        fail();
+      }
+      EntryScore toVerify = iter.next();
+      assertEquals(expectedEntry.key, toVerify.key);
+      assertEquals(expectedEntry.score, toVerify.score, .0f);
+    }
+  }
+
+  @Before
+  public void setupMock() {
+    mockContext = new Mockery() {
+      {
+        setImposteriser(ClassImposteriser.INSTANCE);
+        setThreadingPolicy(new Synchroniser());
+      }
+    };
+  }
+
+  @After
+  public void validateMock() {
+    mockContext.assertIsSatisfied();
+    mockContext = null;
+  }
+}


[2/3] incubator-geode git commit: GEODE-11: Add customizable search result collector

Posted by as...@apache.org.
GEODE-11: Add customizable search result collector

Use Lucene's CollectorManager pattern for collecting and reducing results
from local buckets in a member. Each bucket is expected to have its own index
repository. This will be used to accept custom collectors and reducers at shard
level.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/240b39b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/240b39b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/240b39b9

Branch: refs/heads/feature/GEODE-11
Commit: 240b39b9900f9dbcebfd77fd6a2ad571e1fff777
Parents: 3c659bf
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 4 13:24:08 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 14:07:40 2015 -0700

----------------------------------------------------------------------
 .../internal/distributed/CollectorManager.java  | 29 ++++++++
 .../distributed/TopEntriesCollector.java        | 40 +++++++++++
 .../distributed/TopEntriesCollectorManager.java | 74 ++++++++++++++++++++
 .../repository/IndexResultCollector.java        | 13 +++-
 .../TopEntriesCollectorJUnitTest.java           | 62 ++++++++++++++++
 .../IndexRepositoryImplJUnitTest.java           | 15 ++--
 6 files changed, 225 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
new file mode 100644
index 0000000..8039d14
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/CollectorManager.java
@@ -0,0 +1,29 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+
+/**
+ * This interface is used to create result collectors and to aggregate the search results they collected.
+ * 
+ * @param <T> Type of reduce result
+ * @param <C> Type of IndexResultCollector created by this manager
+ */
+public interface CollectorManager<T, C extends IndexResultCollector> {
+  /**
+   * @param name Name/Identifier for this collector, e.g. region/bucketId.
+   * @return a new {@link IndexResultCollector}. This must return a different instance on
+   *         each call. A new collector would be created for each bucket on a member node.
+   */
+  C newCollector(String name);
+
+  /**
+   * Reduce the results of individual collectors into a meaningful result. This method must be called after collection
+   * is finished on all provided collectors.
+   * 
+   * @throws IOException
+   */
+  T reduce(Collection<IndexResultCollector> results) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
new file mode 100644
index 0000000..5b70499
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollector.java
@@ -0,0 +1,40 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+
+/**
+ * An implementation of {@link IndexResultCollector} that collects {@link EntryScore}s into a {@link TopEntries}.
+ * It is expected that the results will be collected ordered by score of the entry, highest score first.
+ */
+public class TopEntriesCollector implements IndexResultCollector {
+  private final String name;
+  private final TopEntries entries;
+
+  public TopEntriesCollector(String name) {
+    this.name = name;
+    this.entries = new TopEntries();
+  }
+
+  @Override
+  public void collect(Object key, float score) {
+    entries.addHit(new EntryScore(key, score));
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public int size() {
+    TopEntries entries = getEntries();
+    return entries == null ? 0 : entries.size();
+  }
+
+  /**
+   * @return The entries collected by this collector
+   */
+  public TopEntries getEntries() {
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
new file mode 100644
index 0000000..4aab0af
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -0,0 +1,74 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.lucene.LuceneQueryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntries.EntryScoreComparator;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * An implementation of {@link CollectorManager} for managing {@link TopEntriesCollector}. This is used by a member to
+ * collect top matching entries from local buckets.
+ */
+public class TopEntriesCollectorManager implements CollectorManager<TopEntries, TopEntriesCollector> {
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public TopEntriesCollector newCollector(String name) {
+    return new TopEntriesCollector(name);
+  }
+
+  @Override
+  public TopEntries reduce(Collection<IndexResultCollector> collectors) throws IOException {
+    final EntryScoreComparator scoreComparator = new TopEntries().new EntryScoreComparator();
+
+    // compares two hit lists by the score of their first entries; both lists must be non-empty
+    Comparator<List<EntryScore>> entryListComparator = new Comparator<List<EntryScore>>() {
+      @Override
+      public int compare(List<EntryScore> l1, List<EntryScore> l2) {
+        EntryScore o1 = l1.get(0);
+        EntryScore o2 = l2.get(0);
+        return scoreComparator.compare(o1, o2);
+      }
+    };
+
+    // The queue contains the hit lists of all non-empty bucket results. Reversing the comparator puts the list whose
+    // first entry has the highest score at the head.
+    PriorityQueue<List<EntryScore>> entryListsPriorityQueue;
+    entryListsPriorityQueue = new PriorityQueue<List<EntryScore>>(Collections.reverseOrder(entryListComparator));
+    TopEntries mergedResult = new TopEntries();
+
+    for (IndexResultCollector collector : collectors) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Number of entries found in bucket {} is {}", collector.getName(), collector.size());
+      }
+
+      if (collector.size() > 0) {
+        entryListsPriorityQueue.add(((TopEntriesCollector)collector).getEntries().getHits());
+      }
+    }
+
+    int limit = LuceneQueryFactory.DEFAULT_LIMIT;
+    logger.debug("Only {} count of entries will be reduced. Other entries will be ignored", limit);
+    while (entryListsPriorityQueue.size() > 0 && limit-- > 0) {
+      // NOTE: remove(0) takes the head off the collector's own hit list, so this merge drains the input collectors
+      List<EntryScore> list = entryListsPriorityQueue.remove();
+      EntryScore entry = list.remove(0);
+      mergedResult.addHit(entry);
+
+      if (list.size() > 0) {
+        entryListsPriorityQueue.add(list);
+      }
+    }
+
+    return mergedResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
index fd78b11..94931a4 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexResultCollector.java
@@ -1,16 +1,25 @@
 package com.gemstone.gemfire.cache.lucene.internal.repository;
 
 /**
- * Interface for collection results of a query on 
+ * Interface for collection results of a query on
  * an IndexRepository. See {@link IndexRepository#query(org.apache.lucene.search.Query, int, IndexResultCollector)}
  */
 public interface IndexResultCollector {
+  /**
+   * @return Name/identifier of this collector
+   */
+  public String getName();
+
+  /**
+   * @return Number of results collected by this collector
+   */
+  public int size();
 
   /**
    * Collect a single document
+   * 
    * @param key the gemfire key of the object
    * @param score the lucene score of this object
    */
   void collect(Object key, float score);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
new file mode 100644
index 0000000..4766220
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorJUnitTest.java
@@ -0,0 +1,62 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TopEntriesCollectorJUnitTest {
+
+  @Test
+  public void testReduce() throws Exception {
+    EntryScore r1_1 = new EntryScore("1-1", .9f);
+    EntryScore r1_2 = new EntryScore("1-2", .7f);
+    EntryScore r1_3 = new EntryScore("1-3", .5f);
+
+    EntryScore r2_1 = new EntryScore("2-1", .85f);
+    EntryScore r2_2 = new EntryScore("2-2", .65f);
+
+    EntryScore r3_1 = new EntryScore("3-1", .8f);
+    EntryScore r3_2 = new EntryScore("3-2", .6f);
+    EntryScore r3_3 = new EntryScore("3-3", .4f);
+
+    TopEntriesCollectorManager manager = new TopEntriesCollectorManager();
+
+    TopEntriesCollector c1 = manager.newCollector("c1");
+    c1.collect(r1_1.key, r1_1.score);
+    c1.collect(r1_2.key, r1_2.score);
+    c1.collect(r1_3.key, r1_3.score);
+
+    TopEntriesCollector c2 = manager.newCollector("c2");
+    c2.collect(r2_1.key, r2_1.score);
+    c2.collect(r2_2.key, r2_2.score);
+
+    TopEntriesCollector c3 = manager.newCollector("c3");
+    c3.collect(r3_1.key, r3_1.score);
+    c3.collect(r3_2.key, r3_2.score);
+    c3.collect(r3_3.key, r3_3.score);
+
+    List<IndexResultCollector> collectors = new ArrayList<>();
+    collectors.add(c1);
+    collectors.add(c2);
+    collectors.add(c3);
+
+    TopEntries entries = manager.reduce(collectors);
+    assertEquals(8, entries.getHits().size());
+    TopEntriesJUnitTest.verifyResultOrder(entries.getHits(), r1_1, r2_1, r3_1, r1_2, r2_2, r3_2, r1_3, r3_3);
+  }
+  
+  @Test
+  public void testInitialization() {
+    TopEntriesCollector collector = new TopEntriesCollector("name");
+    assertEquals("name", collector.getName());
+    assertEquals(0, collector.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/240b39b9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
index f4a994b..0b4a4cd 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
@@ -11,15 +11,10 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -28,7 +23,6 @@ import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
-import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.SerializerUtil;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.Type2;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
 
@@ -134,6 +128,15 @@ public class IndexRepositoryImplJUnitTest {
     public void collect(Object key, float score) {
       results.add(key);
     }
+    
+    @Override
+    public String getName() {
+      return null;
+    }
+    @Override
+    public int size() {
+      return results.size();
+    }
   }
 
   /**


[3/3] incubator-geode git commit: GEODE-11: Use ResultCollector in LuceneFunction

Posted by as...@apache.org.
GEODE-11: Use ResultCollector in LuceneFunction

Shards will no longer use the LuceneQueryResults object to send results to the
coordinator. Replace this dependency with EntryScore and related objects.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a0155ef9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a0155ef9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a0155ef9

Branch: refs/heads/feature/GEODE-11
Commit: a0155ef9757f0dddca4326f27ec5d76ee123032f
Parents: 240b39b
Author: Ashvin Agrawal <as...@apache.org>
Authored: Fri Sep 4 14:10:04 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Fri Sep 4 14:14:00 2015 -0700

----------------------------------------------------------------------
 .../distributed/LuceneQueryFunction.java        | 45 +++++++-------------
 .../LuceneQueryFunctionJUnitTest.java           | 36 +++++++---------
 2 files changed, 32 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0155ef9/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 6f59329..9b494e0 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -3,7 +3,6 @@ package com.gemstone.gemfire.cache.lucene.internal.distributed;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
@@ -13,10 +12,6 @@ import com.gemstone.gemfire.cache.execute.FunctionAdapter;
 import com.gemstone.gemfire.cache.execute.FunctionContext;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
 import com.gemstone.gemfire.cache.execute.ResultSender;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneQueryResultsImpl;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
-import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsResultMerger;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -39,7 +34,7 @@ public class LuceneQueryFunction extends FunctionAdapter {
   @Override
   public void execute(FunctionContext context) {
     RegionFunctionContext ctx = (RegionFunctionContext) context;
-    ResultSender<LuceneQueryResults> resultSender = ctx.getResultSender();
+    ResultSender<TopEntries> resultSender = ctx.getResultSender();
 
     Region region = ctx.getDataSet();
     if (logger.isDebugEnabled()) {
@@ -47,16 +42,18 @@ public class LuceneQueryFunction extends FunctionAdapter {
     }
 
     LuceneSearchFunctionArgs args = (LuceneSearchFunctionArgs) ctx.getArguments();
-    Set<Integer> buckets = args == null ? null : args.getBuckets();
+    Set<Integer> buckets = (args == null ? null : args.getBuckets());
 
-    List<LuceneQueryResults> results = new ArrayList<>();
+    CollectorManager<TopEntries, TopEntriesCollector> manager = new TopEntriesCollectorManager();
+
+    Collection<IndexResultCollector> results = new ArrayList<>();
     try {
       Collection<IndexRepository> repositories = repoManager.getRepositories(region, buckets);
       for (IndexRepository repo : repositories) {
-        ShardResultCollector collector = new ShardResultCollector();
-        logger.debug("Executing search on repo: " + repo);
+        TopEntriesCollector collector = manager.newCollector(repo.toString());
+        logger.debug("Executing search on repo: " + repo.toString());
         repo.query(null, 0, collector);
-        results.add(collector.getResult());
+        results.add(collector);
       }
     } catch (IOException e) {
       logger.warn("", e);
@@ -68,24 +65,14 @@ public class LuceneQueryFunction extends FunctionAdapter {
       return;
     }
 
-    TopDocsResultMerger merger = new TopDocsResultMerger();
-    LuceneQueryResults merged = merger.mergeResults(results);
-    resultSender.lastResult(merged);
-  }
-
-  /**
-   * Collects and merges results from {@link IndexRepository}s
-   */
-  class ShardResultCollector implements IndexResultCollector {
-    LuceneQueryResultsImpl result = new LuceneQueryResultsImpl();
-    
-    @Override
-    public void collect(Object key, float score) {
-      result.addHit(new LuceneResultStructImpl(key, score));
-    }
-
-    public LuceneQueryResults getResult() {
-      return result;
+    TopEntries mergedResult;
+    try {
+      mergedResult = manager.reduce(results);
+      resultSender.lastResult(mergedResult);
+    } catch (IOException e) {
+      logger.warn("", e);
+      resultSender.sendException(e);
+      return;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a0155ef9/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index 94e088f..069ca90 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -22,10 +22,6 @@ import org.junit.experimental.categories.Category;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
 import com.gemstone.gemfire.cache.execute.ResultSender;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.LuceneResultStruct;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneResultStructImpl;
-import com.gemstone.gemfire.cache.lucene.internal.mergers.TopDocsMergeJUnitTest;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexResultCollector;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -38,13 +34,13 @@ public class LuceneQueryFunctionJUnitTest {
 
   @Test
   public void testRepoQueryAndMerge() throws Exception {
-    final LuceneResultStructImpl r1_1 = new LuceneResultStructImpl("key-1-1", .5f);
-    final LuceneResultStructImpl r1_2 = new LuceneResultStructImpl("key-1-2", .4f);
-    final LuceneResultStructImpl r1_3 = new LuceneResultStructImpl("key-1-3", .3f);
-    final LuceneResultStructImpl r2_1 = new LuceneResultStructImpl("key-2-1", .45f);
-    final LuceneResultStructImpl r2_2 = new LuceneResultStructImpl("key-2-2", .35f);
+    final EntryScore r1_1 = new EntryScore("key-1-1", .5f);
+    final EntryScore r1_2 = new EntryScore("key-1-2", .4f);
+    final EntryScore r1_3 = new EntryScore("key-1-3", .3f);
+    final EntryScore r2_1 = new EntryScore("key-2-1", .45f);
+    final EntryScore r2_2 = new EntryScore("key-2-2", .35f);
 
-    final AtomicReference<LuceneQueryResults> result = new AtomicReference<>();
+    final AtomicReference<TopEntries> result = new AtomicReference<>();
 
     final QueryMocks m = new QueryMocks();
     mocker.checking(new Expectations() {
@@ -65,9 +61,9 @@ public class LuceneQueryFunctionJUnitTest {
           @Override
           public Object invoke(Invocation invocation) throws Throwable {
             IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
-            collector.collect(r1_1.getKey(), r1_1.getScore());
-            collector.collect(r1_2.getKey(), r1_2.getScore());
-            collector.collect(r1_3.getKey(), r1_3.getScore());
+            collector.collect(r1_1.key, r1_1.score);
+            collector.collect(r1_2.key, r1_2.score);
+            collector.collect(r1_3.key, r1_3.score);
             return null;
           }
         });
@@ -77,17 +73,17 @@ public class LuceneQueryFunctionJUnitTest {
           @Override
           public Object invoke(Invocation invocation) throws Throwable {
             IndexResultCollector collector = (IndexResultCollector) invocation.getParameter(2);
-            collector.collect(r2_1.getKey(), r2_1.getScore());
-            collector.collect(r2_2.getKey(), r2_2.getScore());
+            collector.collect(r2_1.key, r2_1.score);
+            collector.collect(r2_2.key, r2_2.score);
             return null;
           }
         });
 
-        oneOf(m.mockResultSender).lastResult(with(any(LuceneQueryResults.class)));
+        oneOf(m.mockResultSender).lastResult(with(any(TopEntries.class)));
         will(new CustomAction("collectResult") {
           @Override
           public Object invoke(Invocation invocation) throws Throwable {
-            result.set((LuceneQueryResults) invocation.getParameter(0));
+            result.set((TopEntries) invocation.getParameter(0));
             return null;
           }
         });
@@ -98,9 +94,9 @@ public class LuceneQueryFunctionJUnitTest {
     function.setRepositoryManager(m.mockRepoManager);
 
     function.execute(m.mockContext);
-    List<LuceneResultStruct> hits = result.get().getHits();
+    List<EntryScore> hits = result.get().getHits();
     assertEquals(5, hits.size());
-    TopDocsMergeJUnitTest.verifyResultOrder(result.get(), r1_1, r2_1, r1_2, r2_2, r1_3);
+    TopEntriesJUnitTest.verifyResultOrder(result.get().getHits(), r1_1, r2_1, r1_2, r2_2, r1_3);
   }
 
   @Test
@@ -164,7 +160,7 @@ public class LuceneQueryFunctionJUnitTest {
 
   class QueryMocks {
     RegionFunctionContext mockContext = mocker.mock(RegionFunctionContext.class);
-    ResultSender<LuceneQueryResults> mockResultSender = mocker.mock(ResultSender.class);
+    ResultSender<TopEntries> mockResultSender = mocker.mock(ResultSender.class);
     Region<Object, Object> mockRegion = mocker.mock(Region.class);
 
     RepositoryManager mockRepoManager = mocker.mock(RepositoryManager.class);