You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by mi...@apache.org on 2013/01/25 22:27:08 UTC

svn commit: r1438721 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/search/ core/src/test/org/apache/lucene/search/

Author: mikemccand
Date: Fri Jan 25 21:27:08 2013
New Revision: 1438721

URL: http://svn.apache.org/viewvc?rev=1438721&view=rev
Log:
LUCENE-4695: add LiveFieldValues, to get current (live/real-time) values for fields indexed after the last NRT reopen

Added:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java   (with props)
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java   (with props)
Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Fri Jan 25 21:27:08 2013
@@ -73,12 +73,23 @@ New Features
   compresses term vectors into chunks of documents similarly to
   CompressingStoredFieldsFormat. (Adrien Grand)
 
+* LUCENE-4695: Added LiveFieldValues utility class, for getting the
+  current (live, real-time) value for any indexed doc/field.  The
+  class buffers recently indexed doc/field values until a new
+  near-real-time reader is opened that contains those changes.
+  (Robert Muir, Mike McCandless)
+
 API Changes
 
 * LUCENE-4709: FacetResultNode no longer has a residue field. (Shai Erera)
 
 * LUCENE-4716: DrillDown.query now takes Occur, allowing to specify if
   categories should be OR'ed or AND'ed. (Shai Erera)
+
+* LUCENE-4695: ReferenceManager.RefreshListener.afterRefresh now takes
+  a boolean indicating whether a new reference was in fact opened, and
+  a new beforeRefresh method notifies you when a refresh attempt is
+  starting.  (Robert Muir, Mike McCandless)
   
 Bug Fixes
 

Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java?rev=1438721&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java Fri Jan 25 21:27:08 2013
@@ -0,0 +1,147 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.IndexDocument;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.util.Counter;
+
+/** Tracks live field values across NRT reader reopens.
+ *  Values recorded via {@link #add} and {@link #delete} are buffered in
+ *  RAM and returned by {@link #get} until a later refresh of the
+ *  ReferenceManager discards them; after that {@link #get} falls back to
+ *  {@link #lookupFromSearcher}.  This means you must reopen your NRT reader
+ *  periodically, otherwise the RAM consumption of this class will grow unbounded!
+ *
+ *  <p>NOTE: you must ensure the same id is never updated at the same time by two
+ *  threads, because in this case you cannot in general know which thread "won".
+ *  <p>Buffered values live in ConcurrentHashMaps, which reject null, so neither added
+ *  values nor missingValue may be null; missingValue is matched by identity (==). */
+public abstract class LiveFieldValues<T> implements ReferenceManager.RefreshListener, Closeable {
+
+  private volatile Map<String,T> current = new ConcurrentHashMap<String,T>(); // updates recorded since the last beforeRefresh
+  private volatile Map<String,T> old = new ConcurrentHashMap<String,T>(); // previous current map, kept until afterRefresh
+  private final ReferenceManager<IndexSearcher> mgr;
+  private final T missingValue; // sentinel stored by delete()
+  /** Registers this instance as a RefreshListener on {@code mgr}; {@code missingValue} is the sentinel {@link #delete} stores. */
+  public LiveFieldValues(ReferenceManager<IndexSearcher> mgr, T missingValue) {
+    this.missingValue = missingValue;
+    this.mgr = mgr;
+    mgr.addListener(this);
+  }
+  /** Unregisters this instance from the ReferenceManager it was created with. */
+  @Override
+  public void close() {
+    mgr.removeListener(this);
+  }
+  /** Called before each refresh attempt: moves buffered values to {@code old} and starts a new, empty {@code current} map. */
+  @Override
+  public void beforeRefresh() throws IOException {
+    old = current;
+    // Start sending all updates after this point to the new
+    // map.  While reopen is running, any lookup will first
+    // try this new map, then fallback to old, then to the
+    // current searcher:
+    current = new ConcurrentHashMap<String,T>();
+  }
+  /** Called after each refresh attempt, whether or not a new reference was opened: discards {@code old}. */
+  @Override
+  public void afterRefresh(boolean didRefresh) throws IOException {
+    // Now drop all the old values because they are now
+    // visible via the searcher that was just opened; if
+    // didRefresh is false, it's possible old has some
+    // entries in it, which is fine: it means they were
+    // actually already included in the previously opened
+    // reader.  So we can safely clear old here:
+    old = new ConcurrentHashMap<String,T>();
+  }
+
+  /** Call this after you've successfully added or updated a document
+   *  in the index, to record what value you just set the field to;
+   *  {@code value} must not be null (ConcurrentHashMap rejects nulls). */
+  public void add(String id, T value) {
+    current.put(id, value);
+  }
+
+  /** Call this after you've successfully deleted a document from the index;
+   *  {@link #get} then returns null for this id until a later refresh discards the entry. */
+  public void delete(String id) {
+    current.put(id, missingValue);
+  }
+
+  /** Returns the [approximate] number of id/value pairs buffered in RAM;
+   *  an id buffered in both the current and old maps is counted twice. */
+  public int size() {
+    return current.size() + old.size();
+  }
+
+  /** Returns the current value for this id, or null if it was deleted or (as reported
+   *  by {@link #lookupFromSearcher}) isn't in the index.  Throws IOException if that lookup does. */
+  public T get(String id) throws IOException {
+    // First try the map of updates recorded since the last beforeRefresh:
+    T value = current.get(id);
+    if (value == missingValue) {
+      // Deleted but the deletion is not yet reflected in
+      // the reader:
+      return null;
+    } else if (value != null) {
+      return value;
+    } else {
+      value = old.get(id); // then the map an in-progress refresh is about to discard, if any
+      if (value == missingValue) {
+        // Deleted but the deletion is not yet reflected in
+        // the reader:
+        return null;
+      } else if (value != null) {
+        return value;
+      } else {
+        // It either does not exist in the index, or, it was
+        // already flushed & NRT reader was opened on the
+        // segment, so fallback to current searcher:
+        IndexSearcher s = mgr.acquire();
+        try {
+          return lookupFromSearcher(s, id);
+        } finally {
+          mgr.release(s);
+        }
+      }
+    }
+  }
+
+  /** Called by {@link #get} when the id has no buffered value (never recorded here, or
+   *  already discarded by a refresh); {@code s} is acquired and released by the caller.
+   *  You must implement this to go look up the value (eg, via doc values, field cache,
+   *  stored fields, etc.), returning null if the id is not in the index. */
+  protected abstract T lookupFromSearcher(IndexSearcher s, String id) throws IOException;
+}
+

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java Fri Jan 25 21:27:08 2013
@@ -151,6 +151,7 @@ public abstract class ReferenceManager<G
     try {
       final G reference = acquire();
       try {
+        notifyRefreshListenersBefore();
         G newReference = refreshIfNeeded(reference);
         if (newReference != null) {
           assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
@@ -165,11 +166,9 @@ public abstract class ReferenceManager<G
         }
       } finally {
         release(reference);
+        notifyRefreshListenersRefreshed(refreshed);
       }
       afterMaybeRefresh();
-      if (refreshed) {
-        notifyRefreshListeners();
-      }
     } finally {
       refreshLock.unlock();
     }
