You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/04/20 10:20:55 UTC

[22/23] lucene-solr:feature/autoscaling: Squash-merge from master.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
index fa22320..d273057 100644
--- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.stream.Collectors;
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.ChecksumIndexInput;
@@ -242,17 +243,22 @@ public class OfflineSorter {
     sortInfo = new SortInfo();
     sortInfo.totalTime = System.currentTimeMillis();
 
-    List<String> segments = new ArrayList<>();
+    List<PartitionAndCount> segments = new ArrayList<>();
     int[] levelCounts = new int[1];
 
     // So we can remove any partially written temp files on exception:
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
 
     boolean success = false;
+    boolean[] isExhausted = new boolean[1];
     try (ByteSequencesReader is = getReader(dir.openChecksumInput(inputFileName, IOContext.READONCE), inputFileName)) {
-      int lineCount;
-      while ((lineCount = readPartition(is)) > 0) {
-        segments.add(sortPartition(trackingDir));
+      while (isExhausted[0] == false) {
+        int lineCount = readPartition(is, isExhausted);
+        if (lineCount == 0) {
+          assert isExhausted[0];
+          break;
+        }
+        segments.add(sortPartition(trackingDir, lineCount));
         sortInfo.tempMergeFiles++;
         sortInfo.lineCount += lineCount;
         levelCounts[0]++;
@@ -286,7 +292,7 @@ public class OfflineSorter {
           result = out.getName();
         }
       } else {
-        result = segments.get(0);
+        result = segments.get(0).fileName;
       }
 
       // We should be explicitly removing all intermediate files ourselves unless there is an exception:
@@ -308,10 +314,10 @@ public class OfflineSorter {
   }
 
   /** Sort a single partition in-memory. */
-  protected String sortPartition(TrackingDirectoryWrapper trackingDir) throws IOException {
+  protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException {
 
     try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
-         ByteSequencesWriter out = getWriter(tempFile);) {
+         ByteSequencesWriter out = getWriter(tempFile, lineCount);) {
       
       BytesRef spare;
 
@@ -319,17 +325,21 @@ public class OfflineSorter {
       BytesRefIterator iter = buffer.iterator(comparator);
       sortInfo.sortTime += System.currentTimeMillis() - start;
 
+      int count = 0;
       while ((spare = iter.next()) != null) {
         assert spare.length <= Short.MAX_VALUE;
         out.write(spare);
+        count++;
       }
+
+      assert count == lineCount;
       
       // Clean up the buffer for the next partition.
       buffer.clear();
 
       CodecUtil.writeFooter(out.out);
 
-      return tempFile.getName();
+      return new PartitionAndCount(lineCount, tempFile.getName());
     }
   }
 
@@ -342,16 +352,21 @@ public class OfflineSorter {
   }
 
   /** Merge the most recent {@code maxTempFile} partitions into a new partition. */
-  void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
+  void mergePartitions(Directory trackingDir, List<PartitionAndCount> segments) throws IOException {
     long start = System.currentTimeMillis();
 
-    List<String> segmentsToMerge;
+    List<PartitionAndCount> segmentsToMerge;
     if (segments.size() > maxTempFiles) {
       segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
     } else {
       segmentsToMerge = segments;
     }
 
+    long totalCount = 0;
+    for (PartitionAndCount segment : segmentsToMerge) {
+      totalCount += segment.count;
+    }
+
     PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
       @Override
       protected boolean lessThan(FileAndTop a, FileAndTop b) {
@@ -363,13 +378,13 @@ public class OfflineSorter {
 
     String newSegmentName = null;
 
-    try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT))) {
+    try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
 
       newSegmentName = writer.out.getName();
       
       // Open streams and read the top for each file
       for (int i = 0; i < segmentsToMerge.size(); i++) {
-        streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i), IOContext.READONCE), segmentsToMerge.get(i));
+        streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName);
         BytesRef item = null;
         try {
           item = streams[i].next();
@@ -412,16 +427,16 @@ public class OfflineSorter {
       IOUtils.close(streams);
     }
 
-    IOUtils.deleteFiles(trackingDir, segmentsToMerge);
+    IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList()));
 
     segmentsToMerge.clear();
-    segments.add(newSegmentName);
+    segments.add(new PartitionAndCount(totalCount, newSegmentName));
 
     sortInfo.tempMergeFiles++;
   }
 
