Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/11 17:42:38 UTC
[04/31] lucene-solr git commit: current patch
current patch
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1ae72914
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1ae72914
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1ae72914
Branch: refs/heads/master
Commit: 1ae7291429bad742715344f86cfa5200229b3698
Parents: b62c671
Author: Mike McCandless <mi...@apache.org>
Authored: Sun Jan 24 18:17:20 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Sun Jan 24 18:17:20 2016 -0500
----------------------------------------------------------------------
.../org/apache/lucene/codecs/CodecUtil.java | 30 +
.../apache/lucene/index/DirectoryReader.java | 9 +-
.../org/apache/lucene/index/IndexWriter.java | 43 +-
.../org/apache/lucene/index/ReaderManager.java | 8 +-
.../org/apache/lucene/index/SegmentInfos.java | 369 +++---
.../lucene/index/StandardDirectoryReader.java | 31 +-
.../apache/lucene/search/SearcherManager.java | 8 +-
.../lucene/store/ByteArrayIndexInput.java | 163 +++
.../lucene/store/NRTCachingDirectory.java | 6 +-
.../java/org/apache/lucene/util/IOUtils.java | 3 +
.../apache/lucene/index/TestIndexWriter.java | 2 +-
.../lucene/index/TestIndexWriterDelete.java | 2 +-
.../index/TestIndexWriterExceptions2.java | 2 +-
.../lucene/index/TestIndexWriterOnJRECrash.java | 42 +-
.../lucene/index/TestIndexWriterOnVMError.java | 2 +-
.../apache/lucene/index/TestRollingUpdates.java | 2 +-
.../apache/lucene/index/TestStressDeletes.java | 2 +-
.../TestControlledRealTimeReopenThread.java | 6 +-
.../apache/lucene/search/TestLRUQueryCache.java | 2 +-
.../lucene/search/TestSearcherManager.java | 10 +-
.../facet/taxonomy/SearcherTaxonomyManager.java | 2 +-
.../directory/DirectoryTaxonomyWriter.java | 2 +-
.../apache/lucene/replicator/nrt/CopyJob.java | 237 ++++
.../lucene/replicator/nrt/CopyOneFile.java | 132 ++
.../apache/lucene/replicator/nrt/CopyState.java | 56 +
.../lucene/replicator/nrt/FileMetaData.java | 40 +
.../org/apache/lucene/replicator/nrt/Node.java | 213 ++++
.../nrt/NodeCommunicationException.java | 26 +
.../nrt/PreCopyMergedSegmentWarmer.java | 80 ++
.../lucene/replicator/nrt/PrimaryNode.java | 316 +++++
.../replicator/nrt/ReplicaFileDeleter.java | 218 ++++
.../lucene/replicator/nrt/ReplicaNode.java | 772 ++++++++++++
.../nrt/SegmentInfosSearcherManager.java | 129 ++
.../lucene/replicator/nrt/Connection.java | 63 +
.../org/apache/lucene/replicator/nrt/Jobs.java | 152 +++
.../lucene/replicator/nrt/NodeProcess.java | 238 ++++
.../lucene/replicator/nrt/SimpleCopyJob.java | 294 +++++
.../replicator/nrt/SimplePrimaryNode.java | 674 ++++++++++
.../replicator/nrt/SimpleReplicaNode.java | 316 +++++
.../lucene/replicator/nrt/SimpleServer.java | 390 ++++++
.../lucene/replicator/nrt/SimpleTransLog.java | 250 ++++
.../replicator/nrt/TestNRTReplication.java | 1175 ++++++++++++++++++
.../lucene/replicator/nrt/ThreadPumper.java | 59 +
lucene/replicator/test.cmd | 1 +
.../lucene/util/BaseGeoPointTestCase.java | 2 +-
.../index/BaseIndexFileFormatTestCase.java | 2 +-
.../apache/lucene/index/RandomIndexWriter.java | 8 +-
.../lucene/store/MockDirectoryWrapper.java | 286 +++--
.../org/apache/lucene/util/LuceneTestCase.java | 13 +-
.../java/org/apache/lucene/util/TestUtil.java | 3 +-
50 files changed, 6568 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java b/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
index 1bc2f40..4ddad22 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java
@@ -258,6 +258,36 @@ public final class CodecUtil {
checkIndexHeaderSuffix(in, expectedSuffix);
return version;
}
+
+ /** Retrieves the full index header from the provided {@link IndexInput}.
+ * This throws {@link CorruptIndexException} if this file does
+ * not appear to be an index file. */
+ public static byte[] readIndexHeader(IndexInput in) throws IOException {
+ in.seek(0);
+ final int actualHeader = in.readInt();
+ if (actualHeader != CODEC_MAGIC) {
+ throw new CorruptIndexException("codec header mismatch: actual header=" + actualHeader + " vs expected header=" + CODEC_MAGIC, in);
+ }
+ String codec = in.readString();
+ in.readInt();
+ in.seek(in.getFilePointer() + StringHelper.ID_LENGTH);
+ int suffixLength = in.readByte() & 0xFF;
+ byte[] bytes = new byte[headerLength(codec) + StringHelper.ID_LENGTH + 1 + suffixLength];
+ in.seek(0);
+ in.readBytes(bytes, 0, bytes.length);
+ return bytes;
+ }
+
+ /** Retrieves the full footer from the provided {@link IndexInput}. This throws
+ * {@link CorruptIndexException} if this file does not have a valid footer. */
+ public static byte[] readFooter(IndexInput in) throws IOException {
+ in.seek(in.length() - footerLength());
+ validateFooter(in);
+ in.seek(in.length() - footerLength());
+ byte[] bytes = new byte[footerLength()];
+ in.readBytes(bytes, 0, bytes.length);
+ return bytes;
+ }
/** Expert: just reads and verifies the object ID of an index header */
public static byte[] checkIndexHeaderID(DataInput in, byte[] expectedID) throws IOException {
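
For reference, a minimal sketch (not part of this patch) of how the two new helpers might be used to grab the raw header and footer bytes of an existing index file; the index path and file name below are made up for illustration:

import java.nio.file.Paths;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

public class ReadHeaderFooterExample {
  public static void main(String[] args) throws Exception {
    // hypothetical index location and file name:
    try (Directory dir = FSDirectory.open(Paths.get("/path/to/index"));
         IndexInput in = dir.openInput("_0.cfs", IOContext.READONCE)) {
      byte[] header = CodecUtil.readIndexHeader(in);  // magic + codec name + version + id + suffix
      byte[] footer = CodecUtil.readFooter(in);       // footer magic + checksum algorithm + checksum
      System.out.println("header=" + header.length + " bytes, footer=" + footer.length + " bytes");
    }
  }
}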
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java b/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
index 3df0b70..a926726 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
@@ -76,7 +76,7 @@ public abstract class DirectoryReader extends BaseCompositeReader<LeafReader> {
* @lucene.experimental
*/
public static DirectoryReader open(final IndexWriter writer) throws IOException {
- return open(writer, true);
+ return open(writer, true, false);
}
/**
@@ -91,13 +91,16 @@ public abstract class DirectoryReader extends BaseCompositeReader<LeafReader> {
* future. Applying deletes can be costly, so if your app
* can tolerate deleted documents being returned you might
* gain some performance by passing false.
+ * @param writeAllDeletes If true, new deletes will be written
+ * down to index files instead of carried over from writer to
+ * reader in heap
*
* @see #open(IndexWriter)
*
* @lucene.experimental
*/
- public static DirectoryReader open(final IndexWriter writer, boolean applyAllDeletes) throws IOException {
- return writer.getReader(applyAllDeletes);
+ public static DirectoryReader open(final IndexWriter writer, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
+ return writer.getReader(applyAllDeletes, writeAllDeletes);
}
/** Expert: returns an IndexReader reading the index in the given
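
A small sketch (not part of this patch) of the new three-argument NRT open: passing writeAllDeletes=true forces buffered deletes down to index files, and per the IndexWriter change below it requires applyAllDeletes=true. The null analyzer mirrors what the tests in this patch do:

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class WriteAllDeletesExample {
  public static void main(String[] args) throws Exception {
    try (Directory dir = new RAMDirectory();
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null))) {
      writer.addDocument(new Document());
      // writeAllDeletes=true requires applyAllDeletes=true; deletes are written
      // to index files instead of being carried from writer to reader in heap:
      try (DirectoryReader reader = DirectoryReader.open(writer, true, true)) {
        System.out.println("NRT reader maxDoc=" + reader.maxDoc());
      }
    }
  }
}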
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index b05e15a..96dfa0b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -332,7 +332,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final CloseableThreadLocal<MergeRateLimiter> rateLimiters = new CloseableThreadLocal<>();
DirectoryReader getReader() throws IOException {
- return getReader(true);
+ return getReader(true, false);
}
/**
@@ -393,9 +393,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @throws IOException If there is a low-level I/O error
*/
- DirectoryReader getReader(boolean applyAllDeletes) throws IOException {
+ DirectoryReader getReader(boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
ensureOpen();
+ if (writeAllDeletes && applyAllDeletes == false) {
+ throw new IllegalArgumentException("applyAllDeletes must be true when writeAllDeletes=true");
+ }
+
final long tStart = System.currentTimeMillis();
if (infoStream.isEnabled("IW")) {
@@ -431,7 +435,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// just like we do when loading segments_N
synchronized(this) {
anyChanges |= maybeApplyDeletes(applyAllDeletes);
- r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes);
+ if (writeAllDeletes) {
+ // Must move the deletes to disk:
+ System.out.println("IW: now readerPool.commit");
+ readerPool.commit(segmentInfos);
+ }
+
+ r = StandardDirectoryReader.open(this, segmentInfos, applyAllDeletes, writeAllDeletes);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
}
@@ -1159,6 +1169,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return docWriter.getNumDocs() + segmentInfos.totalMaxDoc();
}
+ /** If {@link SegmentInfos#getVersion} is below {@code newVersion} then update it to this value. */
+ public synchronized void advanceSegmentInfosVersion(long newVersion) {
+ ensureOpen();
+ if (segmentInfos.getVersion() < newVersion) {
+ segmentInfos.setVersion(newVersion);
+ }
+ changed();
+ }
+
/** Returns total number of docs in this index, including
* docs not yet flushed (still in the RAM buffer), and
* including deletions. <b>NOTE:</b> buffered deletions
@@ -2870,7 +2889,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* contents after calling this method has no effect.
*/
public final synchronized void setCommitData(Map<String,String> commitUserData) {
- segmentInfos.setUserData(new HashMap<>(commitUserData));
+ setCommitData(commitUserData, true);
+ }
+
+ public final synchronized void setCommitData(Map<String,String> commitUserData, boolean doIncrementVersion) {
+ segmentInfos.setUserData(new HashMap<>(commitUserData), doIncrementVersion);
changeCount.incrementAndGet();
}
@@ -4576,10 +4599,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized boolean nrtIsCurrent(SegmentInfos infos) {
//System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
ensureOpen();
- boolean isCurrent = infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedUpdatesStream.any();
+ boolean isCurrent = infos.getVersion() == segmentInfos.getVersion() && !docWriter.anyChanges() && !bufferedUpdatesStream.any();
if (infoStream.isEnabled("IW")) {
if (isCurrent == false) {
- infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.version == segmentInfos.version) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
+ infoStream.message("IW", "nrtIsCurrent: infoVersion matches: " + (infos.getVersion() == segmentInfos.getVersion()) + "; DW changes: " + docWriter.anyChanges() + "; BD changes: "+ bufferedUpdatesStream.any());
}
}
return isCurrent;
@@ -4708,15 +4731,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
- synchronized void incRefDeleter(SegmentInfos segmentInfos) throws IOException {
+ /** @lucene.internal */
+ public synchronized void incRefDeleter(SegmentInfos segmentInfos) throws IOException {
ensureOpen();
deleter.incRef(segmentInfos, false);
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "incRefDeleter for NRT reader version=" + segmentInfos.getVersion() + " segments=" + segString(segmentInfos));
}
}
-
- synchronized void decRefDeleter(SegmentInfos segmentInfos) throws IOException {
+
+ /** @lucene.internal */
+ public synchronized void decRefDeleter(SegmentInfos segmentInfos) throws IOException {
ensureOpen();
deleter.decRef(segmentInfos);
if (infoStream.isEnabled("IW")) {
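
A hedged sketch (not part of this patch) of the two new public IndexWriter hooks; the user-data key and the version value 42 are only illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class CommitDataExample {
  public static void main(String[] args) throws Exception {
    try (Directory dir = new RAMDirectory();
         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null))) {
      Map<String,String> userData = new HashMap<>();
      userData.put("exampleKey", "exampleValue");
      // doIncrementVersion=false records the user data without bumping the SegmentInfos version:
      writer.setCommitData(userData, false);
      // ensure this writer's SegmentInfos version is at least 42 (illustrative value):
      writer.advanceSegmentInfosVersion(42);
      writer.commit();
    }
  }
}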
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java b/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
index 79d9a94..8044dc1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
@@ -45,7 +45,7 @@ public final class ReaderManager extends ReferenceManager<DirectoryReader> {
* @throws IOException If there is a low-level I/O error
*/
public ReaderManager(IndexWriter writer) throws IOException {
- this(writer, true);
+ this(writer, true, false);
}
/**
@@ -63,11 +63,13 @@ public final class ReaderManager extends ReferenceManager<DirectoryReader> {
* tolerate deleted documents being returned you might gain some
* performance by passing <code>false</code>. See
* {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
+ * @param writeAllDeletes
+ * If <code>true</code>, new deletes will be forcefully written to index files.
*
* @throws IOException If there is a low-level I/O error
*/
- public ReaderManager(IndexWriter writer, boolean applyAllDeletes) throws IOException {
- current = DirectoryReader.open(writer, applyAllDeletes);
+ public ReaderManager(IndexWriter writer, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
+ current = DirectoryReader.open(writer, applyAllDeletes, writeAllDeletes);
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
index de5dbff..2f8d914 100644
--- a/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
+++ b/lucene/core/src/java/org/apache/lucene/index/SegmentInfos.java
@@ -279,124 +279,131 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
public static final SegmentInfos readCommit(Directory directory, String segmentFileName) throws IOException {
long generation = generationFromSegmentsFileName(segmentFileName);
+ //System.out.println(Thread.currentThread() + ": SegmentInfos.readCommit " + segmentFileName);
try (ChecksumIndexInput input = directory.openChecksumInput(segmentFileName, IOContext.READ)) {
- // NOTE: as long as we want to throw indexformattooold (vs corruptindexexception), we need
- // to read the magic ourselves.
- int magic = input.readInt();
- if (magic != CodecUtil.CODEC_MAGIC) {
- throw new IndexFormatTooOldException(input, magic, CodecUtil.CODEC_MAGIC, CodecUtil.CODEC_MAGIC);
- }
- int format = CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_50, VERSION_CURRENT);
- byte id[] = new byte[StringHelper.ID_LENGTH];
- input.readBytes(id, 0, id.length);
- CodecUtil.checkIndexHeaderSuffix(input, Long.toString(generation, Character.MAX_RADIX));
-
- SegmentInfos infos = new SegmentInfos();
- infos.id = id;
- infos.generation = generation;
- infos.lastGeneration = generation;
- if (format >= VERSION_53) {
- // TODO: in the future (7.0? sigh) we can use this to throw IndexFormatTooOldException ... or just rely on the
- // minSegmentLuceneVersion check instead:
- infos.luceneVersion = Version.fromBits(input.readVInt(), input.readVInt(), input.readVInt());
- } else {
- // else compute the min version down below in the for loop
- }
+ return readCommit(directory, input, generation);
+ }
+ }
- infos.version = input.readLong();
- infos.counter = input.readInt();
- int numSegments = input.readInt();
- if (numSegments < 0) {
- throw new CorruptIndexException("invalid segment count: " + numSegments, input);
- }
+ public static final SegmentInfos readCommit(Directory directory, ChecksumIndexInput input, long generation) throws IOException {
- if (format >= VERSION_53) {
- if (numSegments > 0) {
- infos.minSegmentLuceneVersion = Version.fromBits(input.readVInt(), input.readVInt(), input.readVInt());
- if (infos.minSegmentLuceneVersion.onOrAfter(Version.LUCENE_5_0_0) == false) {
- throw new IndexFormatTooOldException(input, "this index contains a too-old segment (version: " + infos.minSegmentLuceneVersion + ")");
- }
- } else {
- // else leave as null: no segments
+ // NOTE: as long as we want to throw indexformattooold (vs corruptindexexception), we need
+ // to read the magic ourselves.
+ int magic = input.readInt();
+ if (magic != CodecUtil.CODEC_MAGIC) {
+ throw new IndexFormatTooOldException(input, magic, CodecUtil.CODEC_MAGIC, CodecUtil.CODEC_MAGIC);
+ }
+ int format = CodecUtil.checkHeaderNoMagic(input, "segments", VERSION_50, VERSION_CURRENT);
+ byte id[] = new byte[StringHelper.ID_LENGTH];
+ input.readBytes(id, 0, id.length);
+ CodecUtil.checkIndexHeaderSuffix(input, Long.toString(generation, Character.MAX_RADIX));
+
+ SegmentInfos infos = new SegmentInfos();
+ infos.id = id;
+ infos.generation = generation;
+ infos.lastGeneration = generation;
+ if (format >= VERSION_53) {
+ // TODO: in the future (7.0? sigh) we can use this to throw IndexFormatTooOldException ... or just rely on the
+ // minSegmentLuceneVersion check instead:
+ infos.luceneVersion = Version.fromBits(input.readVInt(), input.readVInt(), input.readVInt());
+ } else {
+ // else compute the min version down below in the for loop
+ }
+
+ infos.version = input.readLong();
+ //System.out.println("READ sis version=" + infos.version);
+ infos.counter = input.readInt();
+ int numSegments = input.readInt();
+ if (numSegments < 0) {
+ throw new CorruptIndexException("invalid segment count: " + numSegments, input);
+ }
+
+ if (format >= VERSION_53) {
+ if (numSegments > 0) {
+ infos.minSegmentLuceneVersion = Version.fromBits(input.readVInt(), input.readVInt(), input.readVInt());
+ if (infos.minSegmentLuceneVersion.onOrAfter(Version.LUCENE_5_0_0) == false) {
+ throw new IndexFormatTooOldException(input, "this index contains a too-old segment (version: " + infos.minSegmentLuceneVersion + ")");
}
} else {
- // else we recompute it below as we visit segments; it can't be used for throwing IndexFormatTooOldExc, but consumers of
- // SegmentInfos can maybe still use it for other reasons
+ // else leave as null: no segments
}
+ } else {
+ // else we recompute it below as we visit segments; it can't be used for throwing IndexFormatTooOldExc, but consumers of
+ // SegmentInfos can maybe still use it for other reasons
+ }
- long totalDocs = 0;
- for (int seg = 0; seg < numSegments; seg++) {
- String segName = input.readString();
- final byte segmentID[];
- byte hasID = input.readByte();
- if (hasID == 1) {
- segmentID = new byte[StringHelper.ID_LENGTH];
- input.readBytes(segmentID, 0, segmentID.length);
- } else if (hasID == 0) {
- throw new IndexFormatTooOldException(input, "Segment is from Lucene 4.x");
- } else {
- throw new CorruptIndexException("invalid hasID byte, got: " + hasID, input);
- }
- Codec codec = readCodec(input, format < VERSION_53);
- SegmentInfo info = codec.segmentInfoFormat().read(directory, segName, segmentID, IOContext.READ);
- info.setCodec(codec);
- totalDocs += info.maxDoc();
- long delGen = input.readLong();
- int delCount = input.readInt();
- if (delCount < 0 || delCount > info.maxDoc()) {
- throw new CorruptIndexException("invalid deletion count: " + delCount + " vs maxDoc=" + info.maxDoc(), input);
- }
- long fieldInfosGen = input.readLong();
- long dvGen = input.readLong();
- SegmentCommitInfo siPerCommit = new SegmentCommitInfo(info, delCount, delGen, fieldInfosGen, dvGen);
- if (format >= VERSION_51) {
- siPerCommit.setFieldInfosFiles(input.readSetOfStrings());
- } else {
- siPerCommit.setFieldInfosFiles(Collections.unmodifiableSet(input.readStringSet()));
- }
- final Map<Integer,Set<String>> dvUpdateFiles;
- final int numDVFields = input.readInt();
- if (numDVFields == 0) {
- dvUpdateFiles = Collections.emptyMap();
- } else {
- Map<Integer,Set<String>> map = new HashMap<>(numDVFields);
- for (int i = 0; i < numDVFields; i++) {
- if (format >= VERSION_51) {
- map.put(input.readInt(), input.readSetOfStrings());
- } else {
- map.put(input.readInt(), Collections.unmodifiableSet(input.readStringSet()));
- }
+ long totalDocs = 0;
+ for (int seg = 0; seg < numSegments; seg++) {
+ String segName = input.readString();
+ final byte segmentID[];
+ byte hasID = input.readByte();
+ if (hasID == 1) {
+ segmentID = new byte[StringHelper.ID_LENGTH];
+ input.readBytes(segmentID, 0, segmentID.length);
+ } else if (hasID == 0) {
+ throw new IndexFormatTooOldException(input, "Segment is from Lucene 4.x");
+ } else {
+ throw new CorruptIndexException("invalid hasID byte, got: " + hasID, input);
+ }
+ Codec codec = readCodec(input, format < VERSION_53);
+ SegmentInfo info = codec.segmentInfoFormat().read(directory, segName, segmentID, IOContext.READ);
+ info.setCodec(codec);
+ totalDocs += info.maxDoc();
+ long delGen = input.readLong();
+ int delCount = input.readInt();
+ if (delCount < 0 || delCount > info.maxDoc()) {
+ throw new CorruptIndexException("invalid deletion count: " + delCount + " vs maxDoc=" + info.maxDoc(), input);
+ }
+ long fieldInfosGen = input.readLong();
+ long dvGen = input.readLong();
+ SegmentCommitInfo siPerCommit = new SegmentCommitInfo(info, delCount, delGen, fieldInfosGen, dvGen);
+ if (format >= VERSION_51) {
+ siPerCommit.setFieldInfosFiles(input.readSetOfStrings());
+ } else {
+ siPerCommit.setFieldInfosFiles(Collections.unmodifiableSet(input.readStringSet()));
+ }
+ final Map<Integer,Set<String>> dvUpdateFiles;
+ final int numDVFields = input.readInt();
+ if (numDVFields == 0) {
+ dvUpdateFiles = Collections.emptyMap();
+ } else {
+ Map<Integer,Set<String>> map = new HashMap<>(numDVFields);
+ for (int i = 0; i < numDVFields; i++) {
+ if (format >= VERSION_51) {
+ map.put(input.readInt(), input.readSetOfStrings());
+ } else {
+ map.put(input.readInt(), Collections.unmodifiableSet(input.readStringSet()));
}
- dvUpdateFiles = Collections.unmodifiableMap(map);
}
- siPerCommit.setDocValuesUpdatesFiles(dvUpdateFiles);
- infos.add(siPerCommit);
+ dvUpdateFiles = Collections.unmodifiableMap(map);
+ }
+ siPerCommit.setDocValuesUpdatesFiles(dvUpdateFiles);
+ infos.add(siPerCommit);
- Version segmentVersion = info.getVersion();
- if (format < VERSION_53) {
- if (infos.minSegmentLuceneVersion == null || segmentVersion.onOrAfter(infos.minSegmentLuceneVersion) == false) {
- infos.minSegmentLuceneVersion = segmentVersion;
- }
- } else if (segmentVersion.onOrAfter(infos.minSegmentLuceneVersion) == false) {
- throw new CorruptIndexException("segments file recorded minSegmentLuceneVersion=" + infos.minSegmentLuceneVersion + " but segment=" + info + " has older version=" + segmentVersion, input);
+ Version segmentVersion = info.getVersion();
+ if (format < VERSION_53) {
+ if (infos.minSegmentLuceneVersion == null || segmentVersion.onOrAfter(infos.minSegmentLuceneVersion) == false) {
+ infos.minSegmentLuceneVersion = segmentVersion;
}
+ } else if (segmentVersion.onOrAfter(infos.minSegmentLuceneVersion) == false) {
+ throw new CorruptIndexException("segments file recorded minSegmentLuceneVersion=" + infos.minSegmentLuceneVersion + " but segment=" + info + " has older version=" + segmentVersion, input);
}
+ }
- if (format >= VERSION_51) {
- infos.userData = input.readMapOfStrings();
- } else {
- infos.userData = Collections.unmodifiableMap(input.readStringStringMap());
- }
+ if (format >= VERSION_51) {
+ infos.userData = input.readMapOfStrings();
+ } else {
+ infos.userData = Collections.unmodifiableMap(input.readStringStringMap());
+ }
- CodecUtil.checkFooter(input);
+ CodecUtil.checkFooter(input);
- // LUCENE-6299: check we are in bounds
- if (totalDocs > IndexWriter.getActualMaxDocs()) {
- throw new CorruptIndexException("Too many documents: an index cannot exceed " + IndexWriter.getActualMaxDocs() + " but readers have total maxDoc=" + totalDocs, input);
- }
-
- return infos;
+ // LUCENE-6299: check we are in bounds
+ if (totalDocs > IndexWriter.getActualMaxDocs()) {
+ throw new CorruptIndexException("Too many documents: an index cannot exceed " + IndexWriter.getActualMaxDocs() + " but readers have total maxDoc=" + totalDocs, input);
}
+
+ return infos;
}
private static final List<String> unsupportedCodecs = Arrays.asList(
@@ -454,68 +461,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
try {
segnOutput = directory.createOutput(segmentFileName, IOContext.DEFAULT);
- CodecUtil.writeIndexHeader(segnOutput, "segments", VERSION_CURRENT,
- StringHelper.randomId(), Long.toString(nextGeneration, Character.MAX_RADIX));
- segnOutput.writeVInt(Version.LATEST.major);
- segnOutput.writeVInt(Version.LATEST.minor);
- segnOutput.writeVInt(Version.LATEST.bugfix);
-
- segnOutput.writeLong(version);
- segnOutput.writeInt(counter); // write counter
- segnOutput.writeInt(size());
-
- if (size() > 0) {
-
- Version minSegmentVersion = null;
-
- // We do a separate loop up front so we can write the minSegmentVersion before
- // any SegmentInfo; this makes it cleaner to throw IndexFormatTooOldExc at read time:
- for (SegmentCommitInfo siPerCommit : this) {
- Version segmentVersion = siPerCommit.info.getVersion();
- if (minSegmentVersion == null || segmentVersion.onOrAfter(minSegmentVersion) == false) {
- minSegmentVersion = segmentVersion;
- }
- }
-
- segnOutput.writeVInt(minSegmentVersion.major);
- segnOutput.writeVInt(minSegmentVersion.minor);
- segnOutput.writeVInt(minSegmentVersion.bugfix);
- }
-
- // write infos
- for (SegmentCommitInfo siPerCommit : this) {
- SegmentInfo si = siPerCommit.info;
- segnOutput.writeString(si.name);
- byte segmentID[] = si.getId();
- // TODO: remove this in lucene 6, we don't need to include 4.x segments in commits anymore
- if (segmentID == null) {
- segnOutput.writeByte((byte)0);
- } else {
- if (segmentID.length != StringHelper.ID_LENGTH) {
- throw new IllegalStateException("cannot write segment: invalid id segment=" + si.name + "id=" + StringHelper.idToString(segmentID));
- }
- segnOutput.writeByte((byte)1);
- segnOutput.writeBytes(segmentID, segmentID.length);
- }
- segnOutput.writeString(si.getCodec().getName());
- segnOutput.writeLong(siPerCommit.getDelGen());
- int delCount = siPerCommit.getDelCount();
- if (delCount < 0 || delCount > si.maxDoc()) {
- throw new IllegalStateException("cannot write segment: invalid maxDoc segment=" + si.name + " maxDoc=" + si.maxDoc() + " delCount=" + delCount);
- }
- segnOutput.writeInt(delCount);
- segnOutput.writeLong(siPerCommit.getFieldInfosGen());
- segnOutput.writeLong(siPerCommit.getDocValuesGen());
- segnOutput.writeSetOfStrings(siPerCommit.getFieldInfosFiles());
- final Map<Integer,Set<String>> dvUpdatesFiles = siPerCommit.getDocValuesUpdatesFiles();
- segnOutput.writeInt(dvUpdatesFiles.size());
- for (Entry<Integer,Set<String>> e : dvUpdatesFiles.entrySet()) {
- segnOutput.writeInt(e.getKey());
- segnOutput.writeSetOfStrings(e.getValue());
- }
- }
- segnOutput.writeMapOfStrings(userData);
- CodecUtil.writeFooter(segnOutput);
+ write(directory, segnOutput);
segnOutput.close();
directory.sync(Collections.singleton(segmentFileName));
success = true;
@@ -533,6 +479,72 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
}
}
+ public void write(Directory directory, IndexOutput out) throws IOException {
+ CodecUtil.writeIndexHeader(out, "segments", VERSION_CURRENT,
+ StringHelper.randomId(), Long.toString(generation, Character.MAX_RADIX));
+ out.writeVInt(Version.LATEST.major);
+ out.writeVInt(Version.LATEST.minor);
+ out.writeVInt(Version.LATEST.bugfix);
+ System.out.println(Thread.currentThread().getName() + ": now write " + out.getName() + " with version=" + version);
+
+ out.writeLong(version);
+ out.writeInt(counter); // write counter
+ out.writeInt(size());
+
+ if (size() > 0) {
+
+ Version minSegmentVersion = null;
+
+ // We do a separate loop up front so we can write the minSegmentVersion before
+ // any SegmentInfo; this makes it cleaner to throw IndexFormatTooOldExc at read time:
+ for (SegmentCommitInfo siPerCommit : this) {
+ Version segmentVersion = siPerCommit.info.getVersion();
+ if (minSegmentVersion == null || segmentVersion.onOrAfter(minSegmentVersion) == false) {
+ minSegmentVersion = segmentVersion;
+ }
+ }
+
+ out.writeVInt(minSegmentVersion.major);
+ out.writeVInt(minSegmentVersion.minor);
+ out.writeVInt(minSegmentVersion.bugfix);
+ }
+
+ // write infos
+ for (SegmentCommitInfo siPerCommit : this) {
+ SegmentInfo si = siPerCommit.info;
+ out.writeString(si.name);
+ byte segmentID[] = si.getId();
+ // TODO: remove this in lucene 6, we don't need to include 4.x segments in commits anymore
+ if (segmentID == null) {
+ out.writeByte((byte)0);
+ } else {
+ if (segmentID.length != StringHelper.ID_LENGTH) {
+ throw new IllegalStateException("cannot write segment: invalid id segment=" + si.name + "id=" + StringHelper.idToString(segmentID));
+ }
+ out.writeByte((byte)1);
+ out.writeBytes(segmentID, segmentID.length);
+ }
+ out.writeString(si.getCodec().getName());
+ out.writeLong(siPerCommit.getDelGen());
+ int delCount = siPerCommit.getDelCount();
+ if (delCount < 0 || delCount > si.maxDoc()) {
+ throw new IllegalStateException("cannot write segment: invalid maxDoc segment=" + si.name + " maxDoc=" + si.maxDoc() + " delCount=" + delCount);
+ }
+ out.writeInt(delCount);
+ out.writeLong(siPerCommit.getFieldInfosGen());
+ out.writeLong(siPerCommit.getDocValuesGen());
+ out.writeSetOfStrings(siPerCommit.getFieldInfosFiles());
+ final Map<Integer,Set<String>> dvUpdatesFiles = siPerCommit.getDocValuesUpdatesFiles();
+ out.writeInt(dvUpdatesFiles.size());
+ for (Entry<Integer,Set<String>> e : dvUpdatesFiles.entrySet()) {
+ out.writeInt(e.getKey());
+ out.writeSetOfStrings(e.getValue());
+ }
+ }
+ out.writeMapOfStrings(userData);
+ CodecUtil.writeFooter(out);
+ }
+
/**
* Returns a copy of this instance, also copying each
* SegmentInfo.
@@ -698,8 +710,10 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
protected abstract T doBody(String segmentFileName) throws IOException;
}
- // Carry over generation numbers from another SegmentInfos
- void updateGeneration(SegmentInfos other) {
+ /** Carry over generation numbers from another SegmentInfos
+ *
+ * @lucene.internal */
+ public void updateGeneration(SegmentInfos other) {
lastGeneration = other.lastGeneration;
generation = other.generation;
}
@@ -711,7 +725,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
this.counter = other.counter;
}
- void setNextWriteGeneration(long generation) {
+ public void setNextWriteGeneration(long generation) {
assert generation >= this.generation;
this.generation = generation;
}
@@ -777,7 +791,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
boolean success = false;
final String dest;
try {
- final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
+ final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
dest = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
dir.renameFile(src, dest);
success = true;
@@ -800,7 +814,7 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
* method if changes have been made to this {@link SegmentInfos} instance
* </p>
**/
- final void commit(Directory dir) throws IOException {
+ public final void commit(Directory dir) throws IOException {
prepareCommit(dir);
finishCommit(dir);
}
@@ -829,14 +843,15 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
return userData;
}
- void setUserData(Map<String,String> data) {
+ public void setUserData(Map<String,String> data, boolean doIncrementVersion) {
if (data == null) {
userData = Collections.<String,String>emptyMap();
} else {
userData = data;
}
-
- changed();
+ if (doIncrementVersion) {
+ changed();
+ }
}
/** Replaces all segments in this instance, but keeps
@@ -864,8 +879,18 @@ public final class SegmentInfos implements Cloneable, Iterable<SegmentCommitInfo
* segments. */
public void changed() {
version++;
+ System.out.println(Thread.currentThread().getName() + ": SIS.change to version=" + version);
+ //new Throwable().printStackTrace(System.out);
}
+ void setVersion(long newVersion) {
+ if (newVersion < version) {
+ throw new IllegalArgumentException("newVersion (=" + newVersion + ") cannot be less than current version (=" + version + ")");
+ }
+ System.out.println(Thread.currentThread().getName() + ": SIS.setVersion change from " + version + " to " + newVersion);
+ version = newVersion;
+ }
+
/** applies all changes caused by committing a merge to this SegmentInfos */
void applyMergeChanges(MergePolicy.OneMerge merge, boolean dropSegment) {
final Set<SegmentCommitInfo> mergedAway = new HashSet<>(merge.segments);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
index 8d51532..28dd55f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
@@ -30,19 +30,21 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
-final class StandardDirectoryReader extends DirectoryReader {
+public final class StandardDirectoryReader extends DirectoryReader {
final IndexWriter writer;
final SegmentInfos segmentInfos;
private final boolean applyAllDeletes;
+ private final boolean writeAllDeletes;
/** called only from static open() methods */
StandardDirectoryReader(Directory directory, LeafReader[] readers, IndexWriter writer,
- SegmentInfos sis, boolean applyAllDeletes) throws IOException {
+ SegmentInfos sis, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
super(directory, readers);
this.writer = writer;
this.segmentInfos = sis;
this.applyAllDeletes = applyAllDeletes;
+ this.writeAllDeletes = writeAllDeletes;
}
/** called from DirectoryReader.open(...) methods */
@@ -60,7 +62,7 @@ final class StandardDirectoryReader extends DirectoryReader {
// This may throw CorruptIndexException if there are too many docs, so
// it must be inside try clause so we close readers in that case:
- DirectoryReader reader = new StandardDirectoryReader(directory, readers, null, sis, false);
+ DirectoryReader reader = new StandardDirectoryReader(directory, readers, null, sis, false, false);
success = true;
return reader;
@@ -74,7 +76,7 @@ final class StandardDirectoryReader extends DirectoryReader {
}
/** Used by near real-time search */
- static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean applyAllDeletes) throws IOException {
+ static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
@@ -113,8 +115,8 @@ final class StandardDirectoryReader extends DirectoryReader {
writer.incRefDeleter(segmentInfos);
StandardDirectoryReader result = new StandardDirectoryReader(dir,
- readers.toArray(new SegmentReader[readers.size()]), writer,
- segmentInfos, applyAllDeletes);
+ readers.toArray(new SegmentReader[readers.size()]), writer,
+ segmentInfos, applyAllDeletes, writeAllDeletes);
success = true;
return result;
} finally {
@@ -131,8 +133,10 @@ final class StandardDirectoryReader extends DirectoryReader {
}
}
- /** This constructor is only used for {@link #doOpenIfChanged(SegmentInfos)} */
- private static DirectoryReader open(Directory directory, SegmentInfos infos, List<? extends LeafReader> oldReaders) throws IOException {
+ /** This constructor is only used for {@link #doOpenIfChanged(SegmentInfos)}, as well as NRT replication.
+ *
+ * @lucene.internal */
+ public static DirectoryReader open(Directory directory, SegmentInfos infos, List<? extends LeafReader> oldReaders) throws IOException {
// we put the old SegmentReaders in a map, that allows us
// to lookup a reader using its segment name
@@ -210,7 +214,7 @@ final class StandardDirectoryReader extends DirectoryReader {
}
}
}
- return new StandardDirectoryReader(directory, newReaders, null, infos, false);
+ return new StandardDirectoryReader(directory, newReaders, null, infos, false, false);
}
// TODO: move somewhere shared if it's useful elsewhere
@@ -270,7 +274,7 @@ final class StandardDirectoryReader extends DirectoryReader {
if (writer == this.writer && applyAllDeletes == this.applyAllDeletes) {
return doOpenFromWriter(null);
} else {
- return writer.getReader(applyAllDeletes);
+ return writer.getReader(applyAllDeletes, writeAllDeletes);
}
}
@@ -283,7 +287,7 @@ final class StandardDirectoryReader extends DirectoryReader {
return null;
}
- DirectoryReader reader = writer.getReader(applyAllDeletes);
+ DirectoryReader reader = writer.getReader(applyAllDeletes, writeAllDeletes);
// If in fact no changes took place, return null:
if (reader.getVersion() == segmentInfos.getVersion()) {
@@ -332,6 +336,11 @@ final class StandardDirectoryReader extends DirectoryReader {
return segmentInfos.getVersion();
}
+ /** @lucene.internal */
+ public SegmentInfos getSegmentInfos() {
+ return segmentInfos;
+ }
+
@Override
public boolean isCurrent() throws IOException {
ensureOpen();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java b/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
index 3d3b064..a57f260 100644
--- a/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
+++ b/lucene/core/src/java/org/apache/lucene/search/SearcherManager.java
@@ -73,7 +73,7 @@ public final class SearcherManager extends ReferenceManager<IndexSearcher> {
* @throws IOException if there is a low-level I/O error
*/
public SearcherManager(IndexWriter writer, SearcherFactory searcherFactory) throws IOException {
- this(writer, true, searcherFactory);
+ this(writer, true, false, searcherFactory);
}
/**
@@ -91,6 +91,8 @@ public final class SearcherManager extends ReferenceManager<IndexSearcher> {
* tolerate deleted documents being returned you might gain some
* performance by passing <code>false</code>. See
* {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
+ * @param writeAllDeletes
+ * If <code>true</code>, new deletes will be forcefully written to index files.
* @param searcherFactory
* An optional {@link SearcherFactory}. Pass <code>null</code> if you
* don't require the searcher to be warmed before going live or other
@@ -98,12 +100,12 @@ public final class SearcherManager extends ReferenceManager<IndexSearcher> {
*
* @throws IOException if there is a low-level I/O error
*/
- public SearcherManager(IndexWriter writer, boolean applyAllDeletes, SearcherFactory searcherFactory) throws IOException {
+ public SearcherManager(IndexWriter writer, boolean applyAllDeletes, boolean writeAllDeletes, SearcherFactory searcherFactory) throws IOException {
if (searcherFactory == null) {
searcherFactory = new SearcherFactory();
}
this.searcherFactory = searcherFactory;
- current = getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes), null);
+ current = getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes, writeAllDeletes), null);
}
/**
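
A minimal sketch (not part of this patch) of the widened SearcherManager constructor; applyAllDeletes=true with writeAllDeletes=false keeps the pre-patch refresh behavior:

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SearcherManagerExample {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null));
    writer.addDocument(new Document());
    SearcherManager mgr = new SearcherManager(writer, true, false, new SearcherFactory());
    IndexSearcher searcher = mgr.acquire();
    try {
      System.out.println("maxDoc=" + searcher.getIndexReader().maxDoc());
    } finally {
      mgr.release(searcher);
    }
    mgr.close();
    writer.close();
    dir.close();
  }
}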
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
new file mode 100644
index 0000000..6363ed1
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
@@ -0,0 +1,163 @@
+package org.apache.lucene.store;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * DataInput backed by a byte array.
+ * <b>WARNING:</b> This class omits all low-level checks.
+ * @lucene.experimental
+ */
+public final class ByteArrayIndexInput extends IndexInput {
+
+ private byte[] bytes;
+
+ private int pos;
+ private int limit;
+
+ public ByteArrayIndexInput(String description, byte[] bytes) {
+ super(description);
+ this.bytes = bytes;
+ this.limit = bytes.length;
+ }
+
+ public long getFilePointer() {
+ return pos;
+ }
+
+ public void seek(long pos) {
+ this.pos = (int) pos;
+ }
+
+ public void reset(byte[] bytes, int offset, int len) {
+ this.bytes = bytes;
+ pos = offset;
+ limit = offset + len;
+ }
+
+ @Override
+ public long length() {
+ return limit;
+ }
+
+ public boolean eof() {
+ return pos == limit;
+ }
+
+ @Override
+ public void skipBytes(long count) {
+ pos += count;
+ }
+
+ @Override
+ public short readShort() {
+ return (short) (((bytes[pos++] & 0xFF) << 8) | (bytes[pos++] & 0xFF));
+ }
+
+ @Override
+ public int readInt() {
+ return ((bytes[pos++] & 0xFF) << 24) | ((bytes[pos++] & 0xFF) << 16)
+ | ((bytes[pos++] & 0xFF) << 8) | (bytes[pos++] & 0xFF);
+ }
+
+ @Override
+ public long readLong() {
+ final int i1 = ((bytes[pos++] & 0xff) << 24) | ((bytes[pos++] & 0xff) << 16) |
+ ((bytes[pos++] & 0xff) << 8) | (bytes[pos++] & 0xff);
+ final int i2 = ((bytes[pos++] & 0xff) << 24) | ((bytes[pos++] & 0xff) << 16) |
+ ((bytes[pos++] & 0xff) << 8) | (bytes[pos++] & 0xff);
+ return (((long)i1) << 32) | (i2 & 0xFFFFFFFFL);
+ }
+
+ @Override
+ public int readVInt() {
+ byte b = bytes[pos++];
+ if (b >= 0) return b;
+ int i = b & 0x7F;
+ b = bytes[pos++];
+ i |= (b & 0x7F) << 7;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7F) << 14;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7F) << 21;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
+ i |= (b & 0x0F) << 28;
+ if ((b & 0xF0) == 0) return i;
+ throw new RuntimeException("Invalid vInt detected (too many bits)");
+ }
+
+ @Override
+ public long readVLong() {
+ byte b = bytes[pos++];
+ if (b >= 0) return b;
+ long i = b & 0x7FL;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 7;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 14;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 21;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 28;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 35;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 42;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 49;
+ if (b >= 0) return i;
+ b = bytes[pos++];
+ i |= (b & 0x7FL) << 56;
+ if (b >= 0) return i;
+ throw new RuntimeException("Invalid vLong detected (negative values disallowed)");
+ }
+
+ // NOTE: AIOOBE not EOF if you read too much
+ @Override
+ public byte readByte() {
+ return bytes[pos++];
+ }
+
+ // NOTE: AIOOBE not EOF if you read too much
+ @Override
+ public void readBytes(byte[] b, int offset, int len) {
+ System.arraycopy(bytes, pos, b, offset, len);
+ pos += len;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
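
A tiny sketch (not part of this patch) exercising the new ByteArrayIndexInput against bytes produced by the existing ByteArrayDataOutput; the values are arbitrary:

import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteArrayIndexInput;

public class ByteArrayIndexInputExample {
  public static void main(String[] args) throws Exception {
    byte[] buffer = new byte[16];
    ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
    out.writeVInt(1234);
    out.writeLong(567890L);

    ByteArrayIndexInput in = new ByteArrayIndexInput("example", buffer);
    System.out.println(in.readVInt());   // 1234
    System.out.println(in.readLong());   // 567890
    System.out.println("pos=" + in.getFilePointer() + " length=" + in.length());
    in.close();                          // no-op, nothing to release
  }
}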
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
index fd5e3d7..538e7c0 100644
--- a/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/NRTCachingDirectory.java
@@ -174,8 +174,12 @@ public class NRTCachingDirectory extends FilterDirectory implements Accountable
@Override
public void renameFile(String source, String dest) throws IOException {
- // NOTE: uncache is unnecessary for lucene's usage, as we always sync() before renaming.
unCache(source);
+ try {
+ cache.deleteFile(dest);
+ } catch (FileNotFoundException fnfe) {
+ // OK -- it may not exist
+ }
in.renameFile(source, dest);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
index 510545f..1c5aabd 100644
--- a/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
+++ b/lucene/core/src/java/org/apache/lucene/util/IOUtils.java
@@ -107,6 +107,9 @@ public final class IOUtils {
* objects to call <tt>close()</tt> on
*/
public static void closeWhileHandlingException(Closeable... objects) {
+ if (objects.length == 0) {
+ throw new IllegalArgumentException("pass at least one Closeable");
+ }
closeWhileHandlingException(Arrays.asList(objects));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index baa2484..ab75fb8 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -1036,7 +1036,7 @@ public class TestIndexWriter extends LuceneTestCase {
if (random.nextInt(3) == 0) {
IndexReader r = null;
try {
- r = DirectoryReader.open(w, random.nextBoolean());
+ r = DirectoryReader.open(w, random.nextBoolean(), false);
if (random.nextBoolean() && r.maxDoc() > 0) {
int docid = random.nextInt(r.maxDoc());
w.tryDeleteDocument(r, docid);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index 9213c25..b970519 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -1249,7 +1249,7 @@ public class TestIndexWriterDelete extends LuceneTestCase {
iwc = new IndexWriterConfig(new MockAnalyzer(random()));
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
w = new IndexWriter(d, iwc);
- IndexReader r = DirectoryReader.open(w, false);
+ IndexReader r = DirectoryReader.open(w, false, false);
assertTrue(w.tryDeleteDocument(r, 1));
assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0));
r.close();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
index 58e3ac7..2013ab3 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions2.java
@@ -200,7 +200,7 @@ public class TestIndexWriterExceptions2 extends LuceneTestCase {
if (random().nextBoolean()) {
DirectoryReader ir = null;
try {
- ir = DirectoryReader.open(iw, random().nextBoolean());
+ ir = DirectoryReader.open(iw, random().nextBoolean(), false);
TestUtil.checkReader(ir);
} finally {
IOUtils.closeWhileHandlingException(ir);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
index 777ef46..3fecf71 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnJRECrash.java
@@ -196,29 +196,29 @@ public class TestIndexWriterOnJRECrash extends TestNRTThreads {
vendor.startsWith("Sun") ||
vendor.startsWith("Apple");
- try {
- if (supportsUnsafeNpeDereference) {
- try {
- Class<?> clazz = Class.forName("sun.misc.Unsafe");
- Field field = clazz.getDeclaredField("theUnsafe");
- field.setAccessible(true);
- Object o = field.get(null);
- Method m = clazz.getMethod("putAddress", long.class, long.class);
- m.invoke(o, 0L, 0L);
- } catch (Throwable e) {
- System.out.println("Couldn't kill the JVM via Unsafe.");
- e.printStackTrace(System.out);
- }
+ try {
+ if (supportsUnsafeNpeDereference) {
+ try {
+ Class<?> clazz = Class.forName("sun.misc.Unsafe");
+ Field field = clazz.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ Object o = field.get(null);
+ Method m = clazz.getMethod("putAddress", long.class, long.class);
+ m.invoke(o, 0L, 0L);
+ } catch (Throwable e) {
+ System.out.println("Couldn't kill the JVM via Unsafe.");
+ e.printStackTrace(System.out);
}
-
- // Fallback attempt to Runtime.halt();
- Runtime.getRuntime().halt(-1);
- } catch (Exception e) {
- System.out.println("Couldn't kill the JVM.");
- e.printStackTrace(System.out);
}
- // We couldn't get the JVM to crash for some reason.
- fail();
+ // Fallback attempt to Runtime.halt();
+ Runtime.getRuntime().halt(-1);
+ } catch (Exception e) {
+ System.out.println("Couldn't kill the JVM.");
+ e.printStackTrace(System.out);
+ }
+
+ // We couldn't get the JVM to crash for some reason.
+ fail();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
index 90371fe..1c8a43e 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterOnVMError.java
@@ -169,7 +169,7 @@ public class TestIndexWriterOnVMError extends LuceneTestCase {
if (random().nextBoolean()) {
DirectoryReader ir = null;
try {
- ir = DirectoryReader.open(iw, random().nextBoolean());
+ ir = DirectoryReader.open(iw, random().nextBoolean(), false);
TestUtil.checkReader(ir);
} finally {
IOUtils.closeWhileHandlingException(ir);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
index f367024..51c0eb5 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
@@ -124,7 +124,7 @@ public class TestRollingUpdates extends LuceneTestCase {
System.out.println("TEST: reopen applyDeletions=" + applyDeletions);
}
- r = w.getReader(applyDeletions);
+ r = w.getReader(applyDeletions, false);
if (applyDeletions) {
s = newSearcher(r);
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java b/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
index 1df019c..cc4b80c 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestStressDeletes.java
@@ -83,7 +83,7 @@ public class TestStressDeletes extends LuceneTestCase {
}
}
if (random().nextInt(500) == 2) {
- DirectoryReader.open(w, random().nextBoolean()).close();
+ DirectoryReader.open(w, random().nextBoolean(), false).close();
}
if (random().nextInt(500) == 2) {
w.commit();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index 2e1f385..d90eaba 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -231,7 +231,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
}
};
- nrtNoDeletes = new SearcherManager(writer, false, sf);
+ nrtNoDeletes = new SearcherManager(writer, false, false, sf);
nrtDeletes = new SearcherManager(writer, sf);
nrtDeletesThread = new ControlledRealTimeReopenThread<>(genWriter, nrtDeletes, maxReopenSec, minReopenSec);
@@ -313,7 +313,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
- final SearcherManager manager = new SearcherManager(_writer, false, null);
+ final SearcherManager manager = new SearcherManager(_writer, false, false, null);
Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES));
writer.addDocument(doc);
@@ -423,7 +423,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
};
try {
- new SearcherManager(w.w, false, theEvilOne);
+ new SearcherManager(w.w, false, false, theEvilOne);
fail("didn't hit expected exception");
} catch (IllegalStateException ise) {
// expected
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java b/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
index db632de..8eb7b44 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java
@@ -94,7 +94,7 @@ public class TestLRUQueryCache extends LuceneTestCase {
}
};
final boolean applyDeletes = random().nextBoolean();
- final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, searcherFactory);
+ final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, false, searcherFactory);
final AtomicBoolean indexing = new AtomicBoolean(true);
final AtomicReference<Throwable> error = new AtomicReference<>();
final int numDocs = atLeast(10000);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java b/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
index 76b56a5..3218569 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestSearcherManager.java
@@ -231,7 +231,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
};
final SearcherManager searcherManager = random().nextBoolean()
? new SearcherManager(dir, factory)
- : new SearcherManager(writer, random().nextBoolean(), factory);
+ : new SearcherManager(writer, random().nextBoolean(), false, factory);
if (VERBOSE) {
System.out.println("sm created");
}
@@ -311,7 +311,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
new MockAnalyzer(random())).setMergeScheduler(new ConcurrentMergeScheduler()));
- SearcherManager sm = new SearcherManager(writer, false, new SearcherFactory());
+ SearcherManager sm = new SearcherManager(writer, false, false, new SearcherFactory());
writer.addDocument(new Document());
writer.commit();
sm.maybeRefreshBlocking();
@@ -368,7 +368,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
Directory dir = newDirectory();
IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(null));
final AtomicBoolean afterRefreshCalled = new AtomicBoolean(false);
- SearcherManager sm = new SearcherManager(iw, false, new SearcherFactory());
+ SearcherManager sm = new SearcherManager(iw, false, false, new SearcherFactory());
sm.addListener(new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {
@@ -411,7 +411,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
// expected
}
try {
- new SearcherManager(w.w, random.nextBoolean(), theEvilOne);
+ new SearcherManager(w.w, random.nextBoolean(), false, theEvilOne);
} catch (IllegalStateException ise) {
// expected
}
@@ -522,7 +522,7 @@ public class TestSearcherManager extends ThreadedIndexingAndSearchingTestCase {
}
MySearcherFactory factory = new MySearcherFactory();
- final SearcherManager sm = new SearcherManager(w, random().nextBoolean(), factory);
+ final SearcherManager sm = new SearcherManager(w, random().nextBoolean(), false, factory);
assertEquals(1, factory.called);
assertNull(factory.lastPreviousReader);
assertNotNull(factory.lastReader);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
----------------------------------------------------------------------
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
index cdd0a15..5a3a551 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/SearcherTaxonomyManager.java
@@ -76,7 +76,7 @@ public class SearcherTaxonomyManager extends ReferenceManager<SearcherTaxonomyMa
this.searcherFactory = searcherFactory;
this.taxoWriter = taxoWriter;
DirectoryTaxonomyReader taxoReader = new DirectoryTaxonomyReader(taxoWriter);
- current = new SearcherAndTaxonomy(SearcherManager.getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes), null), taxoReader);
+ current = new SearcherAndTaxonomy(SearcherManager.getSearcher(searcherFactory, DirectoryReader.open(writer, applyAllDeletes, false), null), taxoReader);
this.taxoEpoch = taxoWriter.getTaxonomyEpoch();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
----------------------------------------------------------------------
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
index e241007..6396b16 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
@@ -271,7 +271,7 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
// verify that the taxo-writer hasn't been closed on us.
ensureOpen();
if (!initializedReaderManager) {
- readerManager = new ReaderManager(indexWriter, false);
+ readerManager = new ReaderManager(indexWriter, false, false);
shouldRefreshReaderManager = false;
initializedReaderManager = true;
}
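
The hunks above all thread one extra boolean through the NRT entry points: IndexWriter.getReader, DirectoryReader.open(IndexWriter, ...), SearcherManager, ReaderManager and SearcherTaxonomyManager now take an additional argument, which every existing test and caller passes as false. The sketch below is not part of the patch; the meaning of the flag is an assumption inferred from these signature changes (it appears to control whether resolved deletes are also written through to the index files, which the replicator needs to decide explicitly). It only shows how a plain call site adapts:

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    public class NrtOpenSketch {
      public static void main(String[] args) throws Exception {
        Directory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null));
        writer.addDocument(new Document());
        // applyAllDeletes=true resolves deletes before the reader opens; the new
        // second flag is passed as false here, matching the updated tests above.
        DirectoryReader reader = DirectoryReader.open(writer, true, false);
        System.out.println("maxDoc=" + reader.maxDoc());
        reader.close();
        writer.close();
        dir.close();
      }
    }
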
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyJob.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyJob.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyJob.java
new file mode 100644
index 0000000..5baf1d3
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyJob.java
@@ -0,0 +1,237 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.util.IOUtils;
+
+/** Handles copying one set of files, e.g. all files for a new NRT point, or files for pre-copying a merged segment.
+ * This notifies the caller via OnceDone when the job finishes or fails. */
+
+public abstract class CopyJob implements Comparable<CopyJob> {
+ private final static AtomicLong counter = new AtomicLong();
+ protected final ReplicaNode dest;
+
+ protected final Map<String,FileMetaData> files;
+
+ public final long ord = counter.incrementAndGet();
+
+ /** True for an NRT sync, false for pre-copying a newly merged segment */
+ public final boolean highPriority;
+
+ public final OnceDone onceDone;
+
+ public final long startNS = System.nanoTime();
+
+ public final String reason;
+
+ protected final List<Map.Entry<String,FileMetaData>> toCopy;
+
+ protected long totBytes;
+
+ protected long totBytesCopied;
+
+ // The file we are currently copying:
+ protected CopyOneFile current;
+
+ // Set when we are cancelled
+ protected volatile Throwable exc;
+ protected volatile String cancelReason;
+
+ // toString may concurrently access this:
+ protected final Map<String,String> copiedFiles = new ConcurrentHashMap<>();
+
+ protected CopyJob(String reason, Map<String,FileMetaData> files, ReplicaNode dest, boolean highPriority, OnceDone onceDone) throws IOException {
+ this.reason = reason;
+ this.files = files;
+ this.dest = dest;
+ this.highPriority = highPriority;
+ this.onceDone = onceDone;
+
+ // Exceptions in here are bad:
+ try {
+ this.toCopy = dest.getFilesToCopy(this.files);
+ } catch (Throwable t) {
+ cancel("exc during init", t);
+ throw new CorruptIndexException("exception while checking local files", "n/a", t);
+ }
+ }
+
+ /** Callback invoked by CopyJob once all files have (finally) finished copying */
+ public interface OnceDone {
+ public void run(CopyJob job) throws IOException;
+ }
+
+ /** Transfers whatever tmp files were already copied by the previous job, then cancels that job */
+ public synchronized void transferAndCancel(CopyJob prevJob) throws IOException {
+ synchronized(prevJob) {
+ dest.message("CopyJob: now transfer prevJob " + prevJob);
+ try {
+ _transferAndCancel(prevJob);
+ } catch (Throwable t) {
+ dest.message("xfer: exc during transferAndCancel");
+ cancel("exc during transferAndCancel", t);
+ IOUtils.reThrow(t);
+ }
+ }
+ }
+
+ private synchronized void _transferAndCancel(CopyJob prevJob) throws IOException {
+
+ // Caller must already be sync'd on prevJob:
+ assert Thread.holdsLock(prevJob);
+
+ if (prevJob.exc != null) {
+ // Already cancelled
+ dest.message("xfer: prevJob was already cancelled; skip transfer");
+ return;
+ }
+
+ // Cancel the previous job
+ prevJob.exc = new Throwable();
+
+ // Carry over already copied files that we also want to copy
+ Iterator<Map.Entry<String,FileMetaData>> it = toCopy.iterator();
+ long bytesAlreadyCopied = 0;
+
+ // Iterate over all files we think we need to copy:
+ while (it.hasNext()) {
+ Map.Entry<String,FileMetaData> ent = it.next();
+ String fileName = ent.getKey();
+ String prevTmpFileName = prevJob.copiedFiles.get(fileName);
+ if (prevTmpFileName != null) {
+ // This fileName is common to both jobs, and the old job already finished copying it (to a temp file), so we keep it:
+ long fileLength = ent.getValue().length;
+ bytesAlreadyCopied += fileLength;
+ dest.message("xfer: carry over already-copied file " + fileName + " (" + prevTmpFileName + ", " + fileLength + " bytes)");
+ copiedFiles.put(fileName, prevTmpFileName);
+
+ // So we don't try to delete it, below:
+ prevJob.copiedFiles.remove(fileName);
+
+ // So it's not in our copy list anymore:
+ it.remove();
+ } else if (prevJob.current != null && prevJob.current.name.equals(fileName)) {
+ // This fileName is common to both jobs, and it's the file that the previous job was in the process of copying. In this case
+ // we continue copying it from the previous job. This is important for cases where we are copying over a large file
+ // because otherwise we could keep failing the NRT copy and restarting this file from the beginning and never catch up:
+ dest.message("xfer: carry over in-progress file " + fileName + " (" + prevJob.current.tmpName + ") bytesCopied=" + prevJob.current.getBytesCopied() + " of " + prevJob.current.bytesToCopy);
+ bytesAlreadyCopied += prevJob.current.getBytesCopied();
+
+ assert current == null;
+
+ // must set current first, before writing to / reading from c.in/out, in case that hits an exception, so that we then close the temp
+ // IndexOutput when cancelling ourselves:
+ current = newCopyOneFile(prevJob.current);
+
+ // Tell our new (primary) connection we'd like to copy this file first, resuming from however many bytes we already copied last time:
+ // We do this even if bytesToCopy == bytesCopied, because we still need to readLong() the checksum from the primary connection:
+ assert prevJob.current.getBytesCopied() <= prevJob.current.bytesToCopy;
+
+ prevJob.current = null;
+
+ totBytes += current.metaData.length;
+
+ // So it's not in our copy list anymore:
+ it.remove();
+ } else {
+ dest.message("xfer: file " + fileName + " will be fully copied");
+ }
+ }
+ dest.message("xfer: " + bytesAlreadyCopied + " bytes already copied of " + totBytes);
+
+ // Delete all temp files the old job wrote but we don't need:
+ dest.message("xfer: now delete old temp files: " + prevJob.copiedFiles.values());
+ IOUtils.deleteFilesIgnoringExceptions(dest.dir, prevJob.copiedFiles.values());
+
+ if (prevJob.current != null) {
+ IOUtils.closeWhileHandlingException(prevJob.current);
+ if (Node.VERBOSE_FILES) {
+ dest.message("remove partial file " + prevJob.current.tmpName);
+ }
+ dest.deleter.deleteNewFile(prevJob.current.tmpName);
+ prevJob.current = null;
+ }
+ }
+
+ protected abstract CopyOneFile newCopyOneFile(CopyOneFile current);
+
+ /** Begin copying files */
+ public abstract void start() throws IOException;
+
+ /** Use current thread (blocking) to do all copying and then return once done, or throw exception on failure */
+ public abstract void runBlocking() throws Exception;
+
+ public void cancel(String reason, Throwable exc) {
+ if (this.exc != null) {
+ // Already cancelled
+ return;
+ }
+
+ dest.message(String.format(Locale.ROOT, "top: cancel after copying %s; exc=%s:\n files=%s\n copiedFiles=%s",
+ Node.bytesToString(totBytesCopied),
+ exc,
+ files == null ? "null" : files.keySet(), copiedFiles.keySet()));
+
+ if (exc == null) {
+ exc = new Throwable();
+ }
+
+ this.exc = exc;
+ this.cancelReason = reason;
+
+ // Delete all temp files we wrote:
+ IOUtils.deleteFilesIgnoringExceptions(dest.dir, copiedFiles.values());
+
+ if (current != null) {
+ IOUtils.closeWhileHandlingException(current);
+ if (Node.VERBOSE_FILES) {
+ dest.message("remove partial file " + current.tmpName);
+ }
+ dest.deleter.deleteNewFile(current.tmpName);
+ current = null;
+ }
+ }
+
+ /** Return true if this job is trying to copy any of the same files as the other job */
+ public abstract boolean conflicts(CopyJob other);
+
+ /** Renames all copied (tmp) files to their true file names */
+ public abstract void finish() throws IOException;
+
+ public abstract boolean getFailed();
+
+ /** Returns only those file names (a subset of {@link #getFileNames}) that need to be copied */
+ public abstract Set<String> getFileNamesToCopy();
+
+ /** Returns all file names referenced in this copy job */
+ public abstract Set<String> getFileNames();
+
+ public abstract CopyState getCopyState();
+
+ public abstract long getTotalBytesCopied();
+}
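
CopyJob._transferAndCancel above is the piece that keeps a replica from restarting large copies from byte zero: finished temp files (and the one in-flight CopyOneFile) are handed from the cancelled job to its replacement, and only the leftover temp files are deleted. Below is a simplified, standalone sketch of that carry-over step using plain maps; the names are illustrative stand-ins, not the patch's API:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class CarryOverSketch {
      public static void main(String[] args) {
        // Files the new job still plans to copy, with their lengths:
        Map<String,Long> toCopy = new HashMap<>();
        toCopy.put("_1.cfs", 1_000_000L);
        toCopy.put("_2.cfs", 2_000_000L);

        // Temp files the previous (now cancelled) job already finished copying:
        Map<String,String> prevCopied = new HashMap<>();
        prevCopied.put("_1.cfs", "_1.cfs.copy.tmp");

        Map<String,String> copied = new HashMap<>();
        long bytesAlreadyCopied = 0;

        Iterator<Map.Entry<String,Long>> it = toCopy.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<String,Long> ent = it.next();
          String prevTmp = prevCopied.remove(ent.getKey());
          if (prevTmp != null) {
            // Keep the old job's temp file and drop the file from our copy list:
            copied.put(ent.getKey(), prevTmp);
            bytesAlreadyCopied += ent.getValue();
            it.remove();
          }
        }

        // Whatever is left in prevCopied is junk the old job wrote that we no longer need:
        System.out.println("carried over " + copied.keySet() + " (" + bytesAlreadyCopied + " bytes)");
        System.out.println("still to copy: " + toCopy.keySet());
        System.out.println("old temp files to delete: " + prevCopied.values());
      }
    }
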
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyOneFile.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyOneFile.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyOneFile.java
new file mode 100644
index 0000000..e3f0f7d
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyOneFile.java
@@ -0,0 +1,132 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+
+/** Copies one file from an incoming DataInput to a dest filename in a local Directory */
+
+class CopyOneFile implements Closeable {
+ private final DataInput in;
+ private final IndexOutput out;
+ private final ReplicaNode dest;
+ public final String name;
+ public final String tmpName;
+ public final FileMetaData metaData;
+ public final long bytesToCopy;
+ private final long copyStartNS;
+ private final byte[] buffer;
+
+ private long bytesCopied;
+
+ public CopyOneFile(DataInput in, ReplicaNode dest, String name, FileMetaData metaData, byte[] buffer) throws IOException {
+ this.in = in;
+ this.name = name;
+ this.dest = dest;
+ this.buffer = buffer;
+ // TODO: pass correct IOCtx, e.g. seg total size
+ out = dest.createTempOutput(name, "copy", IOContext.DEFAULT);
+ tmpName = out.getName();
+
+ // last 8 bytes are checksum:
+ bytesToCopy = metaData.length - 8;
+
+ if (Node.VERBOSE_FILES) {
+ dest.message("file " + name + ": start copying to tmp file " + tmpName + " length=" + (8+bytesToCopy));
+ }
+
+ copyStartNS = System.nanoTime();
+ this.metaData = metaData;
+ dest.startCopyFile(name);
+ }
+
+ /** Transfers this file copy to another input, continuing where the first one left off */
+ public CopyOneFile(CopyOneFile other, DataInput in) {
+ this.in = in;
+ this.dest = other.dest;
+ this.name = other.name;
+ this.out = other.out;
+ this.tmpName = other.tmpName;
+ this.metaData = other.metaData;
+ this.bytesCopied = other.bytesCopied;
+ this.bytesToCopy = other.bytesToCopy;
+ this.copyStartNS = other.copyStartNS;
+ this.buffer = other.buffer;
+ }
+
+ public void close() throws IOException {
+ out.close();
+ dest.finishCopyFile(name);
+ }
+
+ /** Copy another chunk of bytes, returning true once the copy is done */
+ public boolean visit() throws IOException {
+ // Copy up to 640 KB per visit:
+ for(int i=0;i<10;i++) {
+ long bytesLeft = bytesToCopy - bytesCopied;
+ if (bytesLeft == 0) {
+ long checksum = out.getChecksum();
+ if (checksum != metaData.checksum) {
+ // Bits flipped during copy!
+ dest.message("file " + tmpName + ": checksum mismatch after copy (bits flipped during network copy?) after-copy checksum=" + checksum + " vs expected=" + metaData.checksum + "; cancel job");
+ throw new IOException("file " + name + ": checksum mismatch after file copy");
+ }
+
+ // Paranoia: make sure the primary node did not somehow send us an already corrupted file whose checksum (in its
+ // footer) disagrees with reality:
+ long actualChecksumIn = in.readLong();
+ if (actualChecksumIn != checksum) {
+ dest.message("file " + tmpName + ": checksum claimed by primary disagrees with the file's footer: claimed checksum=" + checksum + " vs actual=" + actualChecksumIn);
+ throw new IOException("file " + name + ": checksum mismatch after file copy");
+ }
+ out.writeLong(checksum);
+ close();
+
+ if (Node.VERBOSE_FILES) {
+ dest.message(String.format(Locale.ROOT, "file %s: done copying [%s, %.3fms]",
+ name,
+ Node.bytesToString(metaData.length),
+ (System.nanoTime() - copyStartNS)/1000000.0));
+ }
+
+ return true;
+ }
+
+ int toCopy = (int) Math.min(bytesLeft, buffer.length);
+ in.readBytes(buffer, 0, toCopy);
+ out.writeBytes(buffer, 0, toCopy);
+
+ // TODO: rsync will fsync a range of the file; maybe we should do that here for large files in case we crash/killed
+ bytesCopied += toCopy;
+ }
+
+ return false;
+ }
+
+ public long getBytesCopied() {
+ return bytesCopied;
+ }
+}
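
CopyOneFile.visit() copies the file body in bounded chunks, then verifies the bytes twice: the locally computed checksum must match the metadata the primary advertised, and the footer checksum the primary sends afterward must match as well. The standalone sketch below models that chunked copy plus verification with java.util.zip.CRC32; it is an illustration only (Lucene's codec checksum is CRC32-based, but none of these classes are the patch's API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.CRC32;

    public class ChunkedCopySketch {
      static long copyAndVerify(InputStream in, ByteArrayOutputStream out,
                                long bytesToCopy, long claimedChecksum) throws IOException {
        byte[] buffer = new byte[8192];
        CRC32 crc = new CRC32();
        long copied = 0;
        while (copied < bytesToCopy) {
          int chunk = (int) Math.min(buffer.length, bytesToCopy - copied);
          int n = in.read(buffer, 0, chunk);
          if (n == -1) throw new IOException("unexpected end of stream");
          out.write(buffer, 0, n);
          crc.update(buffer, 0, n);
          copied += n;
        }
        long actual = crc.getValue();
        if (actual != claimedChecksum) {
          // Either bits flipped in transit or the sender's claimed checksum was already wrong:
          throw new IOException("checksum mismatch: claimed=" + claimedChecksum + " vs actual=" + actual);
        }
        return copied;
      }

      public static void main(String[] args) throws IOException {
        byte[] payload = "some segment bytes".getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(payload);
        long copied = copyAndVerify(new ByteArrayInputStream(payload), new ByteArrayOutputStream(),
                                    payload.length, crc.getValue());
        System.out.println("copied " + copied + " bytes, checksum OK");
      }
    }
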
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyState.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyState.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyState.java
new file mode 100644
index 0000000..c19fabc
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/CopyState.java
@@ -0,0 +1,56 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
+
+/** Holds incRef'd file-level details for one point-in-time SegmentInfos on the primary node. */
+public class CopyState {
+
+ public final Map<String,FileMetaData> files;
+ public final long version;
+ public final long gen;
+ public final byte[] infosBytes;
+ public final Set<String> completedMergeFiles;
+ public final long primaryGen;
+
+ // only non-null on the primary node
+ public final SegmentInfos infos;
+
+ public CopyState(Map<String,FileMetaData> files, long version, long gen, byte[] infosBytes,
+ Set<String> completedMergeFiles, long primaryGen, SegmentInfos infos) {
+ assert completedMergeFiles != null;
+ this.files = Collections.unmodifiableMap(files);
+ this.version = version;
+ this.gen = gen;
+ this.infosBytes = infosBytes;
+ this.completedMergeFiles = Collections.unmodifiableSet(completedMergeFiles);
+ this.primaryGen = primaryGen;
+ this.infos = infos;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(version=" + version + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/FileMetaData.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/FileMetaData.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/FileMetaData.java
new file mode 100644
index 0000000..aca408c
--- /dev/null
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/FileMetaData.java
@@ -0,0 +1,40 @@
+package org.apache.lucene.replicator.nrt;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Holds metadata details about a single file that we use to confirm two files (one remote, one local) are in fact "identical". */
+
+class FileMetaData {
+
+ // Header and footer of the file must be identical between primary and replica to consider the files equal:
+ public final byte[] header;
+ public final byte[] footer;
+
+ public final long length;
+
+ // Used to ensure no bit flips when copying the file:
+ public final long checksum;
+
+ public FileMetaData(byte[] header, byte[] footer, long length, long checksum) {
+ this.header = header;
+ this.footer = footer;
+ this.length = length;
+ this.checksum = checksum;
+ }
+}
+
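
FileMetaData deliberately carries header bytes, footer bytes, length and checksum so that a replica only treats a local file as "identical" to the primary's when all four agree; comparing headers and footers in particular can catch a file that was re-created under the same name with different contents. A small illustrative sketch of that comparison follows (the Meta class and sameFile helper are stand-ins, not part of the patch):

    import java.util.Arrays;

    public class FileIdentitySketch {
      static final class Meta {
        final byte[] header, footer;
        final long length, checksum;
        Meta(byte[] header, byte[] footer, long length, long checksum) {
          this.header = header; this.footer = footer; this.length = length; this.checksum = checksum;
        }
      }

      static boolean sameFile(Meta local, Meta remote) {
        // All four fields must agree before the replica may skip copying the file:
        return local.length == remote.length
            && local.checksum == remote.checksum
            && Arrays.equals(local.header, remote.header)
            && Arrays.equals(local.footer, remote.footer);
      }

      public static void main(String[] args) {
        Meta local = new Meta(new byte[] {1, 2}, new byte[] {9}, 128, 42L);
        Meta remote = new Meta(new byte[] {1, 2}, new byte[] {9}, 128, 42L);
        System.out.println("identical=" + sameFile(local, remote));
      }
    }
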