You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/09/15 19:11:53 UTC
svn commit: r1523461 [2/3] - in /lucene/dev/trunk/lucene:
codecs/src/java/org/apache/lucene/codecs/simpletext/
core/src/java/org/apache/lucene/codecs/perfield/
core/src/java/org/apache/lucene/index/
core/src/java/org/apache/lucene/util/ core/src/test/o...
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReadersAndLiveDocs.java Sun Sep 15 17:11:52 2013
@@ -18,19 +18,29 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.DocValuesConsumer;
+import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.LiveDocsFormat;
+import org.apache.lucene.index.FieldInfo.DocValuesType;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.MutableBits;
// Used by IndexWriter to hold open SegmentReaders (for
-// searching or merging), plus pending deletes,
+// searching or merging), plus pending deletes and updates,
// for a given segment
-class ReadersAndLiveDocs {
+class ReadersAndLiveDocs { // TODO (DVU_RENAME) to ReaderAndUpdates
// Not final because we replace (clone) when we need to
// change it and it's been shared:
public final SegmentInfoPerCommit info;
@@ -43,25 +53,38 @@ class ReadersAndLiveDocs {
// Set once (null, and then maybe set, and never set again):
private SegmentReader reader;
- // Holds the current shared (readable and writable
- // liveDocs). This is null when there are no deleted
+ // Holds the current shared (readable and writable)
+ // liveDocs. This is null when there are no deleted
// docs, and it's copy-on-write (cloned whenever we need
// to change it but it's been shared to an external NRT
// reader).
private Bits liveDocs;
+ // Holds the numeric DocValues updates.
+ private final Map<String,Map<Integer,Long>> numericUpdates = new HashMap<String,Map<Integer,Long>>();
+
// How many further deletions we've done against
// liveDocs vs when we loaded it or last wrote it:
private int pendingDeleteCount;
// True if the current liveDocs is referenced by an
// external NRT reader:
- private boolean shared;
+ private boolean liveDocsShared;
+
+ // Indicates whether this segment is currently being merged. While a segment
+ // is merging, every call to writeLiveDocs that writes field updates also
+ // copies them into the mergingUpdates map (see copyUpdatesToMerging).
+ // That way, when the segment is done merging, IndexWriter can apply the
+ // updates on the merged segment too.
+ private boolean isMerging = false;
+
+ // Holds any updates that come through while this segment was being merged.
+ private final Map<String,Map<Integer,Long>> mergingUpdates = new HashMap<String,Map<Integer,Long>>();
public ReadersAndLiveDocs(IndexWriter writer, SegmentInfoPerCommit info) {
this.info = info;
this.writer = writer;
- shared = true;
+ liveDocsShared = true;
}
public void incRef() {
@@ -83,7 +106,19 @@ class ReadersAndLiveDocs {
public synchronized int getPendingDeleteCount() {
return pendingDeleteCount;
}
-
+
+ public synchronized boolean hasFieldUpdates() {
+ return numericUpdates.size() > 0;
+ }
+
+ public synchronized int getPendingUpdatesCount() {
+ int pendingUpdatesCount = 0;
+ for (Entry<String,Map<Integer,Long>> e : numericUpdates.entrySet()) {
+ pendingUpdatesCount += e.getValue().size();
+ }
+ return pendingUpdatesCount;
+ }
+
// Call only from assert!
public synchronized boolean verifyDocCounts() {
int count;
@@ -102,35 +137,109 @@ class ReadersAndLiveDocs {
return true;
}
- // Get reader for searching/deleting
- public synchronized SegmentReader getReader(IOContext context) throws IOException {
- //System.out.println(" livedocs=" + rld.liveDocs);
-
+ public synchronized void reopenReader(IOContext context) throws IOException {
+ if (reader != null) {
+ SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount);
+ boolean reopened = false;
+ try {
+ reader.decRef();
+ reader = newReader;
+ if (liveDocs == null) {
+ liveDocs = reader.getLiveDocs();
+ }
+ reopened = true;
+ } finally {
+ if (!reopened) {
+ newReader.decRef();
+ }
+ }
+ }
+ }
+
+ private synchronized SegmentReader doGetReader(IOContext context) throws IOException {
if (reader == null) {
// We steal returned ref:
reader = new SegmentReader(info, context);
if (liveDocs == null) {
liveDocs = reader.getLiveDocs();
}
- //System.out.println("ADD seg=" + rld.info + " isMerge=" + isMerge + " " + readerMap.size() + " in pool");
- //System.out.println(Thread.currentThread().getName() + ": getReader seg=" + info.name);
}
-
+
// Ref for caller
reader.incRef();
return reader;
}
-
+
+ private SegmentReader doGetReaderWithUpdates(IOContext context) throws IOException {
+ boolean checkpoint = false;
+ try {
+ // don't synchronize the entire method because we cannot call
+ // writer.checkpoint() while holding the RLD lock, otherwise we might hit
+ // a deadlock w/ e.g. a concurrent merging thread.
+ synchronized (this) {
+ checkpoint = writeLiveDocs(info.info.dir);
+ if (reader == null) {
+ // We steal returned ref:
+ reader = new SegmentReader(info, context);
+ if (liveDocs == null) {
+ liveDocs = reader.getLiveDocs();
+ }
+ } else if (checkpoint) {
+ // enroll a new reader with the applied updates
+ reopenReader(context);
+ }
+
+ // Ref for caller
+ reader.incRef();
+ return reader;
+ }
+ } finally {
+ if (checkpoint) {
+ writer.checkpoint();
+ }
+ }
+ }
+
+ /** Returns a {@link SegmentReader} while applying field updates if requested. */
+ public SegmentReader getReader(boolean applyFieldUpdates, IOContext context) throws IOException {
+ // if we need to apply field updates, we call writeLiveDocs and change
+ // SegmentInfos. Therefore must hold the lock on IndexWriter. This code
+ // ensures that readers that don't need to apply updates don't pay the
+ // cost of obtaining it.
+ if (applyFieldUpdates && hasFieldUpdates()) {
+ synchronized (writer) {
+ return doGetReaderWithUpdates(context);
+ }
+ } else {
+ return doGetReader(context);
+ }
+ }
+
public synchronized void release(SegmentReader sr) throws IOException {
assert info == sr.getSegmentInfo();
sr.decRef();
}
+ /**
+ * Updates the numeric doc value of {@code docID} under {@code field} to the
+ * given {@code value}.
+ */
+ public synchronized void updateNumericDocValue(String field, int docID, Long value) {
+ assert Thread.holdsLock(writer);
+ assert docID >= 0 && docID < reader.maxDoc() : "out of bounds: docid=" + docID + " maxDoc=" + reader.maxDoc() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount();
+ Map<Integer,Long> updates = numericUpdates.get(field);
+ if (updates == null) {
+ updates = new HashMap<Integer,Long>();
+ numericUpdates.put(field, updates);
+ }
+ updates.put(docID, value);
+ }
+
public synchronized boolean delete(int docID) {
assert liveDocs != null;
assert Thread.holdsLock(writer);
assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " docCount=" + info.info.getDocCount();
- assert !shared;
+ assert !liveDocsShared;
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
@@ -162,13 +271,11 @@ class ReadersAndLiveDocs {
* it when you're done (ie, do not call release()).
*/
public synchronized SegmentReader getReadOnlyClone(IOContext context) throws IOException {
- if (reader == null) {
- getReader(context).decRef();
- assert reader != null;
- }
- shared = true;
+ getReader(true, context).decRef(); // make sure we enroll a new reader if there are field updates
+ assert reader != null;
+ liveDocsShared = true;
if (liveDocs != null) {
- return new SegmentReader(reader.getSegmentInfo(), reader.core, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount);
+ return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.getDocCount() - info.getDelCount() - pendingDeleteCount);
} else {
assert reader.getLiveDocs() == liveDocs;
reader.incRef();
@@ -180,7 +287,7 @@ class ReadersAndLiveDocs {
assert Thread.holdsLock(writer);
assert info.info.getDocCount() > 0;
//System.out.println("initWritableLivedocs seg=" + info + " liveDocs=" + liveDocs + " shared=" + shared);
- if (shared) {
+ if (liveDocsShared) {
// Copy on write: this means we've cloned a
// SegmentReader sharing the current liveDocs
// instance; must now make a private clone so we can
@@ -192,9 +299,7 @@ class ReadersAndLiveDocs {
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
}
- shared = false;
- } else {
- assert liveDocs != null;
+ liveDocsShared = false;
}
}
@@ -206,7 +311,7 @@ class ReadersAndLiveDocs {
public synchronized Bits getReadOnlyLiveDocs() {
//System.out.println("getROLiveDocs seg=" + info);
assert Thread.holdsLock(writer);
- shared = true;
+ liveDocsShared = true;
//if (liveDocs != null) {
//System.out.println(" liveCount=" + liveDocs.count());
//}
@@ -222,61 +327,270 @@ class ReadersAndLiveDocs {
// deletes onto the newly merged segment, so we can
// discard them on the sub-readers:
pendingDeleteCount = 0;
+ numericUpdates.clear();
+ mergingUpdates.clear();
}
- // Commit live docs to the directory (writes new
- // _X_N.del files); returns true if it wrote the file
- // and false if there were no new deletes to write:
+ // Commit live docs (writes new _X_N.del files) and field updates (writes new
+ // _X_N updates files) to the directory; returns true if it wrote any file
+ // and false if there were no new deletes or updates to write:
+ // TODO (DVU_RENAME) to writeDeletesAndUpdates
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
- //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount);
- if (pendingDeleteCount != 0) {
- // We have new deletes
+ assert Thread.holdsLock(writer);
+ //System.out.println("rld.writeLiveDocs seg=" + info + " pendingDelCount=" + pendingDeleteCount + " numericUpdates=" + numericUpdates);
+ final boolean hasFieldUpdates = hasFieldUpdates();
+ if (pendingDeleteCount == 0 && !hasFieldUpdates) {
+ return false;
+ }
+
+ // We have new deletes or updates
+ if (pendingDeleteCount > 0) {
assert liveDocs.length() == info.info.getDocCount();
-
- // Do this so we can delete any created files on
- // exception; this saves all codecs from having to do
- // it:
- TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
-
- // We can write directly to the actual name (vs to a
- // .tmp & renaming it) because the file is not live
- // until segments file is written:
- boolean success = false;
- try {
- info.info.getCodec().liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
- success = true;
- } finally {
- if (!success) {
- // Advance only the nextWriteDelGen so that a 2nd
- // attempt to write will write to a new file
- info.advanceNextWriteDelGen();
-
- // Delete any partially created file(s):
- for(String fileName : trackingDir.getCreatedFiles()) {
- try {
- dir.deleteFile(fileName);
- } catch (Throwable t) {
- // Ignore so we throw only the first exc
+ }
+
+ // Do this so we can delete any created files on
+ // exception; this saves all codecs from having to do
+ // it:
+ TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
+
+ // We can write directly to the actual name (vs to a
+ // .tmp & renaming it) because the file is not live
+ // until segments file is written:
+ boolean success = false;
+ try {
+ Codec codec = info.info.getCodec();
+ if (pendingDeleteCount > 0) {
+ codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
+ }
+
+ // apply numeric updates if there are any
+ if (hasFieldUpdates) {
+ // reader could be null e.g. for a just merged segment (from
+ // IndexWriter.commitMergedDeletes).
+ final SegmentReader reader = this.reader == null ? new SegmentReader(info, IOContext.READONCE) : this.reader;
+ try {
+ // clone FieldInfos so that we can update their numericUpdatesGen
+ // separately from the reader's infos and write them to a new
+ // fieldInfos_gen file
+ FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap);
+ // cannot use builder.add(reader.getFieldInfos()) because it does not clone FI.attributes
+ for (FieldInfo fi : reader.getFieldInfos()) {
+ FieldInfo clone = builder.add(fi);
+ // copy the stuff FieldInfos.Builder doesn't copy
+ if (fi.attributes() != null) {
+ for (Entry<String,String> e : fi.attributes().entrySet()) {
+ clone.putAttribute(e.getKey(), e.getValue());
+ }
+ }
+ }
+ // create new fields or update existing ones to have NumericDV type
+// for (String f : numericUpdates.keySet()) {
+// builder.addOrUpdate(f, NumericDocValuesField.TYPE);
+// }
+
+ final FieldInfos fieldInfos = builder.finish();
+ final long nextDocValuesGen = info.getNextDocValuesGen();
+ final String segmentSuffix = Long.toString(nextDocValuesGen, Character.MAX_RADIX);
+ final SegmentWriteState state = new SegmentWriteState(null, trackingDir, info.info, fieldInfos, null, IOContext.DEFAULT, segmentSuffix, true);
+ final DocValuesFormat docValuesFormat = codec.docValuesFormat();
+ final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state);
+ boolean fieldsConsumerSuccess = false;
+ try {
+ for (Entry<String,Map<Integer,Long>> e : numericUpdates.entrySet()) {
+ final String field = e.getKey();
+ final Map<Integer,Long> updates = e.getValue();
+ final FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
+
+ if (fieldInfo == null || fieldInfo.getDocValuesType() != DocValuesType.NUMERIC) {
+ throw new UnsupportedOperationException(
+ "cannot update docvalues in a segment with no docvalues field: segment=" + info + ", field=" + field);
+ }
+// assert fieldInfo != null;
+
+ info.setDocValuesGen(fieldInfo.number, nextDocValuesGen);
+
+ // write the numeric updates to a new gen'd docvalues file
+ fieldsConsumer.addNumericField(fieldInfo, new Iterable<Number>() {
+ @SuppressWarnings("synthetic-access")
+ final NumericDocValues currentValues = reader.getNumericDocValues(field);
+ @Override
+ public Iterator<Number> iterator() {
+ return new Iterator<Number>() {
+
+ @SuppressWarnings("synthetic-access")
+ final int maxDoc = reader.maxDoc();
+ int curDoc = -1;
+
+ @Override
+ public boolean hasNext() {
+ return curDoc < maxDoc - 1;
+ }
+
+ @Override
+ public Number next() {
+ if (++curDoc >= maxDoc) {
+ throw new NoSuchElementException("no more documents to return values for");
+ }
+ Long updatedValue = updates.get(curDoc);
+ if (updatedValue == null) {
+ updatedValue = Long.valueOf(currentValues.get(curDoc));
+ } else if (updatedValue == NumericUpdate.MISSING) {
+ updatedValue = null;
+ }
+ return updatedValue;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("this iterator does not support removing elements");
+ }
+
+ };
+ }
+ });
+ }
+ fieldsConsumerSuccess = true;
+ } finally {
+ if (fieldsConsumerSuccess) {
+ fieldsConsumer.close();
+ } else {
+ IOUtils.closeWhileHandlingException(fieldsConsumer);
}
}
+ } finally {
+ if (reader != this.reader) {
+ reader.close();
+ }
}
}
-
- // If we hit an exc in the line above (eg disk full)
- // then info's delGen remains pointing to the previous
- // (successfully written) del docs:
+ success = true;
+ } finally {
+ if (!success) {
+ // Advance only the nextWriteDelGen so that a 2nd
+ // attempt to write will write to a new file
+ if (pendingDeleteCount > 0) {
+ info.advanceNextWriteDelGen();
+ }
+
+ // Advance only the nextWriteDocValuesGen so that a 2nd
+ // attempt to write will write to a new file
+ if (hasFieldUpdates) {
+ info.advanceNextWriteDocValuesGen();
+ }
+
+ // Delete any partially created file(s):
+ for (String fileName : trackingDir.getCreatedFiles()) {
+ try {
+ dir.deleteFile(fileName);
+ } catch (Throwable t) {
+ // Ignore so we throw only the first exc
+ }
+ }
+ }
+ }
+
+ // If we hit an exc in the try block above (eg disk full)
+ // then info's delGen remains pointing to the previous
+ // (successfully written) del docs:
+ if (pendingDeleteCount > 0) {
info.advanceDelGen();
info.setDelCount(info.getDelCount() + pendingDeleteCount);
-
pendingDeleteCount = 0;
- return true;
- } else {
- return false;
}
+
+ if (hasFieldUpdates) {
+ info.advanceDocValuesGen();
+ // copy all the updates to mergingUpdates, so they can later be applied to the merged segment
+ if (isMerging) {
+ copyUpdatesToMerging();
+ }
+ numericUpdates.clear();
+ }
+
+ info.addUpdatesFiles(trackingDir.getCreatedFiles());
+
+ return true;
}
+ private void copyUpdatesToMerging() {
+ for (Entry<String,Map<Integer,Long>> e : numericUpdates.entrySet()) {
+ String field = e.getKey();
+ Map<Integer,Long> merging = mergingUpdates.get(field);
+ if (merging == null) {
+ mergingUpdates.put(field, new HashMap<Integer,Long>(e.getValue()));
+ } else {
+ merging.putAll(e.getValue());
+ }
+ }
+ }
+
+ /**
+ * Indicates whether this segment is currently being merged. Call this just
+ * before the segment is being merged with {@code true} and when the merge has
+ * finished and all updates have been applied to the merged segment, call this
+ * with {@code false}.
+ */
+ public synchronized void setMerging(boolean isMerging) {
+ this.isMerging = isMerging;
+ if (!isMerging) {
+ mergingUpdates.clear();
+ }
+ }
+
+ /**
+ * Called from IndexWriter after applying deletes to the merged segment. Adds the given
+ * updates, gathered while the merge was running, to this segment's pending numeric updates.
+ */
+ public synchronized void setMergingUpdates(Map<Integer,Map<String,Long>> updates) {
+ for (Entry<Integer,Map<String,Long>> e : updates.entrySet()) {
+ int doc = e.getKey().intValue();
+ for (Entry<String,Long> docUpdates : e.getValue().entrySet()) {
+ String field = docUpdates.getKey();
+ Long value = docUpdates.getValue();
+ Map<Integer,Long> fieldUpdates = numericUpdates.get(field);
+ if (fieldUpdates == null) {
+ fieldUpdates = new HashMap<Integer,Long>();
+ numericUpdates.put(field, fieldUpdates);
+ }
+ fieldUpdates.put(doc, value);
+ }
+ }
+ }
+
+ /** Returns updates that came in while this segment was merging. */
+ public synchronized Map<Integer,Map<String,Long>> getMergingUpdates() {
+ copyUpdatesToMerging();
+ if (mergingUpdates.isEmpty()) {
+ return null;
+ }
+
+ Map<Integer,Map<String,Long>> updates = new HashMap<Integer,Map<String,Long>>();
+ for (Entry<String,Map<Integer,Long>> e : mergingUpdates.entrySet()) {
+ String field = e.getKey();
+ for (Entry<Integer,Long> fieldUpdates : e.getValue().entrySet()) {
+ Integer doc = fieldUpdates.getKey();
+ Long value = fieldUpdates.getValue();
+ Map<String,Long> docUpdates = updates.get(doc);
+ if (docUpdates == null) {
+ docUpdates = new HashMap<String,Long>();
+ updates.put(doc, docUpdates);
+ }
+ docUpdates.put(field, value);
+ }
+ }
+
+ mergingUpdates.clear();
+ return updates;
+ }
+
@Override
public String toString() {
- return "ReadersAndLiveDocs(seg=" + info + " pendingDeleteCount=" + pendingDeleteCount + " shared=" + shared + ")";
+ StringBuilder sb = new StringBuilder();
+ sb.append("ReadersAndLiveDocs(seg=").append(info);
+ sb.append(" pendingDeleteCount=").append(pendingDeleteCount);
+ sb.append(" liveDocsShared=").append(liveDocsShared);
+ sb.append(" pendingUpdatesCount=").append(getPendingUpdatesCount());
+ return sb.toString();
}
+
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java Sun Sep 15 17:11:52 2013
@@ -26,17 +26,15 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.PostingsFormat;
-import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.TermVectorsReader;
-import org.apache.lucene.index.FieldInfo.DocValuesType;
import org.apache.lucene.index.SegmentReader.CoreClosedListener;
import org.apache.lucene.store.CompoundFileDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
-import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CloseableThreadLocal;
import org.apache.lucene.util.IOUtils;
@@ -44,18 +42,16 @@ import org.apache.lucene.util.IOUtils;
* SegmentReader is cloned or reopened */
final class SegmentCoreReaders {
- // Counts how many other reader share the core objects
+ // Counts how many other readers share the core objects
// (freqStream, proxStream, tis, etc.) of this reader;
// when coreRef drops to 0, these core objects may be
// closed. A given instance of SegmentReader may be
- // closed, even those it shares core objects with other
+ // closed, even though it shares core objects with other
// SegmentReaders:
private final AtomicInteger ref = new AtomicInteger(1);
final FieldInfos fieldInfos;
-
final FieldsProducer fields;
- final DocValuesProducer dvProducer;
final DocValuesProducer normsProducer;
private final SegmentReader owner;
@@ -66,7 +62,7 @@ final class SegmentCoreReaders {
// TODO: make a single thread local w/ a
// Thingy class holding fieldsReader, termVectorsReader,
- // normsProducer, dvProducer
+ // normsProducer
final CloseableThreadLocal<StoredFieldsReader> fieldsReaderLocal = new CloseableThreadLocal<StoredFieldsReader>() {
@Override
@@ -82,20 +78,6 @@ final class SegmentCoreReaders {
}
};
- final CloseableThreadLocal<Map<String,Object>> docValuesLocal = new CloseableThreadLocal<Map<String,Object>>() {
- @Override
- protected Map<String,Object> initialValue() {
- return new HashMap<String,Object>();
- }
- };
-
- final CloseableThreadLocal<Map<String,Bits>> docsWithFieldLocal = new CloseableThreadLocal<Map<String,Bits>>() {
- @Override
- protected Map<String,Bits> initialValue() {
- return new HashMap<String,Bits>();
- }
- };
-
final CloseableThreadLocal<Map<String,Object>> normsLocal = new CloseableThreadLocal<Map<String,Object>>() {
@Override
protected Map<String,Object> initialValue() {
@@ -120,8 +102,9 @@ final class SegmentCoreReaders {
cfsReader = null;
cfsDir = dir;
}
- fieldInfos = codec.fieldInfosFormat().getFieldInfosReader().read(cfsDir, si.info.name, IOContext.READONCE);
+ fieldInfos = codec.fieldInfosFormat().getFieldInfosReader().read(cfsDir, si.info.name, IOContext.READONCE);
+
final PostingsFormat format = codec.postingsFormat();
final SegmentReadState segmentReadState = new SegmentReadState(cfsDir, si.info, fieldInfos, context);
// Ask codec for its Fields
@@ -131,13 +114,6 @@ final class SegmentCoreReaders {
// TODO: since we don't write any norms file if there are no norms,
// kinda jaky to assume the codec handles the case of no norms file at all gracefully?!
- if (fieldInfos.hasDocValues()) {
- dvProducer = codec.docValuesFormat().fieldsProducer(segmentReadState);
- assert dvProducer != null;
- } else {
- dvProducer = null;
- }
-
if (fieldInfos.hasNorms()) {
normsProducer = codec.normsFormat().normsProducer(segmentReadState);
assert normsProducer != null;
@@ -167,144 +143,12 @@ final class SegmentCoreReaders {
this.owner = owner;
}
- void incRef() {
- ref.incrementAndGet();
- }
-
- NumericDocValues getNumericDocValues(String field) throws IOException {
- FieldInfo fi = fieldInfos.fieldInfo(field);
- if (fi == null) {
- // Field does not exist
- return null;
- }
- if (fi.getDocValuesType() == null) {
- // Field was not indexed with doc values
- return null;
- }
- if (fi.getDocValuesType() != DocValuesType.NUMERIC) {
- // DocValues were not numeric
- return null;
- }
-
- assert dvProducer != null;
-
- Map<String,Object> dvFields = docValuesLocal.get();
-
- NumericDocValues dvs = (NumericDocValues) dvFields.get(field);
- if (dvs == null) {
- dvs = dvProducer.getNumeric(fi);
- dvFields.put(field, dvs);
- }
-
- return dvs;
- }
-
- BinaryDocValues getBinaryDocValues(String field) throws IOException {
- FieldInfo fi = fieldInfos.fieldInfo(field);
- if (fi == null) {
- // Field does not exist
- return null;
- }
- if (fi.getDocValuesType() == null) {
- // Field was not indexed with doc values
- return null;
- }
- if (fi.getDocValuesType() != DocValuesType.BINARY) {
- // DocValues were not binary
- return null;
- }
-
- assert dvProducer != null;
-
- Map<String,Object> dvFields = docValuesLocal.get();
-
- BinaryDocValues dvs = (BinaryDocValues) dvFields.get(field);
- if (dvs == null) {
- dvs = dvProducer.getBinary(fi);
- dvFields.put(field, dvs);
- }
-
- return dvs;
- }
-
- SortedDocValues getSortedDocValues(String field) throws IOException {
- FieldInfo fi = fieldInfos.fieldInfo(field);
- if (fi == null) {
- // Field does not exist
- return null;
- }
- if (fi.getDocValuesType() == null) {
- // Field was not indexed with doc values
- return null;
- }
- if (fi.getDocValuesType() != DocValuesType.SORTED) {
- // DocValues were not sorted
- return null;
- }
-
- assert dvProducer != null;
-
- Map<String,Object> dvFields = docValuesLocal.get();
-
- SortedDocValues dvs = (SortedDocValues) dvFields.get(field);
- if (dvs == null) {
- dvs = dvProducer.getSorted(fi);
- dvFields.put(field, dvs);
- }
-
- return dvs;
+ int getRefCount() {
+ return ref.get();
}
- SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
- FieldInfo fi = fieldInfos.fieldInfo(field);
- if (fi == null) {
- // Field does not exist
- return null;
- }
- if (fi.getDocValuesType() == null) {
- // Field was not indexed with doc values
- return null;
- }
- if (fi.getDocValuesType() != DocValuesType.SORTED_SET) {
- // DocValues were not sorted
- return null;
- }
-
- assert dvProducer != null;
-
- Map<String,Object> dvFields = docValuesLocal.get();
-
- SortedSetDocValues dvs = (SortedSetDocValues) dvFields.get(field);
- if (dvs == null) {
- dvs = dvProducer.getSortedSet(fi);
- dvFields.put(field, dvs);
- }
-
- return dvs;
- }
-
- Bits getDocsWithField(String field) throws IOException {
- FieldInfo fi = fieldInfos.fieldInfo(field);
- if (fi == null) {
- // Field does not exist
- return null;
- }
- if (fi.getDocValuesType() == null) {
- // Field was not indexed with doc values
- return null;
- }
-
- assert dvProducer != null;
-
- Map<String,Bits> dvFields = docsWithFieldLocal.get();
-
- Bits dvs = dvFields.get(field);
- if (dvs == null) {
- dvs = dvProducer.getDocsWithField(fi);
- dvFields.put(field, dvs);
- }
-
- return dvs;
+ void incRef() {
+ ref.incrementAndGet();
}
NumericDocValues getNormValues(String field) throws IOException {
@@ -332,8 +176,9 @@ final class SegmentCoreReaders {
void decRef() throws IOException {
if (ref.decrementAndGet() == 0) {
- IOUtils.close(termVectorsLocal, fieldsReaderLocal, docValuesLocal, normsLocal, docsWithFieldLocal, fields,
- dvProducer, termVectorsReaderOrig, fieldsReaderOrig, cfsReader, normsProducer);
+// System.err.println("--- closing core readers");
+ IOUtils.close(termVectorsLocal, fieldsReaderLocal, normsLocal, fields, termVectorsReaderOrig, fieldsReaderOrig,
+ cfsReader, normsProducer);
notifyCoreClosedListeners();
}
}
@@ -356,12 +201,12 @@ final class SegmentCoreReaders {
/** Returns approximate RAM bytes used */
public long ramBytesUsed() {
- return ((dvProducer!=null) ? dvProducer.ramBytesUsed() : 0) +
- ((normsProducer!=null) ? normsProducer.ramBytesUsed() : 0) +
+ return ((normsProducer!=null) ? normsProducer.ramBytesUsed() : 0) +
((fields!=null) ? fields.ramBytesUsed() : 0) +
((fieldsReaderOrig!=null)? fieldsReaderOrig.ramBytesUsed() : 0) +
((termVectorsReaderOrig!=null) ? termVectorsReaderOrig.ramBytesUsed() : 0);
}
+
@Override
public String toString() {
return "SegmentCoreReader(owner=" + owner + ")";
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfo.java Sun Sep 15 17:11:52 2013
@@ -119,7 +119,7 @@ public final class SegmentInfo {
public void setCodec(Codec codec) {
assert this.codec == null;
if (codec == null) {
- throw new IllegalArgumentException("segmentCodecs must be non-null");
+ throw new IllegalArgumentException("codec must be non-null");
}
this.codec = codec;
}
@@ -170,7 +170,6 @@ public final class SegmentInfo {
* left off when there are no deletions).</p>
*/
public String toString(Directory dir, int delCount) {
-
StringBuilder s = new StringBuilder();
s.append(name).append('(').append(version == null ? "?" : version).append(')').append(':');
char cfs = getUseCompoundFile() ? 'c' : 'C';
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfoPerCommit.java Sun Sep 15 17:11:52 2013
@@ -19,7 +19,10 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.lucene.store.Directory;
@@ -27,9 +30,8 @@ import org.apache.lucene.store.Directory
* fields.
*
* @lucene.experimental */
-
-public class SegmentInfoPerCommit {
-
+public class SegmentInfoPerCommit { // TODO (DVU_RENAME) to SegmentCommitInfo
+
/** The {@link SegmentInfo} that we wrap. */
public final SegmentInfo info;
@@ -44,15 +46,35 @@ public class SegmentInfoPerCommit {
// attempt to write:
private long nextWriteDelGen;
+ // holds field.number to docValuesGen mapping
+ // TODO (DVU_FIELDINFOS_GEN) once we gen FieldInfos, get rid of this; every FieldInfo will record its dvGen
+ private final Map<Integer,Long> fieldDocValuesGens = new HashMap<Integer,Long>();
+
+ // Generation number of the docValues (-1 if there are no field updates)
+ private long docValuesGen;
+
+ // Normally 1 + docValuesGen, unless an exception was hit on last attempt to
+ // write
+ private long nextWriteDocValuesGen;
+
+ // Tracks the files with field updates
+ private Set<String> updatesFiles = new HashSet<String>();
+
private volatile long sizeInBytes = -1;
- /** Sole constructor.
- * @param info {@link SegmentInfo} that we wrap
- * @param delCount number of deleted documents in this segment
- * @param delGen deletion generation number (used to name
- deletion files)
+ /**
+ * Sole constructor.
+ *
+ * @param info
+ * {@link SegmentInfo} that we wrap
+ * @param delCount
+ * number of deleted documents in this segment
+ * @param delGen
+ * deletion generation number (used to name deletion files)
+ * @param docValuesGen
+ * doc-values generation number (used to name docvalues files)
**/
- public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen) {
+ public SegmentInfoPerCommit(SegmentInfo info, int delCount, long delGen, long docValuesGen) {
this.info = info;
this.delCount = delCount;
this.delGen = delGen;
@@ -61,8 +83,25 @@ public class SegmentInfoPerCommit {
} else {
nextWriteDelGen = delGen+1;
}
+
+ this.docValuesGen = docValuesGen;
+ if (docValuesGen == -1) {
+ nextWriteDocValuesGen = 1;
+ } else {
+ nextWriteDocValuesGen = docValuesGen + 1;
+ }
}
+ /** Returns the files which contain field updates. */
+ public Set<String> getUpdatesFiles() {
+ return new HashSet<String>(updatesFiles);
+ }
+
+ /** Called when we succeed in writing field updates. */
+ public void addUpdatesFiles(Set<String> files) {
+ updatesFiles.addAll(files);
+ }
+
/** Called when we succeed in writing deletes */
void advanceDelGen() {
delGen = nextWriteDelGen;
@@ -76,6 +115,21 @@ public class SegmentInfoPerCommit {
void advanceNextWriteDelGen() {
nextWriteDelGen++;
}
+
+ /** Called when we succeed in writing docvalues updates */
+ void advanceDocValuesGen() {
+ docValuesGen = nextWriteDocValuesGen;
+ nextWriteDocValuesGen = docValuesGen + 1;
+ sizeInBytes = -1;
+ }
+
+ /**
+ * Called if there was an exception while writing docvalues updates, so that
+ * we don't try to write to the same file more than once.
+ */
+ void advanceNextWriteDocValuesGen() {
+ nextWriteDocValuesGen++;
+ }
/** Returns total size in bytes of all files for this
* segment. */
@@ -96,9 +150,15 @@ public class SegmentInfoPerCommit {
// Start from the wrapped info's files:
Collection<String> files = new HashSet<String>(info.files());
+ // TODO we could rely on TrackingDir.getCreatedFiles() (like we do for
+ // updates) and then maybe even be able to remove LiveDocsFormat.files().
+
// Must separately add any live docs files:
info.getCodec().liveDocsFormat().files(this, files);
+ // Must separately add any field updates files
+ files.addAll(updatesFiles);
+
return files;
}
@@ -115,26 +175,53 @@ public class SegmentInfoPerCommit {
sizeInBytes = -1;
}
- void clearDelGen() {
- delGen = -1;
- sizeInBytes = -1;
- }
-
- /**
- * Sets the generation number of the live docs file.
- * @see #getDelGen()
- */
- public void setDelGen(long delGen) {
- this.delGen = delGen;
- sizeInBytes = -1;
- }
-
/** Returns true if there are any deletions for the
* segment at this commit. */
public boolean hasDeletions() {
return delGen != -1;
}
+ /** Returns true if there are any field updates for the segment in this commit. */
+ public boolean hasFieldUpdates() {
+ return docValuesGen != -1;
+ }
+
+ /** Returns the next available generation number of the docvalues files. */
+ public long getNextDocValuesGen() {
+ return nextWriteDocValuesGen;
+ }
+
+ /**
+ * Returns the docvalues generation of this field, or -1 if there are
+ * no updates to it.
+ */
+ public long getDocValuesGen(int fieldNumber) {
+ Long gen = fieldDocValuesGens.get(fieldNumber);
+ return gen == null ? -1 : gen.longValue();
+ }
+
+ /** Sets the docvalues generation for this field. */
+ public void setDocValuesGen(int fieldNumber, long gen) {
+ fieldDocValuesGens.put(fieldNumber, gen);
+ }
+
+ /**
+ * Returns a mapping from a field number to its DV generation.
+ *
+ * @see #getDocValuesGen(int)
+ */
+ public Map<Integer,Long> getFieldDocValuesGens() {
+ return fieldDocValuesGens;
+ }
+
+ /**
+ * Returns the generation number of the docvalues updates, or -1 if there are
+ * no field updates yet.
+ */
+ public long getDocValuesGen() {
+ return docValuesGen;
+ }
+
/**
* Returns the next available generation number
* of the live docs file.
@@ -174,17 +261,25 @@ public class SegmentInfoPerCommit {
if (delGen != -1) {
s += ":delGen=" + delGen;
}
+ if (docValuesGen != -1) {
+ s += ":docValuesGen=" + docValuesGen;
+ }
return s;
}
@Override
public SegmentInfoPerCommit clone() {
- SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen);
+ SegmentInfoPerCommit other = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen);
// Not clear that we need to carry over nextWriteDelGen
// (i.e. do we ever clone after a failed write and
// before the next successful write?), but just do it to
// be safe:
other.nextWriteDelGen = nextWriteDelGen;
+ other.nextWriteDocValuesGen = nextWriteDocValuesGen;
+
+ other.updatesFiles.addAll(updatesFiles);
+
+ other.fieldDocValuesGens.putAll(fieldDocValuesGens);
return other;
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java Sun Sep 15 17:11:52 2013
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.lucene.codecs.Codec;
@@ -35,14 +36,13 @@ import org.apache.lucene.codecs.CodecUti
import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexOutput;
-import org.apache.lucene.store.DataOutput; // javadocs
+import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NoSuchDirectoryException;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.ThreadInterruptedException;
/**
* A collection of segmentInfo objects with methods for operating on
@@ -111,11 +111,12 @@ import org.apache.lucene.util.ThreadInte
*/
public final class SegmentInfos implements Cloneable, Iterable<SegmentInfoPerCommit> {
- /**
- * The file format version for the segments_N codec header
- */
+ /** The file format version for the segments_N codec header, up to 4.4. */
public static final int VERSION_40 = 0;
+ /** The file format version for the segments_N codec header, since 4.5+. */
+ public static final int VERSION_45 = 1;
+
/** Used for the segments.gen file only!
* Whenever you add a new format, make it 1 smaller (negative version logic)! */
public static final int FORMAT_SEGMENTS_GEN_CURRENT = -2;
@@ -319,14 +320,14 @@ public final class SegmentInfos implemen
throw new IndexFormatTooOldException(input, magic, CodecUtil.CODEC_MAGIC, CodecUtil.CODEC_MAGIC);
}
// 4.0+
- CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_40);
+ int format = CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_40, VERSION_45);
version = input.readLong();
counter = input.readInt();
int numSegments = input.readInt();
if (numSegments < 0) {
throw new CorruptIndexException("invalid segment count: " + numSegments + " (resource: " + input + ")");
}
- for(int seg=0;seg<numSegments;seg++) {
+ for (int seg = 0; seg < numSegments; seg++) {
String segName = input.readString();
Codec codec = Codec.forName(input.readString());
//System.out.println("SIS.read seg=" + seg + " codec=" + codec);
@@ -337,7 +338,19 @@ public final class SegmentInfos implemen
if (delCount < 0 || delCount > info.getDocCount()) {
throw new CorruptIndexException("invalid deletion count: " + delCount + " (resource: " + input + ")");
}
- add(new SegmentInfoPerCommit(info, delCount, delGen));
+ long docValuesGen = -1;
+ if (format >= VERSION_45) {
+ docValuesGen = input.readLong();
+ }
+ SegmentInfoPerCommit siPerCommit = new SegmentInfoPerCommit(info, delCount, delGen, docValuesGen);
+ if (format >= VERSION_45) {
+ int numUpdates = input.readInt();
+ for (int i = 0; i < numUpdates; i++) {
+ siPerCommit.setDocValuesGen(input.readInt(), input.readLong());
+ }
+ siPerCommit.addUpdatesFiles(input.readStringSet());
+ }
+ add(siPerCommit);
}
userData = input.readStringStringMap();
@@ -395,7 +408,7 @@ public final class SegmentInfos implemen
try {
segnOutput = new ChecksumIndexOutput(directory.createOutput(segmentFileName, IOContext.DEFAULT));
- CodecUtil.writeHeader(segnOutput, "segments", VERSION_40);
+ CodecUtil.writeHeader(segnOutput, "segments", VERSION_45);
segnOutput.writeLong(version);
segnOutput.writeInt(counter); // write counter
segnOutput.writeInt(size()); // write infos
@@ -405,6 +418,14 @@ public final class SegmentInfos implemen
segnOutput.writeString(si.getCodec().getName());
segnOutput.writeLong(siPerCommit.getDelGen());
segnOutput.writeInt(siPerCommit.getDelCount());
+ segnOutput.writeLong(siPerCommit.getDocValuesGen());
+ Map<Integer,Long> docValuesUpdatesGen = siPerCommit.getFieldDocValuesGens();
+ segnOutput.writeInt(docValuesUpdatesGen.size());
+ for (Entry<Integer,Long> e : docValuesUpdatesGen.entrySet()) {
+ segnOutput.writeInt(e.getKey());
+ segnOutput.writeLong(e.getValue());
+ }
+ segnOutput.writeStringSet(siPerCommit.getUpdatesFiles());
assert si.dir == directory;
assert siPerCommit.getDelCount() <= si.getDocCount();
@@ -815,6 +836,7 @@ public final class SegmentInfos implemen
files.addAll(info.files());
}
}
+
return files;
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentReader.java Sun Sep 15 17:11:52 2013
@@ -18,13 +18,24 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-
-import org.apache.lucene.store.Directory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.DocValuesFormat;
+import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.TermVectorsReader;
-import org.apache.lucene.search.FieldCache; // javadocs
+import org.apache.lucene.index.FieldInfo.DocValuesType;
+import org.apache.lucene.search.FieldCache;
+import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.CloseableThreadLocal;
+import org.apache.lucene.util.RefCount;
/**
* IndexReader implementation over a single segment.
@@ -45,6 +56,23 @@ public final class SegmentReader extends
final SegmentCoreReaders core;
+ final CloseableThreadLocal<Map<String,Object>> docValuesLocal = new CloseableThreadLocal<Map<String,Object>>() {
+ @Override
+ protected Map<String,Object> initialValue() {
+ return new HashMap<String,Object>();
+ }
+ };
+
+ final CloseableThreadLocal<Map<String,Bits>> docsWithFieldLocal = new CloseableThreadLocal<Map<String,Bits>>() {
+ @Override
+ protected Map<String,Bits> initialValue() {
+ return new HashMap<String,Bits>();
+ }
+ };
+
+ final Map<String,DocValuesProducer> dvProducers = new HashMap<String,DocValuesProducer>();
+ final Map<Long,RefCount<DocValuesProducer>> genDVProducers = new HashMap<Long,RefCount<DocValuesProducer>>();
+
/**
* Constructs a new SegmentReader with a new core.
* @throws CorruptIndexException if the index is corrupt
@@ -54,16 +82,41 @@ public final class SegmentReader extends
public SegmentReader(SegmentInfoPerCommit si, IOContext context) throws IOException {
this.si = si;
core = new SegmentCoreReaders(this, si.info.dir, si, context);
+
boolean success = false;
+ final Codec codec = si.info.getCodec();
try {
if (si.hasDeletions()) {
// NOTE: the bitvector is stored using the regular directory, not cfs
- liveDocs = si.info.getCodec().liveDocsFormat().readLiveDocs(directory(), si, IOContext.READONCE);
+ liveDocs = codec.liveDocsFormat().readLiveDocs(directory(), si, IOContext.READONCE);
} else {
assert si.getDelCount() == 0;
liveDocs = null;
}
numDocs = si.info.getDocCount() - si.getDelCount();
+
+ if (core.fieldInfos.hasDocValues()) {
+ final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
+ final DocValuesFormat dvFormat = codec.docValuesFormat();
+ // initialize the per-generation DocValuesProducers (genDVProducers) and put
+ // the correct DVProducer for each field
+ final Map<Long,List<FieldInfo>> genInfos = getGenInfos(si);
+
+ for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
+ Long gen = e.getKey();
+ List<FieldInfo> infos = e.getValue();
+ RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
+ if (dvp == null) {
+ dvp = newDocValuesProducer(si, context, dir, dvFormat, gen, infos);
+ assert dvp != null;
+ genDVProducers.put(gen, dvp);
+ }
+ for (FieldInfo fi : infos) {
+ dvProducers.put(fi.name, dvp.get());
+ }
+ }
+ }
+
success = true;
} finally {
// With lock-less commits, it's entirely possible (and
@@ -72,7 +125,7 @@ public final class SegmentReader extends
// of things that were opened so that we don't have to
// wait for a GC to do so.
if (!success) {
- core.decRef();
+ doClose();
}
}
}
@@ -80,8 +133,8 @@ public final class SegmentReader extends
/** Create new SegmentReader sharing core from a previous
* SegmentReader and loading new live docs from a new
* deletes file. Used by openIfChanged. */
- SegmentReader(SegmentInfoPerCommit si, SegmentCoreReaders core) throws IOException {
- this(si, core,
+ SegmentReader(SegmentInfoPerCommit si, SegmentReader sr) throws IOException {
+ this(si, sr,
si.info.getCodec().liveDocsFormat().readLiveDocs(si.info.dir, si, IOContext.READONCE),
si.info.getDocCount() - si.getDelCount());
}
@@ -90,15 +143,88 @@ public final class SegmentReader extends
* SegmentReader and using the provided in-memory
* liveDocs. Used by IndexWriter to provide a new NRT
* reader */
- SegmentReader(SegmentInfoPerCommit si, SegmentCoreReaders core, Bits liveDocs, int numDocs) {
+ SegmentReader(SegmentInfoPerCommit si, SegmentReader sr, Bits liveDocs, int numDocs) throws IOException {
this.si = si;
- this.core = core;
+ this.liveDocs = liveDocs;
+ this.numDocs = numDocs;
+ this.core = sr.core;
core.incRef();
+
+ // increment refCount of DocValuesProducers that are used by this reader
+ boolean success = false;
+ try {
+ if (core.fieldInfos.hasDocValues()) {
+ final Codec codec = si.info.getCodec();
+ final Directory dir = core.cfsReader != null ? core.cfsReader : si.info.dir;
+
+ final DocValuesFormat dvFormat = codec.docValuesFormat();
+ final Map<Long,List<FieldInfo>> genInfos = getGenInfos(si);
+
+ for (Entry<Long,List<FieldInfo>> e : genInfos.entrySet()) {
+ Long gen = e.getKey();
+ List<FieldInfo> infos = e.getValue();
+ RefCount<DocValuesProducer> dvp = genDVProducers.get(gen);
+ if (dvp == null) {
+ // check if this DVP gen is used by the given reader
+ dvp = sr.genDVProducers.get(gen);
+ if (dvp != null) {
+ // gen used by given reader, incRef its DVP
+ dvp.incRef();
+ } else {
+ // this gen is not used by given reader, initialize a new one
+ dvp = newDocValuesProducer(si, IOContext.READ, dir, dvFormat, gen, infos);
+ }
+ assert dvp != null;
+ genDVProducers.put(gen, dvp);
+ }
+ for (FieldInfo fi : infos) {
+ dvProducers.put(fi.name, dvp.get());
+ }
+ }
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ doClose();
+ }
+ }
+ }
- assert liveDocs != null;
- this.liveDocs = liveDocs;
+ // returns a gen->List<FieldInfo> mapping. Fields without DV updates have gen=-1
+ private Map<Long,List<FieldInfo>> getGenInfos(SegmentInfoPerCommit si) {
+ final Map<Long,List<FieldInfo>> genInfos = new HashMap<Long,List<FieldInfo>>();
+ for (FieldInfo fi : core.fieldInfos) {
+ if (fi.getDocValuesType() == null) {
+ continue;
+ }
+ long gen = si.getDocValuesGen(fi.number);
+ List<FieldInfo> infos = genInfos.get(gen);
+ if (infos == null) {
+ infos = new ArrayList<FieldInfo>();
+ genInfos.put(gen, infos);
+ }
+ infos.add(fi);
+ }
+ return genInfos;
+ }
- this.numDocs = numDocs;
+ private RefCount<DocValuesProducer> newDocValuesProducer(SegmentInfoPerCommit si, IOContext context, Directory dir,
+ DocValuesFormat dvFormat, Long gen, List<FieldInfo> infos) throws IOException {
+ Directory dvDir = dir;
+ String segmentSuffix = "";
+ if (gen.longValue() != -1) {
+ dvDir = si.info.dir; // gen'd files are written outside CFS, so use SegInfo directory
+ segmentSuffix = Long.toString(gen.longValue(), Character.MAX_RADIX);
+ }
+
+ // set SegmentReadState to list only the fields that are relevant to that gen
+ SegmentReadState srs = new SegmentReadState(dvDir, si.info, new FieldInfos(infos.toArray(new FieldInfo[infos.size()])), context, segmentSuffix);
+ return new RefCount<DocValuesProducer>(dvFormat.fieldsProducer(srs)) {
+ @Override
+ protected void release() throws IOException {
+ object.close();
+ }
+ };
}
@Override
@@ -110,7 +236,26 @@ public final class SegmentReader extends
@Override
protected void doClose() throws IOException {
//System.out.println("SR.close seg=" + si);
- core.decRef();
+ try {
+ core.decRef();
+ } finally {
+ Throwable t = null; // first failure hit while releasing the DocValuesProducers
+ for (RefCount<DocValuesProducer> dvp : genDVProducers.values()) {
+ try {
+ dvp.decRef(); // keep releasing the remaining producers even if one fails
+ } catch (Throwable th) {
+ if (t == null) { // record only the first failure (was "t != null", which never recorded any)
+ t = th;
+ }
+ }
+ }
+ if (t != null) { // rethrow the recorded failure, preserving its original type where possible
+ if (t instanceof IOException) throw (IOException) t;
+ if (t instanceof RuntimeException) throw (RuntimeException) t;
+ if (t instanceof Error) throw (Error) t;
+ throw new RuntimeException(t);
+ }
+ }
}
@Override
@@ -217,34 +362,140 @@ public final class SegmentReader extends
return this;
}
+ // returns the FieldInfo that corresponds to the given field and type, or
+ // null if the field does not exist, or is not indexed with the requested
+ // DocValuesType.
+ private FieldInfo getDVField(String field, DocValuesType type) {
+ FieldInfo fi = core.fieldInfos.fieldInfo(field);
+ if (fi == null) {
+ // Field does not exist
+ return null;
+ }
+ if (fi.getDocValuesType() == null) {
+ // Field was not indexed with doc values
+ return null;
+ }
+ if (fi.getDocValuesType() != type) {
+ // Field DocValues are different than requested type
+ return null;
+ }
+
+ return fi;
+ }
+
@Override
public NumericDocValues getNumericDocValues(String field) throws IOException {
ensureOpen();
- return core.getNumericDocValues(field);
+ FieldInfo fi = getDVField(field, DocValuesType.NUMERIC);
+ if (fi == null) {
+ return null;
+ }
+
+ DocValuesProducer dvProducer = dvProducers.get(field);
+ assert dvProducer != null;
+
+ Map<String,Object> dvFields = docValuesLocal.get();
+
+ NumericDocValues dvs = (NumericDocValues) dvFields.get(field);
+ if (dvs == null) {
+ dvs = dvProducer.getNumeric(fi);
+ dvFields.put(field, dvs);
+ }
+
+ return dvs;
}
@Override
public Bits getDocsWithField(String field) throws IOException {
ensureOpen();
- return core.getDocsWithField(field);
+ FieldInfo fi = core.fieldInfos.fieldInfo(field);
+ if (fi == null) {
+ // Field does not exist
+ return null;
+ }
+ if (fi.getDocValuesType() == null) {
+ // Field was not indexed with doc values
+ return null;
+ }
+
+ DocValuesProducer dvProducer = dvProducers.get(field);
+ assert dvProducer != null;
+
+ Map<String,Bits> dvFields = docsWithFieldLocal.get();
+
+ Bits dvs = dvFields.get(field);
+ if (dvs == null) {
+ dvs = dvProducer.getDocsWithField(fi);
+ dvFields.put(field, dvs);
+ }
+
+ return dvs;
}
@Override
public BinaryDocValues getBinaryDocValues(String field) throws IOException {
ensureOpen();
- return core.getBinaryDocValues(field);
+ FieldInfo fi = getDVField(field, DocValuesType.BINARY);
+ if (fi == null) {
+ return null;
+ }
+
+ DocValuesProducer dvProducer = dvProducers.get(field);
+ assert dvProducer != null;
+
+ Map<String,Object> dvFields = docValuesLocal.get();
+
+ BinaryDocValues dvs = (BinaryDocValues) dvFields.get(field);
+ if (dvs == null) {
+ dvs = dvProducer.getBinary(fi);
+ dvFields.put(field, dvs);
+ }
+
+ return dvs;
}
@Override
public SortedDocValues getSortedDocValues(String field) throws IOException {
ensureOpen();
- return core.getSortedDocValues(field);
+ FieldInfo fi = getDVField(field, DocValuesType.SORTED);
+ if (fi == null) {
+ return null;
+ }
+
+ DocValuesProducer dvProducer = dvProducers.get(field);
+ assert dvProducer != null;
+
+ Map<String,Object> dvFields = docValuesLocal.get();
+
+ SortedDocValues dvs = (SortedDocValues) dvFields.get(field);
+ if (dvs == null) {
+ dvs = dvProducer.getSorted(fi);
+ dvFields.put(field, dvs);
+ }
+
+ return dvs;
}
@Override
public SortedSetDocValues getSortedSetDocValues(String field) throws IOException {
ensureOpen();
- return core.getSortedSetDocValues(field);
+ FieldInfo fi = getDVField(field, DocValuesType.SORTED_SET);
+ if (fi == null) {
+ return null;
+ }
+
+ DocValuesProducer dvProducer = dvProducers.get(field);
+ assert dvProducer != null;
+
+ Map<String,Object> dvFields = docValuesLocal.get();
+
+ SortedSetDocValues dvs = (SortedSetDocValues) dvFields.get(field);
+ if (dvs == null) {
+ dvs = dvProducer.getSortedSet(fi);
+ dvFields.put(field, dvs);
+ }
+
+ return dvs;
}
@Override
@@ -284,9 +535,21 @@ public final class SegmentReader extends
core.removeCoreClosedListener(listener);
}
+ private long dvRamBytesUsed() {
+ long ramBytesUsed = 0;
+ for (RefCount<DocValuesProducer> dvp : genDVProducers.values()) {
+ ramBytesUsed += dvp.get().ramBytesUsed();
+ }
+ return ramBytesUsed;
+ }
+
/** Returns approximate RAM Bytes used */
public long ramBytesUsed() {
ensureOpen();
- return (core!=null) ? core.ramBytesUsed() : 0;
+ long ramBytesUsed = dvRamBytesUsed();
+ if (core != null) {
+ ramBytesUsed += core.ramBytesUsed();
+ }
+ return ramBytesUsed;
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/SegmentWriteState.java Sun Sep 15 17:11:52 2013
@@ -71,16 +71,32 @@ public class SegmentWriteState {
* to {@link Directory#createOutput(String,IOContext)}. */
public final IOContext context;
+ /** True if this instance represents a field update. */
+ public final boolean isFieldUpdate; // TODO (DVU_FIELDINFOS_GEN) once we gen FieldInfos, get rid of this
+
/** Sole constructor. */
public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos,
BufferedDeletes segDeletes, IOContext context) {
+ this(infoStream, directory, segmentInfo, fieldInfos, segDeletes, context, "", false);
+ }
+
+ /**
+ * Constructor which takes segment suffix and isFieldUpdate in addition to the
+ * other parameters.
+ *
+ * @see #SegmentWriteState(InfoStream, Directory, SegmentInfo, FieldInfos,
+ * BufferedDeletes, IOContext)
+ */
+ public SegmentWriteState(InfoStream infoStream, Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos,
+ BufferedDeletes segDeletes, IOContext context, String segmentSuffix, boolean isFieldUpdate) {
this.infoStream = infoStream;
this.segDeletes = segDeletes;
this.directory = directory;
this.segmentInfo = segmentInfo;
this.fieldInfos = fieldInfos;
- segmentSuffix = "";
+ this.segmentSuffix = segmentSuffix;
this.context = context;
+ this.isFieldUpdate = isFieldUpdate;
}
/** Create a shallow copy of {@link SegmentWriteState} with a new segment suffix. */
@@ -93,5 +109,6 @@ public class SegmentWriteState {
this.segmentSuffix = segmentSuffix;
segDeletes = state.segDeletes;
delCountOnFlush = state.delCountOnFlush;
+ isFieldUpdate = state.isFieldUpdate;
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java Sun Sep 15 17:11:52 2013
@@ -162,18 +162,26 @@ final class StandardDirectoryReader exte
readerShared[i] = false;
newReaders[i] = newReader;
} else {
- if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()) {
+ if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()
+ && newReaders[i].getSegmentInfo().getDocValuesGen() == infos.info(i).getDocValuesGen()) {
// No change; this reader will be shared between
// the old and the new one, so we must incRef
// it:
readerShared[i] = true;
newReaders[i].incRef();
} else {
+ // there are changes to the reader, either liveDocs or DV updates
readerShared[i] = false;
// Steal the ref returned by SegmentReader ctor:
assert infos.info(i).info.dir == newReaders[i].getSegmentInfo().info.dir;
- assert infos.info(i).hasDeletions();
- newReaders[i] = new SegmentReader(infos.info(i), newReaders[i].core);
+ assert infos.info(i).hasDeletions() || infos.info(i).hasFieldUpdates();
+ if (newReaders[i].getSegmentInfo().getDelGen() == infos.info(i).getDelGen()) {
+ // only DV updates
+ newReaders[i] = new SegmentReader(infos.info(i), newReaders[i], newReaders[i].getLiveDocs(), newReaders[i].numDocs());
+ } else {
+ // both DV and liveDocs have changed
+ newReaders[i] = new SegmentReader(infos.info(i), newReaders[i]);
+ }
}
}
success = true;
Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/RefCount.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/RefCount.java?rev=1523461&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/RefCount.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/util/RefCount.java Sun Sep 15 17:11:52 2013
@@ -0,0 +1,84 @@
+package org.apache.lucene.util;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Manages reference counting for a given object. Extensions can override
+ * {@link #release()} to do custom logic when reference counting hits 0.
+ */
+public class RefCount<T> {
+
+ private final AtomicInteger refCount = new AtomicInteger(1);
+
+ protected final T object;
+
+ public RefCount(T object) {
+ this.object = object;
+ }
+
+ /**
+ * Called when reference counting hits 0. By default this method does nothing,
+ * but extensions can override to e.g. release resources attached to object
+ * that is managed by this class.
+ */
+ protected void release() throws IOException {}
+
+ /**
+ * Decrements the reference counting of this object. When reference counting
+ * hits 0, calls {@link #release()}.
+ */
+ public final void decRef() throws IOException {
+ final int rc = refCount.decrementAndGet();
+ if (rc == 0) {
+ boolean success = false;
+ try {
+ release();
+ success = true;
+ } finally {
+ if (!success) {
+ // Put reference back on failure
+ refCount.incrementAndGet();
+ }
+ }
+ } else if (rc < 0) {
+ throw new IllegalStateException("too many decRef calls: refCount is " + rc + " after decrement");
+ }
+ }
+
+ public final T get() {
+ return object;
+ }
+
+ /** Returns the current reference count. */
+ public final int getRefCount() {
+ return refCount.get();
+ }
+
+ /**
+ * Increments the reference count. Calls to this method must be matched with
+ * calls to {@link #decRef()}.
+ */
+ public final void incRef() {
+ refCount.incrementAndGet();
+ }
+
+}
+
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java?rev=1523461&r1=1523460&r2=1523461&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDoc.java Sun Sep 15 17:11:52 2013
@@ -239,7 +239,7 @@ public class TestDoc extends LuceneTestC
}
}
- return new SegmentInfoPerCommit(info, 0, -1L);
+ return new SegmentInfoPerCommit(info, 0, -1L, -1L);
}