You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2013/01/25 22:27:08 UTC
svn commit: r1438721 - in /lucene/dev/trunk/lucene: ./
core/src/java/org/apache/lucene/search/
core/src/test/org/apache/lucene/search/
Author: mikemccand
Date: Fri Jan 25 21:27:08 2013
New Revision: 1438721
URL: http://svn.apache.org/viewvc?rev=1438721&view=rev
Log:
LUCENE-4695: add LiveFieldValues, to get current (live/real-time) values for fields indexed after the last NRT reopen
Added:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java (with props)
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (with props)
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Fri Jan 25 21:27:08 2013
@@ -73,12 +73,23 @@ New Features
compresses term vectors into chunks of documents similarly to
CompressingStoredFieldsFormat. (Adrien Grand)
+* LUCENE-4695: Added LiveFieldValues utility class, for getting the
+ current (live, real-time) value for any indexed doc/field. The
+ class buffers recently indexed doc/field values until a new
+ near-real-time reader is opened that contains those changes.
+ (Robert Muir, Mike McCandless)
+
API Changes
* LUCENE-4709: FacetResultNode no longer has a residue field. (Shai Erera)
* LUCENE-4716: DrillDown.query now takes Occur, allowing to specify if
categories should be OR'ed or AND'ed. (Shai Erera)
+
+* LUCENE-4695: ReferenceManager.RefreshListener.afterRefresh now takes
+ a boolean indicating whether a new reference was in fact opened, and
+ a new beforeRefresh method notifies you when a refresh attempt is
+ starting. (Robert Muir, Mike McCandless)
Bug Fixes
Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java?rev=1438721&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/LiveFieldValues.java Fri Jan 25 21:27:08 2013
@@ -0,0 +1,147 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.IndexDocument;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.MergeState;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.util.Counter;
+
+/** Tracks live field values across NRT reader reopens.
+ * This holds a map for all updated ids since
+ * the last reader reopen. Once the NRT reader is reopened,
+ * it prunes the map. This means you must reopen your NRT
+ * reader periodically otherwise the RAM consumption of
+ * this class will grow unbounded!
+ *
+ * <p>NOTE: you must ensure the same id is never updated at
+ * the same time by two threads, because in this case you
+ * cannot in general know which thread "won". */
+
+public abstract class LiveFieldValues<T> implements ReferenceManager.RefreshListener, Closeable {
+
+ private volatile Map<String,T> current = new ConcurrentHashMap<String,T>();
+ private volatile Map<String,T> old = new ConcurrentHashMap<String,T>();
+ private final ReferenceManager<IndexSearcher> mgr;
+ private final T missingValue;
+
+ public LiveFieldValues(ReferenceManager<IndexSearcher> mgr, T missingValue) {
+ this.missingValue = missingValue;
+ this.mgr = mgr;
+ mgr.addListener(this);
+ }
+
+ @Override
+ public void close() {
+ mgr.removeListener(this);
+ }
+
+ @Override
+ public void beforeRefresh() throws IOException {
+ old = current;
+ // Start sending all updates after this point to the new
+ // map. While reopen is running, any lookup will first
+ // try this new map, then fallback to old, then to the
+ // current searcher:
+ current = new ConcurrentHashMap<String,T>();
+ }
+
+ @Override
+ public void afterRefresh(boolean didRefresh) throws IOException {
+ // Now drop all the old values because they are now
+ // visible via the searcher that was just opened; if
+ // didRefresh is false, it's possible old has some
+ // entries in it, which is fine: it means they were
+ // actually already included in the previously opened
+ // reader. So we can safely clear old here:
+ old = new ConcurrentHashMap<String,T>();
+ }
+
+ /** Call this after you've successfully added a document
+ * to the index, to record what value you just set the
+ * field to. */
+ public void add(String id, T value) {
+ current.put(id, value);
+ }
+
+ /** Call this after you've successfully deleted a document
+ * from the index. */
+ public void delete(String id) {
+ current.put(id, missingValue);
+ }
+
+ /** Returns the [approximate] number of id/value pairs
+ * buffered in RAM. */
+ public int size() {
+ return current.size() + old.size();
+ }
+
+ /** Returns the current value for this id, or null if the
+ * id isn't in the index or was deleted. */
+ public T get(String id) throws IOException {
+ // First try to get the "live" value:
+ T value = current.get(id);
+ if (value == missingValue) {
+ // Deleted but the deletion is not yet reflected in
+ // the reader:
+ return null;
+ } else if (value != null) {
+ return value;
+ } else {
+ value = old.get(id);
+ if (value == missingValue) {
+ // Deleted but the deletion is not yet reflected in
+ // the reader:
+ return null;
+ } else if (value != null) {
+ return value;
+ } else {
+ // It either does not exist in the index, or, it was
+ // already flushed & NRT reader was opened on the
+ // segment, so fallback to current searcher:
+ IndexSearcher s = mgr.acquire();
+ try {
+ return lookupFromSearcher(s, id);
+ } finally {
+ mgr.release(s);
+ }
+ }
+ }
+ }
+
+ /** This is called when the id/value was already flushed & opened
+ * in an NRT IndexSearcher. You must implement this to
+ * go look up the value (eg, via doc values, field cache,
+ * stored fields, etc.). */
+ protected abstract T lookupFromSearcher(IndexSearcher s, String id) throws IOException;
+}
+
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java Fri Jan 25 21:27:08 2013
@@ -151,6 +151,7 @@ public abstract class ReferenceManager<G
try {
final G reference = acquire();
try {
+ notifyRefreshListenersBefore();
G newReference = refreshIfNeeded(reference);
if (newReference != null) {
assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
@@ -165,11 +166,9 @@ public abstract class ReferenceManager<G
}
} finally {
release(reference);
+ notifyRefreshListenersRefreshed(refreshed);
}
afterMaybeRefresh();
- if (refreshed) {
- notifyRefreshListeners();
- }
} finally {
refreshLock.unlock();
}
@@ -254,9 +253,15 @@ public abstract class ReferenceManager<G
decRef(reference);
}
- private void notifyRefreshListeners() {
+ private void notifyRefreshListenersBefore() throws IOException {
+ for (RefreshListener refreshListener : refreshListeners) {
+ refreshListener.beforeRefresh();
+ }
+ }
+
+ private void notifyRefreshListenersRefreshed(boolean didRefresh) throws IOException {
for (RefreshListener refreshListener : refreshListeners) {
- refreshListener.afterRefresh();
+ refreshListener.afterRefresh(didRefresh);
}
}
@@ -284,9 +289,13 @@ public abstract class ReferenceManager<G
* finished. See {@link #addListener}. */
public interface RefreshListener {
- /**
- * Called after a successful refresh and a new reference has been installed. When this is called {@link #acquire()} is guaranteed to return a new instance.
- */
- void afterRefresh();
+ /** Called right before a refresh attempt starts. */
+ void beforeRefresh() throws IOException;
+
+ /** Called after the attempted refresh; if the refresh
+ * did open a new reference then didRefresh will be true
+ * and {@link #acquire()} is guaranteed to return the new
+ * reference. */
+ void afterRefresh(boolean didRefresh) throws IOException;
}
}
Added: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java?rev=1438721&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java (added)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestLiveFieldValues.java Fri Jan 25 21:27:08 2013
@@ -0,0 +1,180 @@
+package org.apache.lucene.search;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.IntField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.StoredDocument;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestLiveFieldValues extends LuceneTestCase {
+ public void test() throws Exception {
+
+ Directory dir = newFSDirectory(_TestUtil.getTempDir("livefieldupdates"));
+ IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+
+ final IndexWriter _w = new IndexWriter(dir, iwc);
+ final TrackingIndexWriter w = new TrackingIndexWriter(_w);
+
+ final NRTManager mgr = new NRTManager(w, new SearcherFactory() {
+ @Override
+ public IndexSearcher newSearcher(IndexReader r) {
+ return new IndexSearcher(r);
+ }
+ });
+
+ final Integer missing = -1;
+
+ final LiveFieldValues<Integer> rt = new LiveFieldValues<Integer>(mgr, missing) {
+ @Override
+ protected Integer lookupFromSearcher(IndexSearcher s, String id) throws IOException {
+ TermQuery tq = new TermQuery(new Term("id", id));
+ TopDocs hits = s.search(tq, 1);
+ assertTrue(hits.totalHits <= 1);
+ if (hits.totalHits == 0) {
+ return null;
+ } else {
+ StoredDocument doc = s.doc(hits.scoreDocs[0].doc);
+ return (Integer) doc.getField("field").numericValue();
+ }
+ }
+ };
+
+ int numThreads = _TestUtil.nextInt(random(), 2, 5);
+ if (VERBOSE) {
+ System.out.println(numThreads + " threads");
+ }
+
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<Thread> threads = new ArrayList<Thread>();
+
+ final int iters = atLeast(1000);
+ final int idCount = _TestUtil.nextInt(random(), 100, 10000);
+
+ final double reopenChance = random().nextDouble()*0.01;
+ final double deleteChance = random().nextDouble()*0.25;
+ final double addChance = random().nextDouble()*0.5;
+
+ for(int t=0;t<numThreads;t++) {
+ final int threadID = t;
+ final Random threadRandom = new Random(random().nextLong());
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ Map<String,Integer> values = new HashMap<String,Integer>();
+ List<String> allIDs = Collections.synchronizedList(new ArrayList<String>());
+
+ startingGun.await();
+ for(int iter=0; iter<iters;iter++) {
+ // Add/update a document
+ Document doc = new Document();
+ // Threads must not update the same id at the
+ // same time:
+ if (threadRandom.nextDouble() <= addChance) {
+ String id = String.format(Locale.ROOT, "%d_%04x", threadID, threadRandom.nextInt(idCount));
+ Integer field = threadRandom.nextInt(Integer.MAX_VALUE);
+ doc.add(new StringField("id", id, Field.Store.YES));
+ doc.add(new IntField("field", field.intValue(), Field.Store.YES));
+ w.updateDocument(new Term("id", id), doc);
+ rt.add(id, field);
+ if (values.put(id, field) == null) {
+ allIDs.add(id);
+ }
+ }
+
+ if (allIDs.size() > 0 && threadRandom.nextDouble() <= deleteChance) {
+ String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
+ w.deleteDocuments(new Term("id", randomID));
+ rt.delete(randomID);
+ values.put(randomID, missing);
+ }
+
+ if (threadRandom.nextDouble() <= reopenChance || rt.size() > 10000) {
+ //System.out.println("refresh @ " + rt.size());
+ mgr.maybeRefresh();
+ if (VERBOSE) {
+ IndexSearcher s = mgr.acquire();
+ try {
+ System.out.println("TEST: reopen " + s);
+ } finally {
+ mgr.release(s);
+ }
+ System.out.println("TEST: " + values.size() + " values");
+ }
+ }
+
+ if (threadRandom.nextInt(10) == 7) {
+ assertEquals(null, rt.get("foo"));
+ }
+
+ if (allIDs.size() > 0) {
+ String randomID = allIDs.get(threadRandom.nextInt(allIDs.size()));
+ Integer expected = values.get(randomID);
+ if (expected == missing) {
+ expected = null;
+ }
+ assertEquals("id=" + randomID, expected, rt.get(randomID));
+ }
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ };
+ threads.add(thread);
+ thread.start();
+ }
+
+ startingGun.countDown();
+
+ for(Thread thread : threads) {
+ thread.join();
+ }
+ mgr.maybeRefresh();
+ assertEquals(0, rt.size());
+
+ rt.close();
+ mgr.close();
+ _w.close();
+ dir.close();
+ }
+}
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestNRTManager.java Fri Jan 25 21:27:08 2013
@@ -423,8 +423,13 @@ public class TestNRTManager extends Thre
NRTManager sm = new NRTManager(new NRTManager.TrackingIndexWriter(iw),new SearcherFactory());
sm.addListener(new ReferenceManager.RefreshListener() {
@Override
- public void afterRefresh() {
- afterRefreshCalled.set(true);
+ public void beforeRefresh() {
+ }
+ @Override
+ public void afterRefresh(boolean didRefresh) {
+ if (didRefresh) {
+ afterRefreshCalled.set(true);
+ }
}
});
iw.addDocument(new Document());
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java?rev=1438721&r1=1438720&r2=1438721&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java Fri Jan 25 21:27:08 2013
@@ -331,8 +331,13 @@ public class TestSearcherManager extends
SearcherManager sm = new SearcherManager(iw, false, new SearcherFactory());
sm.addListener(new ReferenceManager.RefreshListener() {
@Override
- public void afterRefresh() {
- afterRefreshCalled.set(true);
+ public void beforeRefresh() {
+ }
+ @Override
+ public void afterRefresh(boolean didRefresh) {
+ if (didRefresh) {
+ afterRefreshCalled.set(true);
+ }
}
});
iw.addDocument(new Document());