@@ -254,9 +253,15 @@ public abstract class ReferenceManager<G
     decRef(reference);
   }
 
-  private void notifyRefreshListeners() {
+  private void notifyRefreshListenersBefore() throws IOException {
+    for (RefreshListener refreshListener : refreshListeners) {
+      refreshListener.beforeRefresh();
+    }
+  }
+
+  private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
     for (RefreshListener refreshListener : refreshListeners) {
-      refreshListener.afterRefresh();
+      refreshListener.afterRefresh(didRefresh);
     }
   }
 
@@ -284,9 +289,13 @@ public abstract class ReferenceManager<G
    *  finished.  See {@link #addListener}. */
   public interface RefreshListener {
 
-    /**
-     * Called after a successful refresh and a new reference has been installed. When this is called {@link #acquire()} is guaranteed to return a new instance.
-     */
-    void afterRefresh();
+    /** Called right before a refresh attempt starts. */
+    void beforeRefresh() throws IOException;
+
+    /** Called after the attempted refresh; if the refresh
+     * did open a new reference then didRefresh will be true
+     * and {@link #acquire()} is guaranteed to return the new
+     * reference. */
+    void afterRefresh(boolean didRefresh) throws IOException;
   }
 }

Added: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java?rev=1438721&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (added)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java Fri Jan 25 21:27:08 2013
@@ -0,0 +1,180 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.IntField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.StoredDocument;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestLiveFieldValues extends LuceneTestCase {
+  public void test() throws Exception {
+    // Threads randomly add/update/delete ids and reopen, checking rt.get against per-thread expected values.
+    Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates"));
+    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+
+    final IndexWriter _w = new IndexWriter(dir, iwc);
+    final TrackingIndexWriter w = new TrackingIndexWriter(_w);
+
+    final NRTManager mgr = new NRTManager(w, new SearcherFactory() {
+        @Override
+        public IndexSearcher newSearcher(IndexReader r) {
+          return new IndexSearcher(r);
+        }
+      });
+
+    final Integer missing = -1; // sentinel for deleted ids; compared by identity (==) below
+    // Once buffered values are discarded, look up the stored "field" of the doc matching the "id" term:
+    final LiveFieldValues<Integer> rt = new LiveFieldValues<Integer>(mgr, missing) {
+        @Override
+        protected Integer lookupFromSearcher(IndexSearcher s, String id) throws IOException {
+          TermQuery tq = new TermQuery(new Term("id", id));
+          TopDocs hits = s.search(tq, 1);
+          assertTrue(hits.totalHits <= 1);
+          if (hits.totalHits == 0) {
+            return null;
+          } else {
+            StoredDocument doc = s.doc(hits.scoreDocs[0].doc);
+            return (Integer) doc.getField("field").numericValue();
+          }
+        }
+    };
+
+    int numThreads = _TestUtil.nextInt(random(), 2, 5);
+    if (VERBOSE) {
+      System.out.println(numThreads + " threads");
+    }
+
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    List<Thread> threads = new ArrayList<Thread>();
+
+    final int iters = atLeast(1000);
+    final int idCount = _TestUtil.nextInt(random(), 100, 10000);
+
+    final double reopenChance = random().nextDouble()*0.01;
+    final double deleteChance = random().nextDouble()*0.25;
+    final double addChance = random().nextDouble()*0.5;
+    
+    for(int t=0;t<numThreads;t++) {
+      final int threadID = t;
+      final Random threadRandom = new Random(random().nextLong());
+      Thread thread = new Thread() {
+
+          @Override
+          public void run() {
+            try {
+              Map<String,Integer> values = new HashMap<String,Integer>(); // expected value per id touched by this thread
+              List<String> allIDs = Collections.synchronizedList(new ArrayList<String>()); // NOTE(review): only this thread uses it
+
+              startingGun.await();
+              for(int iter=0; iter<iters;iter++) {
+                // Add/update a document
+                Document doc = new Document();
+                // Threads must not update the same id at the same time, so
+                // ids are prefixed with threadID and never shared across threads:
+                if (threadRandom.nextDouble() <= addChance) {
+                  String id = String.format(Locale.ROOT, "%d_%04x", threadID, threadRandom.nextInt(idCount));
+                  Integer field = threadRandom.nextInt(Integer.MAX_VALUE);
+                  doc.add(new StringField("id", id, Field.Store.YES));
+                  doc.add(new IntField("field", field.intValue(), Field.Store.YES));
+                  w.updateDocument(new Term("id", id), doc);
+                  rt.add(id, field);
+                  if (values.put(id, field) == null) {
+                    allIDs.add(id);
+                  }
+                }
+                // Sometimes delete one of this thread's ids:
+                if (allIDs.size() > 0 && threadRandom.nextDouble() <= deleteChance) {
+                  String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
+                  w.deleteDocuments(new Term("id", randomID));
+                  rt.delete(randomID);
+                  values.put(randomID, missing);
+                }
+                // Reopen randomly, or whenever too many values are buffered, so rt's RAM stays bounded:
+                if (threadRandom.nextDouble() <= reopenChance || rt.size() > 10000) {
+                  //System.out.println("refresh @ " + rt.size());
+                  mgr.maybeRefresh();
+                  if (VERBOSE) {
+                    IndexSearcher s = mgr.acquire();
+                    try {
+                      System.out.println("TEST: reopen " + s);
+                    } finally {
+                      mgr.release(s);
+                    }
+                    System.out.println("TEST: " + values.size() + " values");
+                  }
+                }
+                // Occasionally check that an id no thread ever adds is absent:
+                if (threadRandom.nextInt(10) == 7) {
+                  assertEquals(null, rt.get("foo"));
+                }
+                // Check a random id this thread has touched against its expected value:
+                if (allIDs.size() > 0) {
+                  String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
+                  Integer expected = values.get(randomID);
+                  if (expected == missing) {
+                    expected = null;
+                  }
+                  assertEquals("id=" + randomID, expected, rt.get(randomID));
+                }
+              }
+            } catch (Throwable t) {
+              throw new RuntimeException(t);
+            }
+          }
+        };
+      threads.add(thread);
+      thread.start();
+    }
+
+    startingGun.countDown();
+    // Wait for all threads; a final refresh must then leave nothing buffered:
+    for(Thread thread : threads) {
+      thread.join();
+    }
+    mgr.maybeRefresh();
+    assertEquals(0, rt.size());
+
+    rt.close();
+    mgr.close();
+    _w.close();
+    dir.close();
+  }
+}

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java Fri Jan 25 21:27:08 2013
@@ -423,8 +423,13 @@ public class TestNRTManager extends Thre
     NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
     sm.addListener(new ReferenceManager.RefreshListener() {
       @Override
-      public void afterRefresh() {
-        afterRefreshCalled.set(true);
+      public void beforeRefresh() {
+      }
+      @Override
+      public void afterRefresh(boolean didRefresh) {
+        if (didRefresh) {
+          afterRefreshCalled.set(true);
+        }
       }
     });
     iw.addDocument(new Document());

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java Fri Jan 25 21:27:08 2013
@@ -331,8 +331,13 @@ public class TestSearcherManager extends
     SearcherManager sm = new SearcherManager(iw, false, new SearcherFactory());
     sm.addListener(new ReferenceManager.RefreshListener() {
       @Override
-      public void afterRefresh() {
-        afterRefreshCalled.set(true);
+      public void beforeRefresh() {
+      }
+      @Override
+      public void afterRefresh(boolean didRefresh) {
+        if (didRefresh) {
+          afterRefreshCalled.set(true);
+        }
       }
     });
     iw.addDocument(new Document());