-  /** Read in a single partition of data */
-  int readPartition(ByteSequencesReader reader) throws IOException {
+  /** Read in a single partition of data, setting isExhausted[0] to true if there are no more items. */
+  int readPartition(ByteSequencesReader reader, boolean[] isExhausted) throws IOException {
     long start = System.currentTimeMillis();
     if (valueLength != -1) {
       int limit = ramBufferSize.bytes / valueLength;
@@ -433,6 +448,7 @@ public class OfflineSorter {
           verifyChecksum(t, reader);
         }
         if (item == null) {
+          isExhausted[0] = true;
           break;
         }
         buffer.append(item);
@@ -446,6 +462,7 @@ public class OfflineSorter {
           verifyChecksum(t, reader);
         }
         if (item == null) {
+          isExhausted[0] = true;
           break;
         }
         buffer.append(item);
@@ -471,7 +488,7 @@ public class OfflineSorter {
   }
 
   /** Subclasses can override to change how byte sequences are written to disk. */
-  protected ByteSequencesWriter getWriter(IndexOutput out) throws IOException {
+  protected ByteSequencesWriter getWriter(IndexOutput out, long itemCount) throws IOException {
     return new ByteSequencesWriter(out);
   }
 
@@ -587,5 +604,15 @@ public class OfflineSorter {
   /** Returns the comparator in use to sort entries */
   public Comparator<BytesRef> getComparator() {
     return comparator;
-  }  
+  }
+
+  private static class PartitionAndCount {
+    final long count;
+    final String fileName;
+
+    public PartitionAndCount(long count, String fileName) {
+      this.count = count;
+      this.fileName = fileName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/java/org/apache/lucene/util/Version.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/Version.java b/lucene/core/src/java/org/apache/lucene/util/Version.java
index da6d653..f6e6adc 100644
--- a/lucene/core/src/java/org/apache/lucene/util/Version.java
+++ b/lucene/core/src/java/org/apache/lucene/util/Version.java
@@ -102,6 +102,13 @@ public final class Version {
   public static final Version LUCENE_6_5_0 = new Version(6, 5, 0);
 
   /**
+   * Match settings and bugs in Lucene's 6.5.1 release.
+   * @deprecated Use latest
+   */
+  @Deprecated
+  public static final Version LUCENE_6_5_1 = new Version(6, 5, 1);
+
+  /**
    * Match settings and bugs in Lucene's 6.6.0 release.
    * @deprecated Use latest
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index eeb40fa..8a2356b 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -888,7 +888,7 @@ public class BKDWriter implements Closeable {
 
           /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
           @Override
-          protected ByteSequencesWriter getWriter(IndexOutput out) {
+          protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
             return new ByteSequencesWriter(out) {
               @Override
               public void write(byte[] bytes, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/test/org/apache/lucene/analysis/standard/TestStandardAnalyzer.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/analysis/standard/TestStandardAnalyzer.java b/lucene/core/src/test/org/apache/lucene/analysis/standard/TestStandardAnalyzer.java
index 2cc9274..6abbc2b 100644
--- a/lucene/core/src/test/org/apache/lucene/analysis/standard/TestStandardAnalyzer.java
+++ b/lucene/core/src/test/org/apache/lucene/analysis/standard/TestStandardAnalyzer.java
@@ -393,4 +393,27 @@ public class TestStandardAnalyzer extends BaseTokenStreamTestCase {
     Analyzer a = new StandardAnalyzer();
     assertEquals(new BytesRef("\"\\�3[]()! cz@"), a.normalize("dummy", "\"\\�3[]()! Cz@"));
   }
+
+  public void testMaxTokenLengthDefault() throws Exception {
+    StandardAnalyzer a = new StandardAnalyzer();
+
+    StringBuilder bToken = new StringBuilder();
+    // exact max length:
+    for(int i=0;i<StandardAnalyzer.DEFAULT_MAX_TOKEN_LENGTH;i++) {
+      bToken.append('b');
+    }
+
+    String bString = bToken.toString();
+    // first bString is exact max default length; next one is 1 too long
+    String input = "x " + bString + " " + bString + "b";
+    assertAnalyzesTo(a, input.toString(), new String[] {"x", bString, bString, "b"});
+    a.close();
+  }
+
+  public void testMaxTokenLengthNonDefault() throws Exception {
+    StandardAnalyzer a = new StandardAnalyzer();
+    a.setMaxTokenLength(5);
+    assertAnalyzesTo(a, "ab cd toolong xy z", new String[]{"ab", "cd", "toolo", "ng", "xy", "z"});
+    a.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/test/org/apache/lucene/index/TestIndexReaderClose.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexReaderClose.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexReaderClose.java
index 20088a5..b99666e 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexReaderClose.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexReaderClose.java
@@ -73,7 +73,7 @@ public class TestIndexReaderClose extends LuceneTestCase {
             reader.getReaderCacheHelper().addClosedListener(new FaultyListener());
           } else {
             count.incrementAndGet();
-            reader.getReaderCacheHelper().addClosedListener(new CountListener(count));
+            reader.getReaderCacheHelper().addClosedListener(new CountListener(count, reader.getReaderCacheHelper().getKey()));
           }
       }
       if (!faultySet && !throwOnClose) {
@@ -123,7 +123,7 @@ public class TestIndexReaderClose extends LuceneTestCase {
     AtomicInteger counter = new AtomicInteger(numListeners);
 
     for (int i = 0; i < numListeners; ++i) {
-      CountCoreListener listener = new CountCoreListener(counter, leafReader.getCoreCacheHelper().getKey());
+      CountListener listener = new CountListener(counter, leafReader.getCoreCacheHelper().getKey());
       listeners.add(listener);
       leafReader.getCoreCacheHelper().addClosedListener(listener);
     }
@@ -141,12 +141,12 @@ public class TestIndexReaderClose extends LuceneTestCase {
     w.w.getDirectory().close();
   }
 
-  private static final class CountCoreListener implements IndexReader.ClosedListener {
+  private static final class CountListener implements IndexReader.ClosedListener {
 
     private final AtomicInteger count;
     private final Object coreCacheKey;
 
-    public CountCoreListener(AtomicInteger count, Object coreCacheKey) {
+    public CountListener(AtomicInteger count, Object coreCacheKey) {
       this.count = count;
       this.coreCacheKey = coreCacheKey;
     }
@@ -159,25 +159,33 @@ public class TestIndexReaderClose extends LuceneTestCase {
 
   }
 
-  private static final class CountListener implements IndexReader.ClosedListener  {
-    private final AtomicInteger count;
-
-    public CountListener(AtomicInteger count) {
-      this.count = count;
-    }
+  private static final class FaultyListener implements IndexReader.ClosedListener {
 
     @Override
     public void onClose(IndexReader.CacheKey cacheKey) {
-      count.decrementAndGet();
+      throw new IllegalStateException("GRRRRRRRRRRRR!");
     }
   }
 
-  private static final class FaultyListener implements IndexReader.ClosedListener {
+  public void testRegisterListenerOnClosedReader() throws IOException {
+    Directory dir = newDirectory();
+    IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+    w.addDocument(new Document());
+    DirectoryReader r = DirectoryReader.open(w);
+    w.close();
 
-    @Override
-    public void onClose(IndexReader.CacheKey cacheKey) {
-      throw new IllegalStateException("GRRRRRRRRRRRR!");
-    }
+    // The reader is open, everything should work
+    r.getReaderCacheHelper().addClosedListener(key -> {});
+    r.leaves().get(0).reader().getReaderCacheHelper().addClosedListener(key -> {});
+    r.leaves().get(0).reader().getCoreCacheHelper().addClosedListener(key -> {});
+
+    // But now we close
+    r.close();
+    expectThrows(AlreadyClosedException.class, () -> r.getReaderCacheHelper().addClosedListener(key -> {}));
+    expectThrows(AlreadyClosedException.class, () -> r.leaves().get(0).reader().getReaderCacheHelper().addClosedListener(key -> {}));
+    expectThrows(AlreadyClosedException.class, () -> r.leaves().get(0).reader().getCoreCacheHelper().addClosedListener(key -> {}));
+
+    dir.close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
index df73687..475f716 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
@@ -18,6 +18,7 @@ package org.apache.lucene.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class TestByteBlockPool extends LuceneTestCase {
@@ -34,8 +35,7 @@ public class TestByteBlockPool extends LuceneTestCase {
       final int numValues = atLeast(100);
       BytesRefBuilder ref = new BytesRefBuilder();
       for (int i = 0; i < numValues; i++) {
-        final String value = TestUtil.randomRealisticUnicodeString(random(),
-            maxLength);
+        final String value = TestUtil.randomRealisticUnicodeString(random(), maxLength);
         list.add(new BytesRef(value));
         ref.copyChars(value);
         pool.append(ref.get());
@@ -76,5 +76,33 @@ public class TestByteBlockPool extends LuceneTestCase {
         pool.nextBuffer(); // prepare for next iter
       }
     }
-  } 
+  }
+
+  public void testLargeRandomBlocks() throws IOException {
+    Counter bytesUsed = Counter.newCounter();
+    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(bytesUsed));
+    pool.nextBuffer();
+
+    List<byte[]> items = new ArrayList<>();
+    for (int i=0;i<100;i++) {
+      int size;
+      if (random().nextBoolean()) {
+        size = TestUtil.nextInt(random(), 100, 1000);
+      } else {
+        size = TestUtil.nextInt(random(), 50000, 100000);
+      }
+      byte[] bytes = new byte[size];
+      random().nextBytes(bytes);
+      items.add(bytes);
+      pool.append(new BytesRef(bytes));
+    }
+
+    long position = 0;
+    for (byte[] expected : items) {
+      byte[] actual = new byte[expected.length];
+      pool.readBytes(position, actual, 0, actual.length);
+      assertTrue(Arrays.equals(expected, actual));
+      position += expected.length;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
index 49ed110..839f103 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.CorruptingIndexOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
@@ -455,6 +456,47 @@ public class TestOfflineSorter extends LuceneTestCase {
     dir.close();
   }
 
+  // OfflineSorter should not call my BytesSequencesReader.next() again after it already returned null:
+  public void testOverNexting() throws Exception {
+    Directory dir = newDirectory();
+    IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT);
+    try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) {
+      byte[] bytes = new byte[Integer.BYTES];
+      random().nextBytes(bytes);
+      w.write(bytes);
+      CodecUtil.writeFooter(out);
+    }
+
+    new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES) {
+      @Override
+      protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
+        ByteSequencesReader other = super.getReader(in, name);
+
+        return new ByteSequencesReader(in, name) {
+
+          private boolean alreadyEnded;
+              
+          @Override
+          public BytesRef next() throws IOException {
+            // if we returned null already, OfflineSorter should not call next() again
+            assertFalse(alreadyEnded);
+            BytesRef result = other.next();
+            if (result == null) {
+              alreadyEnded = true;
+            }
+            return result;
+          }
+
+          @Override
+          public void close() throws IOException {
+            other.close();
+          }
+        };
+      }
+    }.sort(out.getName());
+    dir.close();
+  }
+
   public void testInvalidFixedLength() throws Exception {
     IllegalArgumentException e;
     e = expectThrows(IllegalArgumentException.class,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupHeadsCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupHeadsCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupHeadsCollector.java
index b5fbdc3..503b952 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupHeadsCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupHeadsCollector.java
@@ -18,27 +18,62 @@ package org.apache.lucene.search.grouping;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.FieldComparator;
+import org.apache.lucene.search.LeafFieldComparator;
+import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.SimpleCollector;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.util.FixedBitSet;
 
 /**
- * This collector specializes in collecting the most relevant document (group head) for each group that match the query.
+ * This collector specializes in collecting the most relevant document (group head) for each
+ * group that matches the query.
+ *
+ * Clients should create new collectors by calling {@link #newCollector(GroupSelector, Sort)}
  *
  * @lucene.experimental
  */
 @SuppressWarnings({"unchecked","rawtypes"})
 public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
 
+  private final GroupSelector<T> groupSelector;
+  protected final Sort sort;
+
   protected final int[] reversed;
   protected final int compIDXEnd;
-  protected final TemporalResult temporalResult;
 
-  protected AllGroupHeadsCollector(int numberOfSorts) {
-    this.reversed = new int[numberOfSorts];
-    this.compIDXEnd = numberOfSorts - 1;
-    temporalResult = new TemporalResult();
+  protected Map<T, GroupHead<T>> heads = new HashMap<>();
+
+  protected LeafReaderContext context;
+  protected Scorer scorer;
+
+  /**
+   * Create a new AllGroupHeadsCollector based on the type of within-group Sort required
+   * @param selector a GroupSelector to define the groups
+   * @param sort     the within-group sort to use to choose the group head document
+   * @param <T>      the group value type
+   */
+  public static <T> AllGroupHeadsCollector<T> newCollector(GroupSelector<T> selector, Sort sort) {
+    if (sort.equals(Sort.RELEVANCE))
+      return new ScoringGroupHeadsCollector<>(selector, sort);
+    return new SortingGroupHeadsCollector<>(selector, sort);
+  }
+
+  private AllGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
+    this.groupSelector = selector;
+    this.sort = sort;
+    this.reversed = new int[sort.getSort().length];
+    final SortField[] sortFields = sort.getSort();
+    for (int i = 0; i < sortFields.length; i++) {
+      reversed[i] = sortFields[i].getReverse() ? -1 : 1;
+    }
+    this.compIDXEnd = this.reversed.length - 1;
   }
 
   /**
@@ -79,34 +114,27 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
   }
 
   /**
-   * Returns the group head and puts it into {@link #temporalResult}.
-   * If the group head wasn't encountered before then it will be added to the collected group heads.
-   * <p>
-   * The {@link TemporalResult#stop} property will be <code>true</code> if the group head wasn't encountered before
-   * otherwise <code>false</code>.
-   *
-   * @param doc The document to retrieve the group head for.
-   * @throws IOException If I/O related errors occur
-   */
-  protected abstract void retrieveGroupHeadAndAddIfNotExist(int doc) throws IOException;
-
-  /**
    * Returns the collected group heads.
    * Subsequent calls should return the same group heads.
    *
    * @return the collected group heads
    */
-  protected abstract Collection<? extends GroupHead<T>> getCollectedGroupHeads();
+  protected Collection<? extends GroupHead<T>> getCollectedGroupHeads() {
+    return heads.values();
+  }
 
   @Override
   public void collect(int doc) throws IOException {
-    retrieveGroupHeadAndAddIfNotExist(doc);
-    if (temporalResult.stop) {
+    groupSelector.advanceTo(doc);
+    T groupValue = groupSelector.currentValue();
+    if (heads.containsKey(groupValue) == false) {
+      groupValue = groupSelector.copyValue();
+      heads.put(groupValue, newGroupHead(doc, groupValue, context, scorer));
       return;
     }
-    GroupHead<T> groupHead = temporalResult.groupHead;
 
-    // Ok now we need to check if the current doc is more relevant then current doc for this group
+    GroupHead<T> groupHead = heads.get(groupValue);
+    // Ok now we need to check if the current doc is more relevant than top doc for this group
     for (int compIDX = 0; ; compIDX++) {
       final int c = reversed[compIDX] * groupHead.compare(compIDX, doc);
       if (c < 0) {
@@ -125,18 +153,34 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
     groupHead.updateDocHead(doc);
   }
 
-  /**
-   * Contains the result of group head retrieval.
-   * To prevent new object creations of this class for every collect.
-   */
-  protected class TemporalResult {
+  @Override
+  public boolean needsScores() {
+    return sort.needsScores();
+  }
 
-    public GroupHead<T> groupHead;
-    public boolean stop;
+  @Override
+  protected void doSetNextReader(LeafReaderContext context) throws IOException {
+    groupSelector.setNextReader(context);
+    this.context = context;
+    for (GroupHead<T> head : heads.values()) {
+      head.setNextReader(context);
+    }
+  }
 
+  @Override
+  public void setScorer(Scorer scorer) throws IOException {
+    this.scorer = scorer;
+    for (GroupHead<T> head : heads.values()) {
+      head.setScorer(scorer);
+    }
   }
 
   /**
+   * Create a new GroupHead for the given group value, initialized with a doc, context and scorer
+   */
+  protected abstract GroupHead<T> newGroupHead(int doc, T value, LeafReaderContext context, Scorer scorer) throws IOException;
+
+  /**
    * Represents a group head. A group head is the most relevant document for a particular group.
    * The relevancy is based is usually based on the sort.
    *
@@ -147,12 +191,30 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
     public final T groupValue;
     public int doc;
 
-    protected GroupHead(T groupValue, int doc) {
+    protected int docBase;
+
+    /**
+     * Create a new GroupHead for the given value
+     */
+    protected GroupHead(T groupValue, int doc, int docBase) {
       this.groupValue = groupValue;
-      this.doc = doc;
+      this.doc = doc + docBase;
+      this.docBase = docBase;
+    }
+
+    /**
+     * Called for each segment
+     */
+    protected void setNextReader(LeafReaderContext ctx) throws IOException {
+      this.docBase = ctx.docBase;
     }
 
     /**
+     * Called for each segment
+     */
+    protected abstract void setScorer(Scorer scorer) throws IOException;
+
+    /**
      * Compares the specified document for a specified comparator against the current most relevant document.
      *
      * @param compIDX The comparator index of the specified comparator.
@@ -173,4 +235,117 @@ public abstract class AllGroupHeadsCollector<T> extends SimpleCollector {
 
   }
 
+  /**
+   * General implementation using a {@link FieldComparator} to select the group head
+   */
+  private static class SortingGroupHeadsCollector<T> extends AllGroupHeadsCollector<T> {
+
+    protected SortingGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
+      super(selector, sort);
+    }
+
+    @Override
+    protected GroupHead<T> newGroupHead(int doc, T value, LeafReaderContext ctx, Scorer scorer) throws IOException {
+      return new SortingGroupHead<>(sort, value, doc, ctx, scorer);
+    }
+  }
+
+  private static class SortingGroupHead<T> extends GroupHead<T> {
+
+    final FieldComparator[] comparators;
+    final LeafFieldComparator[] leafComparators;
+
+    protected SortingGroupHead(Sort sort, T groupValue, int doc, LeafReaderContext context, Scorer scorer) throws IOException {
+      super(groupValue, doc, context.docBase);
+      final SortField[] sortFields = sort.getSort();
+      comparators = new FieldComparator[sortFields.length];
+      leafComparators = new LeafFieldComparator[sortFields.length];
+      for (int i = 0; i < sortFields.length; i++) {
+        comparators[i] = sortFields[i].getComparator(1, i);
+        leafComparators[i] = comparators[i].getLeafComparator(context);
+        leafComparators[i].setScorer(scorer);
+        leafComparators[i].copy(0, doc);
+        leafComparators[i].setBottom(0);
+      }
+    }
+
+    @Override
+    public void setNextReader(LeafReaderContext ctx) throws IOException {
+      super.setNextReader(ctx);
+      for (int i = 0; i < comparators.length; i++) {
+        leafComparators[i] = comparators[i].getLeafComparator(ctx);
+      }
+    }
+
+    @Override
+    protected void setScorer(Scorer scorer) throws IOException {
+      for (LeafFieldComparator c : leafComparators) {
+        c.setScorer(scorer);
+      }
+    }
+
+    @Override
+    public int compare(int compIDX, int doc) throws IOException {
+      return leafComparators[compIDX].compareBottom(doc);
+    }
+
+    @Override
+    public void updateDocHead(int doc) throws IOException {
+      for (LeafFieldComparator comparator : leafComparators) {
+        comparator.copy(0, doc);
+        comparator.setBottom(0);
+      }
+      this.doc = doc + docBase;
+    }
+  }
+
+  /**
+   * Specialized implementation for sorting by score
+   */
+  private static class ScoringGroupHeadsCollector<T> extends AllGroupHeadsCollector<T> {
+
+    protected ScoringGroupHeadsCollector(GroupSelector<T> selector, Sort sort) {
+      super(selector, sort);
+    }
+
+    @Override
+    protected GroupHead<T> newGroupHead(int doc, T value, LeafReaderContext context, Scorer scorer) throws IOException {
+      return new ScoringGroupHead<>(scorer, value, doc, context.docBase);
+    }
+  }
+
+  private static class ScoringGroupHead<T> extends GroupHead<T> {
+
+    private Scorer scorer;
+    private float topScore;
+
+    protected ScoringGroupHead(Scorer scorer, T groupValue, int doc, int docBase) throws IOException {
+      super(groupValue, doc, docBase);
+      assert scorer.docID() == doc;
+      this.scorer = scorer;
+      this.topScore = scorer.score();
+    }
+
+    @Override
+    protected void setScorer(Scorer scorer) {
+      this.scorer = scorer;
+    }
+
+    @Override
+    protected int compare(int compIDX, int doc) throws IOException {
+      assert scorer.docID() == doc;
+      assert compIDX == 0;
+      float score = scorer.score();
+      int c = Float.compare(score, topScore);
+      if (c > 0)
+        topScore = score;
+      return c;
+    }
+
+    @Override
+    protected void updateDocHead(int doc) throws IOException {
+      this.doc = doc + docBase;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupsCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupsCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupsCollector.java
index af697af..8434534 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupsCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/AllGroupsCollector.java
@@ -18,23 +18,34 @@ package org.apache.lucene.search.grouping;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.SimpleCollector;
-import org.apache.lucene.util.BytesRef;
 
 /**
  * A collector that collects all groups that match the
  * query. Only the group value is collected, and the order
  * is undefined.  This collector does not determine
  * the most relevant document of a group.
- * <p>
- * This is an abstract version. Concrete implementations define
- * what a group actually is and how it is internally collected.
  *
  * @lucene.experimental
  */
-public abstract class AllGroupsCollector<T> extends SimpleCollector {
+public class AllGroupsCollector<T> extends SimpleCollector {
+
+  private final GroupSelector<T> groupSelector;
+
+  private final Set<T> groups = new HashSet<T>();
+
+  /**
+   * Create a new AllGroupsCollector
+   * @param groupSelector the GroupSelector to determine groups
+   */
+  public AllGroupsCollector(GroupSelector<T> groupSelector) {
+    this.groupSelector = groupSelector;
+  }
 
   /**
    * Returns the total number of groups for the executed search.
@@ -49,18 +60,31 @@ public abstract class AllGroupsCollector<T> extends SimpleCollector {
   /**
    * Returns the group values
    * <p>
-   * This is an unordered collections of group values. For each group that matched the query there is a {@link BytesRef}
-   * representing a group value.
+   * This is an unordered collections of group values.
    *
    * @return the group values
    */
-  public abstract Collection<T> getGroups();
+  public Collection<T> getGroups() {
+    return groups;
+  }
 
-  // Empty not necessary
   @Override
   public void setScorer(Scorer scorer) throws IOException {}
 
   @Override
+  protected void doSetNextReader(LeafReaderContext context) throws IOException {
+    groupSelector.setNextReader(context);
+  }
+
+  @Override
+  public void collect(int doc) throws IOException {
+    groupSelector.advanceTo(doc);
+    if (groups.contains(groupSelector.currentValue()))
+      return;
+    groups.add(groupSelector.copyValue());
+  }
+
+  @Override
   public boolean needsScores() {
     return false; // the result is unaffected by relevancy
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollector.java
index c965042..a50fda1 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/BlockGroupingCollector.java
@@ -50,7 +50,7 @@ import org.apache.lucene.util.PriorityQueue;
  *  being that the documents in each group must always be
  *  indexed as a block.  This collector also fills in
  *  TopGroups.totalGroupCount without requiring the separate
- *  {@link org.apache.lucene.search.grouping.term.TermAllGroupsCollector}.  However, this collector does
+ *  {@link org.apache.lucene.search.grouping.AllGroupsCollector}.  However, this collector does
  *  not fill in the groupValue of each group; this field
  *  will always be null.
  *

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/DistinctValuesCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/DistinctValuesCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/DistinctValuesCollector.java
index 54d752c..103b0d2 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/DistinctValuesCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/DistinctValuesCollector.java
@@ -16,10 +16,14 @@
  */
 package org.apache.lucene.search.grouping;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.SimpleCollector;
 
 /**
@@ -27,33 +31,99 @@ import org.apache.lucene.search.SimpleCollector;
  *
  * @lucene.experimental
  */
-public abstract class DistinctValuesCollector<T> extends SimpleCollector {
+public class DistinctValuesCollector<T, R> extends SecondPassGroupingCollector<T> {
+
+  /**
+   * Create a DistinctValuesCollector
+   * @param groupSelector the group selector to determine the top-level groups
+   * @param groups        the top-level groups to collect for
+   * @param valueSelector a group selector to determine which values to collect per-group
+   */
+  public DistinctValuesCollector(GroupSelector<T> groupSelector, Collection<SearchGroup<T>> groups,
+                                       GroupSelector<R> valueSelector) {
+    super(groupSelector, groups, new DistinctValuesReducer<>(valueSelector));
+  }
+
+  private static class ValuesCollector<R> extends SimpleCollector {
+
+    final GroupSelector<R> valueSelector;
+    final Set<R> values = new HashSet<>();
+
+    private ValuesCollector(GroupSelector<R> valueSelector) {
+      this.valueSelector = valueSelector;
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (valueSelector.advanceTo(doc) == GroupSelector.State.ACCEPT) {
+        R value = valueSelector.currentValue();
+        if (values.contains(value) == false)
+          values.add(valueSelector.copyValue());
+      }
+      else {
+        if (values.contains(null) == false)
+          values.add(null);
+      }
+    }
+
+    @Override
+    protected void doSetNextReader(LeafReaderContext context) throws IOException {
+      valueSelector.setNextReader(context);
+    }
+
+    @Override
+    public boolean needsScores() {
+      return false;
+    }
+  }
+
+  private static class DistinctValuesReducer<T, R> extends GroupReducer<T, ValuesCollector<R>> {
+
+    final GroupSelector<R> valueSelector;
+
+    private DistinctValuesReducer(GroupSelector<R> valueSelector) {
+      this.valueSelector = valueSelector;
+    }
+
+    @Override
+    public boolean needsScores() {
+      return false;
+    }
+
+    @Override
+    protected ValuesCollector<R> newCollector() {
+      return new ValuesCollector<>(valueSelector);
+    }
+  }
 
   /**
    * Returns all unique values for each top N group.
    *
    * @return all unique values for each top N group
    */
-  public abstract List<GroupCount<T>> getGroups();
+  public List<GroupCount<T, R>> getGroups() {
+    List<GroupCount<T, R>> counts = new ArrayList<>();
+    for (SearchGroup<T> group : groups) {
+      @SuppressWarnings("unchecked")
+      ValuesCollector<R> vc = (ValuesCollector<R>) groupReducer.getCollector(group.groupValue);
+      counts.add(new GroupCount<>(group.groupValue, vc.values));
+    }
+    return counts;
+  }
 
   /**
    * Returned by {@link DistinctValuesCollector#getGroups()},
    * representing the value and set of distinct values for the group.
    */
-  public static class GroupCount<T> {
+  public static class GroupCount<T, R> {
 
     public final T groupValue;
-    public final Set<T> uniqueValues;
+    public final Set<R> uniqueValues;
 
-    public GroupCount(T groupValue) {
+    public GroupCount(T groupValue, Set<R> values) {
       this.groupValue = groupValue;
-      this.uniqueValues = new HashSet<>();
+      this.uniqueValues = values;
     }
   }
 
-  @Override
-  public boolean needsScores() {
-    return false; // not needed to fetch all values
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/FirstPassGroupingCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/FirstPassGroupingCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/FirstPassGroupingCollector.java
index 02bb1a2..bd47adb 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/FirstPassGroupingCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/FirstPassGroupingCollector.java
@@ -33,15 +33,16 @@ import org.apache.lucene.search.SortField;
 
 /** FirstPassGroupingCollector is the first of two passes necessary
  *  to collect grouped hits.  This pass gathers the top N sorted
- *  groups. Concrete subclasses define what a group is and how it
- *  is internally collected.
+ *  groups. Groups are defined by a {@link GroupSelector}
  *
  *  <p>See {@link org.apache.lucene.search.grouping} for more
  *  details including a full code example.</p>
  *
  * @lucene.experimental
  */
-abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
+public class FirstPassGroupingCollector<T> extends SimpleCollector {
+
+  private final GroupSelector<T> groupSelector;
 
   private final FieldComparator<?>[] comparators;
   private final LeafFieldComparator[] leafComparators;
@@ -60,16 +61,18 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
   /**
    * Create the first pass collector.
    *
-   *  @param groupSort The {@link Sort} used to sort the
+   * @param groupSelector a GroupSelector used to defined groups
+   * @param groupSort The {@link Sort} used to sort the
    *    groups.  The top sorted document within each group
    *    according to groupSort, determines how that group
    *    sorts against other groups.  This must be non-null,
    *    ie, if you want to groupSort by relevance use
    *    Sort.RELEVANCE.
-   *  @param topNGroups How many top groups to keep.
+   * @param topNGroups How many top groups to keep.
    */
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public FirstPassGroupingCollector(Sort groupSort, int topNGroups) {
+  public FirstPassGroupingCollector(GroupSelector<T> groupSelector, Sort groupSort, int topNGroups) {
+    this.groupSelector = groupSelector;
     if (topNGroups < 1) {
       throw new IllegalArgumentException("topNGroups must be >= 1 (got " + topNGroups + ")");
     }
@@ -133,7 +136,7 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
       if (upto++ < groupOffset) {
         continue;
       }
-      //System.out.println("  group=" + (group.groupValue == null ? "null" : group.groupValue.utf8ToString()));
+      // System.out.println("  group=" + (group.groupValue == null ? "null" : group.groupValue.toString()));
       SearchGroup<T> searchGroup = new SearchGroup<>();
       searchGroup.groupValue = group.groupValue;
       if (fillFields) {
@@ -155,14 +158,11 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
     }
   }
 
-  @Override
-  public void collect(int doc) throws IOException {
-    //System.out.println("FP.collect doc=" + doc);
-
+  private boolean isCompetitive(int doc) throws IOException {
     // If orderedGroups != null we already have collected N groups and
     // can short circuit by comparing this document to the bottom group,
     // without having to find what group this document belongs to.
-    
+
     // Even if this document belongs to a group in the top N, we'll know that
     // we don't have to update that group.
 
@@ -173,7 +173,7 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
         final int c = reversed[compIDX] * leafComparators[compIDX].compareBottom(doc);
         if (c < 0) {
           // Definitely not competitive. So don't even bother to continue
-          return;
+          return false;
         } else if (c > 0) {
           // Definitely competitive.
           break;
@@ -181,15 +181,24 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
           // Here c=0. If we're at the last comparator, this doc is not
           // competitive, since docs are visited in doc Id order, which means
           // this doc cannot compete with any other document in the queue.
-          return;
+          return false;
         }
       }
     }
+    return true;
+  }
+
+  @Override
+  public void collect(int doc) throws IOException {
+
+    if (isCompetitive(doc) == false)
+      return;
 
     // TODO: should we add option to mean "ignore docs that
     // don't have the group field" (instead of stuffing them
     // under null group)?
-    final T groupValue = getDocGroupValue(doc);
+    groupSelector.advanceTo(doc);
+    T groupValue = groupSelector.currentValue();
 
     final CollectedSearchGroup<T> group = groupMap.get(groupValue);
 
@@ -207,7 +216,7 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
 
         // Add a new CollectedSearchGroup:
         CollectedSearchGroup<T> sg = new CollectedSearchGroup<>();
-        sg.groupValue = copyDocGroupValue(groupValue, null);
+        sg.groupValue = groupSelector.copyValue();
         sg.comparatorSlot = groupMap.size();
         sg.topDoc = docBase + doc;
         for (LeafFieldComparator fc : leafComparators) {
@@ -233,7 +242,7 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
       groupMap.remove(bottomGroup.groupValue);
 
       // reuse the removed CollectedSearchGroup
-      bottomGroup.groupValue = copyDocGroupValue(groupValue, bottomGroup.groupValue);
+      bottomGroup.groupValue = groupSelector.copyValue();
       bottomGroup.topDoc = docBase + doc;
 
       for (LeafFieldComparator fc : leafComparators) {
@@ -338,25 +347,15 @@ abstract public class FirstPassGroupingCollector<T> extends SimpleCollector {
     for (int i=0; i<comparators.length; i++) {
       leafComparators[i] = comparators[i].getLeafComparator(readerContext);
     }
+    groupSelector.setNextReader(readerContext);
   }
 
   /**
-   * Returns the group value for the specified doc.
-   *
-   * @param doc The specified doc
-   * @return the group value for the specified doc
-   */
-  protected abstract T getDocGroupValue(int doc) throws IOException;
-
-  /**
-   * Returns a copy of the specified group value by creating a new instance and copying the value from the specified
-   * groupValue in the new instance. Or optionally the reuse argument can be used to copy the group value in.
-   *
-   * @param groupValue The group value to copy
-   * @param reuse Optionally a reuse instance to prevent a new instance creation
-   * @return a copy of the specified group value
+   * @return the GroupSelector used for this Collector
    */
-  protected abstract T copyDocGroupValue(T groupValue, T reuse);
+  public GroupSelector<T> getGroupSelector() {
+    return groupSelector;
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupReducer.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupReducer.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupReducer.java
new file mode 100644
index 0000000..4366e91
--- /dev/null
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupReducer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Scorer;
+
+/**
+ * Concrete implementations of this class define what to collect for individual
+ * groups during the second-pass of a grouping search.
+ *
+ * Each group is assigned a Collector returned by {@link #newCollector()}, and
+ * {@link LeafCollector#collect(int)} is called for each document that is in
+ * a group
+ *
+ * @see SecondPassGroupingCollector
+ *
+ * @param <T> the type of the value used for grouping
+ * @param <C> the type of {@link Collector} used to reduce each group
+ */
+public abstract class GroupReducer<T, C extends Collector> {
+
+  private final Map<T, GroupCollector<C>> groups = new HashMap<>();
+
+  /**
+   * Define which groups should be reduced.
+   *
+   * Called by {@link SecondPassGroupingCollector}
+   */
+  public void setGroups(Collection<SearchGroup<T>> groups) {
+    for (SearchGroup<T> group : groups) {
+      this.groups.put(group.groupValue, new GroupCollector<>(newCollector()));
+    }
+  }
+
+  /**
+   * Whether or not this reducer requires collected documents to be scored
+   */
+  public abstract boolean needsScores();
+
+  /**
+   * Creates a new Collector for each group
+   */
+  protected abstract C newCollector();
+
+  /**
+   * Get the Collector for a given group
+   */
+  public final C getCollector(T value) {
+    return groups.get(value).collector;
+  }
+
+  /**
+   * Collect a given document into a given group
+   * @throws IOException on error
+   */
+  public final void collect(T value, int doc) throws IOException {
+    GroupCollector<C> collector = groups.get(value);
+    collector.leafCollector.collect(doc);
+  }
+
+  /**
+   * Set the Scorer on all group collectors
+   */
+  public final void setScorer(Scorer scorer) throws IOException {
+    for (GroupCollector<C> collector : groups.values()) {
+      collector.leafCollector.setScorer(scorer);
+    }
+  }
+
+  /**
+   * Called when the parent {@link SecondPassGroupingCollector} moves to a new segment
+   */
+  public final void setNextReader(LeafReaderContext ctx) throws IOException {
+    for (GroupCollector<C> collector : groups.values()) {
+      collector.leafCollector = collector.collector.getLeafCollector(ctx);
+    }
+  }
+
+  private static final class GroupCollector<C extends Collector> {
+
+    final C collector;
+    LeafCollector leafCollector;
+
+    private GroupCollector(C collector) {
+      this.collector = collector;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupSelector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupSelector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupSelector.java
new file mode 100644
index 0000000..dbb0932
--- /dev/null
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupSelector.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.index.LeafReaderContext;
+
+/**
+ * Defines a group, for use by grouping collectors
+ *
+ * A GroupSelector acts as an iterator over documents.  For each segment, clients
+ * should call {@link #setNextReader(LeafReaderContext)}, and then {@link #advanceTo(int)}
+ * for each matching document.
+ *
+ * @param <T> the type of the group value
+ */
+public abstract class GroupSelector<T> {
+
+  /**
+   * What to do with the current value
+   */
+  public enum State { SKIP, ACCEPT }
+
+  /**
+   * Set the LeafReaderContext
+   */
+  public abstract void setNextReader(LeafReaderContext readerContext) throws IOException;
+
+  /**
+   * Advance the GroupSelector's iterator to the given document
+   */
+  public abstract State advanceTo(int doc) throws IOException;
+
+  /**
+   * Get the group value of the current document
+   *
+   * N.B. this object may be reused, for a persistent version use {@link #copyValue()}
+   */
+  public abstract T currentValue();
+
+  /**
+   * @return a copy of the group value of the current document
+   */
+  public abstract T copyValue();
+
+  /**
+   * Set a restriction on the group values returned by this selector
+   *
+   * If the selector is positioned on a document whose group value is not contained
+   * within this set, then {@link #advanceTo(int)} will return {@link State#SKIP}
+   *
+   * @param groups a set of {@link SearchGroup} objects to limit selections to
+   */
+  public abstract void setGroups(Collection<SearchGroup<T>> groups);
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/Grouper.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/Grouper.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/Grouper.java
deleted file mode 100644
index 2ff79a1..0000000
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/Grouper.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.lucene.search.grouping;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.lucene.search.Sort;
-
-/**
- * A factory object to create first and second-pass collectors, run by a {@link GroupingSearch}
- * @param <T> the type the group value
- */
-public abstract class Grouper<T> {
-
-  /**
-   * Create a first-pass collector
-   * @param sort  the order in which groups should be returned
-   * @param count how many groups to return
-   */
-  public abstract FirstPassGroupingCollector<T> getFirstPassCollector(Sort sort, int count) throws IOException;
-
-  /**
-   * Create an {@link AllGroupsCollector}
-   */
-  public abstract AllGroupsCollector<T> getAllGroupsCollector();
-
-  /**
-   * Create an {@link AllGroupHeadsCollector}
-   * @param sort a within-group sort order to determine which doc is the group head
-   */
-  public abstract AllGroupHeadsCollector<T> getGroupHeadsCollector(Sort sort);
-
-  /**
-   * Create a second-pass collector
-   */
-  public abstract SecondPassGroupingCollector<T> getSecondPassCollector(
-      Collection<SearchGroup<T>> groups, Sort groupSort, Sort withinGroupSort,
-      int maxDocsPerGroup, boolean getScores, boolean getMaxScores, boolean fillSortFields) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java
index f4319d5..a36917d 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/GroupingSearch.java
@@ -30,8 +30,6 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.Weight;
-import org.apache.lucene.search.grouping.function.FunctionGrouper;
-import org.apache.lucene.search.grouping.term.TermGrouper;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.mutable.MutableValue;
@@ -43,7 +41,7 @@ import org.apache.lucene.util.mutable.MutableValue;
  */
 public class GroupingSearch {
 
-  private final Grouper grouper;
+  private final GroupSelector grouper;
   private final Query groupEndDocs;
 
   private Sort groupSort = Sort.RELEVANCE;
@@ -71,11 +69,7 @@ public class GroupingSearch {
    * @param groupField The name of the field to group by.
    */
   public GroupingSearch(String groupField) {
-    this(new TermGrouper(groupField, 128), null);
-  }
-
-  public GroupingSearch(String groupField, int initialSize) {
-    this(new TermGrouper(groupField, initialSize), null);
+    this(new TermGroupSelector(groupField), null);
   }
 
   /**
@@ -86,7 +80,7 @@ public class GroupingSearch {
    * @param valueSourceContext The context of the specified groupFunction
    */
   public GroupingSearch(ValueSource groupFunction, Map<?, ?> valueSourceContext) {
-    this(new FunctionGrouper(groupFunction, valueSourceContext), null);
+    this(new ValueSourceGroupSelector(groupFunction, valueSourceContext), null);
   }
 
   /**
@@ -99,7 +93,7 @@ public class GroupingSearch {
     this(null, groupEndDocs);
   }
 
-  private GroupingSearch(Grouper grouper, Query groupEndDocs) {
+  private GroupingSearch(GroupSelector grouper, Query groupEndDocs) {
     this.grouper = grouper;
     this.groupEndDocs = groupEndDocs;
   }
@@ -129,10 +123,10 @@ public class GroupingSearch {
   protected TopGroups groupByFieldOrFunction(IndexSearcher searcher, Query query, int groupOffset, int groupLimit) throws IOException {
     int topN = groupOffset + groupLimit;
 
-    final FirstPassGroupingCollector firstPassCollector = grouper.getFirstPassCollector(groupSort, topN);
-    final AllGroupsCollector allGroupsCollector = allGroups ? grouper.getAllGroupsCollector() : null;
+    final FirstPassGroupingCollector firstPassCollector = new FirstPassGroupingCollector(grouper, groupSort, topN);
+    final AllGroupsCollector allGroupsCollector = allGroups ? new AllGroupsCollector(grouper) : null;
     final AllGroupHeadsCollector allGroupHeadsCollector
-        = allGroupHeads ? grouper.getGroupHeadsCollector(sortWithinGroup) : null;
+        = allGroupHeads ? AllGroupHeadsCollector.newCollector(grouper, sortWithinGroup) : null;
 
     final Collector firstRound = MultiCollector.wrap(firstPassCollector, allGroupsCollector, allGroupHeadsCollector);
 
@@ -158,8 +152,8 @@ public class GroupingSearch {
     }
 
     int topNInsideGroup = groupDocsOffset + groupDocsLimit;
-    SecondPassGroupingCollector secondPassCollector
-        = grouper.getSecondPassCollector(topSearchGroups, groupSort, sortWithinGroup, topNInsideGroup,
+    TopGroupsCollector secondPassCollector
+        = new TopGroupsCollector(grouper, topSearchGroups, groupSort, sortWithinGroup, topNInsideGroup,
                                          includeScores, includeMaxScore, fillSortFields);
 
     if (cachedCollector != null && cachedCollector.isCached()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/SecondPassGroupingCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/SecondPassGroupingCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/SecondPassGroupingCollector.java
index f8feb75..c54c8ee 100644
--- a/lucene/grouping/src/java/org/apache/lucene/search/grouping/SecondPassGroupingCollector.java
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/SecondPassGroupingCollector.java
@@ -18,152 +18,82 @@ package org.apache.lucene.search.grouping;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
 
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.SimpleCollector;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.TopDocsCollector;
-import org.apache.lucene.search.TopFieldCollector;
-import org.apache.lucene.search.TopScoreDocCollector;
 
 /**
- * SecondPassGroupingCollector is the second of two passes
- * necessary to collect grouped docs.  This pass gathers the
- * top N documents per top group computed from the
- * first pass. Concrete subclasses define what a group is and how it
- * is internally collected.
+ * SecondPassGroupingCollector runs over an already collected set of
+ * groups, further applying a {@link GroupReducer} to each group
  *
- * <p>See {@link org.apache.lucene.search.grouping} for more
- * details including a full code example.</p>
+ * @see TopGroupsCollector
+ * @see DistinctValuesCollector
  *
  * @lucene.experimental
  */
-public abstract class SecondPassGroupingCollector<T> extends SimpleCollector {
+public class SecondPassGroupingCollector<T> extends SimpleCollector {
 
-  private final Collection<SearchGroup<T>> groups;
-  private final Sort groupSort;
-  private final Sort withinGroupSort;
-  private final int maxDocsPerGroup;
-  private final boolean needsScores;
-  protected final Map<T, SearchGroupDocs<T>> groupMap;
+  protected final GroupSelector<T> groupSelector;
+  protected final Collection<SearchGroup<T>> groups;
+  protected final GroupReducer<T, ?> groupReducer;
 
-  protected SearchGroupDocs<T>[] groupDocs;
+  protected int totalHitCount;
+  protected int totalGroupedHitCount;
 
-  private int totalHitCount;
-  private int totalGroupedHitCount;
-
-  public SecondPassGroupingCollector(Collection<SearchGroup<T>> groups, Sort groupSort, Sort withinGroupSort,
-                                     int maxDocsPerGroup, boolean getScores, boolean getMaxScores, boolean fillSortFields)
-    throws IOException {
+  /**
+   * Create a new SecondPassGroupingCollector
+   * @param groupSelector   the GroupSelector that defines groups for this search
+   * @param groups          the groups to collect documents for
+   * @param reducer         the reducer to apply to each group
+   */
+  public SecondPassGroupingCollector(GroupSelector<T> groupSelector, Collection<SearchGroup<T>> groups, GroupReducer<T, ?> reducer) {
 
     //System.out.println("SP init");
     if (groups.isEmpty()) {
       throw new IllegalArgumentException("no groups to collect (groups is empty)");
     }
 
+    this.groupSelector = Objects.requireNonNull(groupSelector);
+    this.groupSelector.setGroups(groups);
+
     this.groups = Objects.requireNonNull(groups);
-    this.groupSort = Objects.requireNonNull(groupSort);
-    this.withinGroupSort = Objects.requireNonNull(withinGroupSort);
-    this.maxDocsPerGroup = maxDocsPerGroup;
-    this.needsScores = getScores || getMaxScores || withinGroupSort.needsScores();
+    this.groupReducer = reducer;
+    reducer.setGroups(groups);
+  }
 
-    this.groupMap = new HashMap<>(groups.size());
-    for (SearchGroup<T> group : groups) {
-      //System.out.println("  prep group=" + (group.groupValue == null ? "null" : group.groupValue.utf8ToString()));
-      final TopDocsCollector<?> collector;
-      if (withinGroupSort.equals(Sort.RELEVANCE)) { // optimize to use TopScoreDocCollector
-        // Sort by score
-        collector = TopScoreDocCollector.create(maxDocsPerGroup);
-      } else {
-        // Sort by fields
-        collector = TopFieldCollector.create(withinGroupSort, maxDocsPerGroup, fillSortFields, getScores, getMaxScores);
-      }
-      groupMap.put(group.groupValue, new SearchGroupDocs<>(group.groupValue, collector));
-    }
+  /**
+   * @return the GroupSelector used in this collector
+   */
+  public GroupSelector<T> getGroupSelector() {
+    return groupSelector;
   }
 
   @Override
   public boolean needsScores() {
-    return needsScores;
+    return groupReducer.needsScores();
   }
 
   @Override
   public void setScorer(Scorer scorer) throws IOException {
-    for (SearchGroupDocs<T> group : groupMap.values()) {
-      group.leafCollector.setScorer(scorer);
-    }
+    groupReducer.setScorer(scorer);
   }
 
   @Override
   public void collect(int doc) throws IOException {
     totalHitCount++;
-    SearchGroupDocs<T> group = retrieveGroup(doc);
-    if (group != null) {
-      totalGroupedHitCount++;
-      group.leafCollector.collect(doc);
-    }
+    if (groupSelector.advanceTo(doc) == GroupSelector.State.SKIP)
+      return;
+    totalGroupedHitCount++;
+    T value = groupSelector.currentValue();
+    groupReducer.collect(value, doc);
   }
 
-  /**
-   * Returns the group the specified doc belongs to or <code>null</code> if no group could be retrieved.
-   *
-   * @param doc The specified doc
-   * @return the group the specified doc belongs to or <code>null</code> if no group could be retrieved
-   * @throws IOException If an I/O related error occurred
-   */
-  protected abstract SearchGroupDocs<T> retrieveGroup(int doc) throws IOException;
-
   @Override
   protected void doSetNextReader(LeafReaderContext readerContext) throws IOException {
-    //System.out.println("SP.setNextReader");
-    for (SearchGroupDocs<T> group : groupMap.values()) {
-      group.leafCollector = group.collector.getLeafCollector(readerContext);
-    }
-  }
-
-  public TopGroups<T> getTopGroups(int withinGroupOffset) {
-    @SuppressWarnings({"unchecked","rawtypes"})
-    final GroupDocs<T>[] groupDocsResult = (GroupDocs<T>[]) new GroupDocs[groups.size()];
-
-    int groupIDX = 0;
-    float maxScore = Float.MIN_VALUE;
-    for(SearchGroup<?> group : groups) {
-      final SearchGroupDocs<T> groupDocs = groupMap.get(group.groupValue);
-      final TopDocs topDocs = groupDocs.collector.topDocs(withinGroupOffset, maxDocsPerGroup);
-      groupDocsResult[groupIDX++] = new GroupDocs<>(Float.NaN,
-                                                                    topDocs.getMaxScore(),
-                                                                    topDocs.totalHits,
-                                                                    topDocs.scoreDocs,
-                                                                    groupDocs.groupValue,
-                                                                    group.sortValues);
-      maxScore = Math.max(maxScore, topDocs.getMaxScore());
-    }
-
-    return new TopGroups<>(groupSort.getSort(),
-                                           withinGroupSort.getSort(),
-                                           totalHitCount, totalGroupedHitCount, groupDocsResult,
-                                           maxScore);
+    groupReducer.setNextReader(readerContext);
+    groupSelector.setNextReader(readerContext);
   }
 
-
-  // TODO: merge with SearchGroup or not?
-  // ad: don't need to build a new hashmap
-  // disad: blows up the size of SearchGroup if we need many of them, and couples implementations
-  public class SearchGroupDocs<T> {
-
-    public final T groupValue;
-    public final TopDocsCollector<?> collector;
-    public LeafCollector leafCollector;
-
-    public SearchGroupDocs(T groupValue, TopDocsCollector<?> collector) {
-      this.groupValue = groupValue;
-      this.collector = collector;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupFacetCollector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupFacetCollector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupFacetCollector.java
new file mode 100644
index 0000000..39d28a5
--- /dev/null
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupFacetCollector.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.apache.lucene.util.SentinelIntSet;
+import org.apache.lucene.util.UnicodeUtil;
+
+/**
+ * An implementation of {@link GroupFacetCollector} that computes grouped facets based on the indexed terms
+ * from DocValues.
+ *
+ * @lucene.experimental
+ */
+public abstract class TermGroupFacetCollector extends GroupFacetCollector {
+
+  final List<GroupedFacetHit> groupedFacetHits;
+  final SentinelIntSet segmentGroupedFacetHits;
+
+  SortedDocValues groupFieldTermsIndex;
+
+  /**
+   * Factory method for creating the right implementation based on the fact whether the facet field contains
+   * multiple tokens per documents.
+   *
+   * @param groupField The group field
+   * @param facetField The facet field
+   * @param facetFieldMultivalued Whether the facet field has multiple tokens per document
+   * @param facetPrefix The facet prefix a facet entry should start with to be included.
+   * @param initialSize The initial allocation size of the internal int set and group facet list which should roughly
+   *                    match the total number of expected unique groups. Be aware that the heap usage is
+   *                    4 bytes * initialSize.
+   * @return <code>TermGroupFacetCollector</code> implementation
+   */
+  public static TermGroupFacetCollector createTermGroupFacetCollector(String groupField,
+                                                                      String facetField,
+                                                                      boolean facetFieldMultivalued,
+                                                                      BytesRef facetPrefix,
+                                                                      int initialSize) {
+    if (facetFieldMultivalued) {
+      return new MV(groupField, facetField, facetPrefix, initialSize);
+    } else {
+      return new SV(groupField, facetField, facetPrefix, initialSize);
+    }
+  }
+
+  TermGroupFacetCollector(String groupField, String facetField, BytesRef facetPrefix, int initialSize) {
+    super(groupField, facetField, facetPrefix);
+    groupedFacetHits = new ArrayList<>(initialSize);
+    segmentGroupedFacetHits = new SentinelIntSet(initialSize, Integer.MIN_VALUE);
+  }
+
+  // Implementation for single valued facet fields.
+  static class SV extends TermGroupFacetCollector {
+
+    private SortedDocValues facetFieldTermsIndex;
+
+    SV(String groupField, String facetField, BytesRef facetPrefix, int initialSize) {
+      super(groupField, facetField, facetPrefix, initialSize);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (doc > facetFieldTermsIndex.docID()) {
+        facetFieldTermsIndex.advance(doc);
+      }
+
+      int facetOrd;
+      if (doc == facetFieldTermsIndex.docID()) {
+        facetOrd = facetFieldTermsIndex.ordValue();
+      } else {
+        facetOrd = -1;
+      }
+      
+      if (facetOrd < startFacetOrd || facetOrd >= endFacetOrd) {
+        return;
+      }
+
+      if (doc > groupFieldTermsIndex.docID()) {
+        groupFieldTermsIndex.advance(doc);
+      }
+
+      int groupOrd;
+      if (doc == groupFieldTermsIndex.docID()) {
+        groupOrd = groupFieldTermsIndex.ordValue();
+      } else {
+        groupOrd = -1;
+      }
+      int segmentGroupedFacetsIndex = groupOrd * (facetFieldTermsIndex.getValueCount()+1) + facetOrd;
+      if (segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
+        return;
+      }
+
+      segmentTotalCount++;
+      segmentFacetCounts[facetOrd+1]++;
+
+      segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+
+      BytesRef groupKey;
+      if (groupOrd == -1) {
+        groupKey = null;
+      } else {
+        groupKey = BytesRef.deepCopyOf(groupFieldTermsIndex.lookupOrd(groupOrd));
+      }
+
+      BytesRef facetKey;
+      if (facetOrd == -1) {
+        facetKey = null;
+      } else {
+        facetKey = BytesRef.deepCopyOf(facetFieldTermsIndex.lookupOrd(facetOrd));
+      }
+
+      groupedFacetHits.add(new GroupedFacetHit(groupKey, facetKey));
+    }
+
+    @Override
+    protected void doSetNextReader(LeafReaderContext context) throws IOException {
+      if (segmentFacetCounts != null) {
+        segmentResults.add(createSegmentResult());
+      }
+
+      groupFieldTermsIndex = DocValues.getSorted(context.reader(), groupField);
+      facetFieldTermsIndex = DocValues.getSorted(context.reader(), facetField);
+
+      // 1+ to allow for the -1 "not set":
+      segmentFacetCounts = new int[facetFieldTermsIndex.getValueCount()+1];
+      segmentTotalCount = 0;
+
+      segmentGroupedFacetHits.clear();
+      for (GroupedFacetHit groupedFacetHit : groupedFacetHits) {
+        int facetOrd = groupedFacetHit.facetValue == null ? -1 : facetFieldTermsIndex.lookupTerm(groupedFacetHit.facetValue);
+        if (groupedFacetHit.facetValue != null && facetOrd < 0) {
+          continue;
+        }
+
+        int groupOrd = groupedFacetHit.groupValue == null ? -1 : groupFieldTermsIndex.lookupTerm(groupedFacetHit.groupValue);
+        if (groupedFacetHit.groupValue != null && groupOrd < 0) {
+          continue;
+        }
+
+        int segmentGroupedFacetsIndex = groupOrd * (facetFieldTermsIndex.getValueCount()+1) + facetOrd;
+        segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+      }
+
+      if (facetPrefix != null) {
+        startFacetOrd = facetFieldTermsIndex.lookupTerm(facetPrefix);
+        if (startFacetOrd < 0) {
+          // Points to the ord one higher than facetPrefix
+          startFacetOrd = -startFacetOrd - 1;
+        }
+        BytesRefBuilder facetEndPrefix = new BytesRefBuilder();
+        facetEndPrefix.append(facetPrefix);
+        facetEndPrefix.append(UnicodeUtil.BIG_TERM);
+        endFacetOrd = facetFieldTermsIndex.lookupTerm(facetEndPrefix.get());
+        assert endFacetOrd < 0;
+        endFacetOrd = -endFacetOrd - 1; // Points to the ord one higher than facetEndPrefix
+      } else {
+        startFacetOrd = -1;
+        endFacetOrd = facetFieldTermsIndex.getValueCount();
+      }
+    }
+
+    @Override
+    protected SegmentResult createSegmentResult() throws IOException {
+      return new SegmentResult(segmentFacetCounts, segmentTotalCount, facetFieldTermsIndex.termsEnum(), startFacetOrd, endFacetOrd);
+    }
+
+    private static class SegmentResult extends GroupFacetCollector.SegmentResult {
+
+      final TermsEnum tenum;
+
+      SegmentResult(int[] counts, int total, TermsEnum tenum, int startFacetOrd, int endFacetOrd) throws IOException {
+        super(counts, total - counts[0], counts[0], endFacetOrd+1);
+        this.tenum = tenum;
+        this.mergePos = startFacetOrd == -1 ? 1 : startFacetOrd+1;
+        if (mergePos < maxTermPos) {
+          assert tenum != null;
+          tenum.seekExact(startFacetOrd == -1 ? 0 : startFacetOrd);
+          mergeTerm = tenum.term();
+        }
+      }
+
+      @Override
+      protected void nextTerm() throws IOException {
+        mergeTerm = tenum.next();
+      }
+    }
+  }
+
+  // Implementation for multi valued facet fields.
+  static class MV extends TermGroupFacetCollector {
+
+    private SortedSetDocValues facetFieldDocTermOrds;
+    private TermsEnum facetOrdTermsEnum;
+    private int facetFieldNumTerms;
+
+    MV(String groupField, String facetField, BytesRef facetPrefix, int initialSize) {
+      super(groupField, facetField, facetPrefix, initialSize);
+    }
+
+    @Override
+    public void collect(int doc) throws IOException {
+      if (doc > groupFieldTermsIndex.docID()) {
+        groupFieldTermsIndex.advance(doc);
+      }
+
+      int groupOrd;
+      if (doc == groupFieldTermsIndex.docID()) {
+        groupOrd = groupFieldTermsIndex.ordValue();
+      } else {
+        groupOrd = -1;
+      }
+      
+      if (facetFieldNumTerms == 0) {
+        int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1);
+        if (facetPrefix != null || segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
+          return;
+        }
+
+        segmentTotalCount++;
+        segmentFacetCounts[facetFieldNumTerms]++;
+
+        segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+        BytesRef groupKey;
+        if (groupOrd == -1) {
+          groupKey = null;
+        } else {
+          groupKey = BytesRef.deepCopyOf(groupFieldTermsIndex.lookupOrd(groupOrd));
+        }
+        groupedFacetHits.add(new GroupedFacetHit(groupKey, null));
+        return;
+      }
+
+      if (doc > facetFieldDocTermOrds.docID()) {
+        facetFieldDocTermOrds.advance(doc);
+      }
+      boolean empty = true;
+      if (doc == facetFieldDocTermOrds.docID()) {
+        long ord;
+        while ((ord = facetFieldDocTermOrds.nextOrd()) != SortedSetDocValues.NO_MORE_ORDS) {
+          process(groupOrd, (int) ord);
+          empty = false;
+        }
+      }
+      
+      if (empty) {
+        process(groupOrd, facetFieldNumTerms); // this facet ord is reserved for docs not containing facet field.
+      }
+    }
+    
+    private void process(int groupOrd, int facetOrd) throws IOException {
+      if (facetOrd < startFacetOrd || facetOrd >= endFacetOrd) {
+        return;
+      }
+
+      int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1) + facetOrd;
+      if (segmentGroupedFacetHits.exists(segmentGroupedFacetsIndex)) {
+        return;
+      }
+
+      segmentTotalCount++;
+      segmentFacetCounts[facetOrd]++;
+
+      segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+
+      BytesRef groupKey;
+      if (groupOrd == -1) {
+        groupKey = null;
+      } else {
+        groupKey = BytesRef.deepCopyOf(groupFieldTermsIndex.lookupOrd(groupOrd));
+      }
+
+      final BytesRef facetValue;
+      if (facetOrd == facetFieldNumTerms) {
+        facetValue = null;
+      } else {
+        facetValue = BytesRef.deepCopyOf(facetFieldDocTermOrds.lookupOrd(facetOrd));
+      }
+      groupedFacetHits.add(new GroupedFacetHit(groupKey, facetValue));
+    }
+
+    @Override
+    protected void doSetNextReader(LeafReaderContext context) throws IOException {
+      if (segmentFacetCounts != null) {
+        segmentResults.add(createSegmentResult());
+      }
+
+      groupFieldTermsIndex = DocValues.getSorted(context.reader(), groupField);
+      facetFieldDocTermOrds = DocValues.getSortedSet(context.reader(), facetField);
+      facetFieldNumTerms = (int) facetFieldDocTermOrds.getValueCount();
+      if (facetFieldNumTerms == 0) {
+        facetOrdTermsEnum = null;
+      } else {
+        facetOrdTermsEnum = facetFieldDocTermOrds.termsEnum();
+      }
+      // [facetFieldNumTerms() + 1] for all possible facet values and docs not containing facet field
+      segmentFacetCounts = new int[facetFieldNumTerms + 1];
+      segmentTotalCount = 0;
+
+      segmentGroupedFacetHits.clear();
+      for (GroupedFacetHit groupedFacetHit : groupedFacetHits) {
+        int groupOrd = groupedFacetHit.groupValue == null ? -1 : groupFieldTermsIndex.lookupTerm(groupedFacetHit.groupValue);
+        if (groupedFacetHit.groupValue != null && groupOrd < 0) {
+          continue;
+        }
+
+        int facetOrd;
+        if (groupedFacetHit.facetValue != null) {
+          if (facetOrdTermsEnum == null || !facetOrdTermsEnum.seekExact(groupedFacetHit.facetValue)) {
+            continue;
+          }
+          facetOrd = (int) facetOrdTermsEnum.ord();
+        } else {
+          facetOrd = facetFieldNumTerms;
+        }
+
+        // (facetFieldDocTermOrds.numTerms() + 1) for all possible facet values and docs not containing facet field
+        int segmentGroupedFacetsIndex = groupOrd * (facetFieldNumTerms + 1) + facetOrd;
+        segmentGroupedFacetHits.put(segmentGroupedFacetsIndex);
+      }
+
+      if (facetPrefix != null) {
+        TermsEnum.SeekStatus seekStatus;
+        if (facetOrdTermsEnum != null) {
+          seekStatus = facetOrdTermsEnum.seekCeil(facetPrefix);
+        } else {
+          seekStatus = TermsEnum.SeekStatus.END;
+        }
+
+        if (seekStatus != TermsEnum.SeekStatus.END) {
+          startFacetOrd = (int) facetOrdTermsEnum.ord();
+        } else {
+          startFacetOrd = 0;
+          endFacetOrd = 0;
+          return;
+        }
+
+        BytesRefBuilder facetEndPrefix = new BytesRefBuilder();
+        facetEndPrefix.append(facetPrefix);
+        facetEndPrefix.append(UnicodeUtil.BIG_TERM);
+        seekStatus = facetOrdTermsEnum.seekCeil(facetEndPrefix.get());
+        if (seekStatus != TermsEnum.SeekStatus.END) {
+          endFacetOrd = (int) facetOrdTermsEnum.ord();
+        } else {
+          endFacetOrd = facetFieldNumTerms; // Don't include null...
+        }
+      } else {
+        startFacetOrd = 0;
+        endFacetOrd = facetFieldNumTerms + 1;
+      }
+    }
+
+    @Override
+    protected SegmentResult createSegmentResult() throws IOException {
+      return new SegmentResult(segmentFacetCounts, segmentTotalCount, facetFieldNumTerms, facetOrdTermsEnum, startFacetOrd, endFacetOrd);
+    }
+
+    private static class SegmentResult extends GroupFacetCollector.SegmentResult {
+
+      final TermsEnum tenum;
+
+      SegmentResult(int[] counts, int total, int missingCountIndex, TermsEnum tenum, int startFacetOrd, int endFacetOrd) throws IOException {
+        super(counts, total - counts[missingCountIndex], counts[missingCountIndex],
+            endFacetOrd == missingCountIndex + 1 ?  missingCountIndex : endFacetOrd);
+        this.tenum = tenum;
+        this.mergePos = startFacetOrd;
+        if (tenum != null) {
+          tenum.seekExact(mergePos);
+          mergeTerm = tenum.term();
+        }
+      }
+
+      @Override
+      protected void nextTerm() throws IOException {
+        mergeTerm = tenum.next();
+      }
+    }
+  }
+
+  private static class GroupedFacetHit {
+
+    final BytesRef groupValue;
+    final BytesRef facetValue;
+
+    GroupedFacetHit(BytesRef groupValue, BytesRef facetValue) {
+      this.groupValue = groupValue;
+      this.facetValue = facetValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8df9f8c/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupSelector.java
----------------------------------------------------------------------
diff --git a/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupSelector.java b/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupSelector.java
new file mode 100644
index 0000000..5b8f77c
--- /dev/null
+++ b/lucene/grouping/src/java/org/apache/lucene/search/grouping/TermGroupSelector.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search.grouping;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefHash;
+
+/**
+ * A GroupSelector implementation that groups via SortedDocValues
+ */
+public class TermGroupSelector extends GroupSelector<BytesRef> {
+
+  private final String field;
+  private final BytesRefHash values = new BytesRefHash();
+  private final Map<Integer, Integer> ordsToGroupIds = new HashMap<>();
+
+  private SortedDocValues docValues;
+  private int groupId;
+
+  private boolean secondPass;
+  private boolean includeEmpty;
+
+  /**
+   * Create a new TermGroupSelector
+   * @param field the SortedDocValues field to use for grouping
+   */
+  public TermGroupSelector(String field) {
+    this.field = field;
+  }
+
+  @Override
+  public void setNextReader(LeafReaderContext readerContext) throws IOException {
+    this.docValues = DocValues.getSorted(readerContext.reader(), field);
+    this.ordsToGroupIds.clear();
+    BytesRef scratch = new BytesRef();
+    for (int i = 0; i < values.size(); i++) {
+      values.get(i, scratch);
+      int ord = this.docValues.lookupTerm(scratch);
+      if (ord >= 0)
+        ordsToGroupIds.put(ord, i);
+    }
+  }
+
+  @Override
+  public State advanceTo(int doc) throws IOException {
+    if (this.docValues.advanceExact(doc) == false) {
+      groupId = -1;
+      return includeEmpty ? State.ACCEPT : State.SKIP;
+    }
+    int ord = docValues.ordValue();
+    if (ordsToGroupIds.containsKey(ord)) {
+      groupId = ordsToGroupIds.get(ord);
+      return State.ACCEPT;
+    }
+    if (secondPass)
+      return State.SKIP;
+    groupId = values.add(docValues.binaryValue());
+    ordsToGroupIds.put(ord, groupId);
+    return State.ACCEPT;
+  }
+
+  private BytesRef scratch = new BytesRef();
+
+  @Override
+  public BytesRef currentValue() {
+    if (groupId == -1)
+      return null;
+    values.get(groupId, scratch);
+    return scratch;
+  }
+
+  @Override
+  public BytesRef copyValue() {
+    if (groupId == -1)
+      return null;
+    return BytesRef.deepCopyOf(currentValue());
+  }
+
+  @Override
+  public void setGroups(Collection<SearchGroup<BytesRef>> searchGroups) {
+    this.values.clear();
+    this.values.reinit();
+    for (SearchGroup<BytesRef> sg : searchGroups) {
+      if (sg.groupValue == null)
+        includeEmpty = true;
+      else
+        this.values.add(sg.groupValue);
+    }
+    this.secondPass = true;
+  }
+}