You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2018/03/15 12:55:24 UTC
lucene-solr:branch_7x: LUCENE-8205: Simplify AbortingException
handling and tragic event logic
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 4739d078a -> cf8c9cabd
LUCENE-8205: Simplify AbortingException handling and tragic event logic
Today we try to signal via exception handling if an exception is aborting
and/or a tragic event. This causes today that we ignore certain exception if we
are in the process of handling a such which is generally bad practice. This
change simplify the signaling of aborting exceptions and separates acting on
tragic events and closing the writer because of a tragic event. This in-turn
simplifies lock ordering since we never acquire a lock anymore inside the
tragic event code.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cf8c9cab
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cf8c9cab
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cf8c9cab
Branch: refs/heads/branch_7x
Commit: cf8c9cabd97be24874887d01781dc987c43de88d
Parents: 4739d07
Author: Simon Willnauer <si...@apache.org>
Authored: Wed Mar 14 16:51:22 2018 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Thu Mar 15 13:55:12 2018 +0100
----------------------------------------------------------------------
.../apache/lucene/index/AbortingException.java | 36 ---
.../lucene/index/DefaultIndexingChain.java | 42 ++-
.../org/apache/lucene/index/DocConsumer.java | 4 +-
.../apache/lucene/index/DocumentsWriter.java | 48 ++-
.../lucene/index/DocumentsWriterPerThread.java | 273 +++++++++--------
.../apache/lucene/index/IndexFileDeleter.java | 4 +-
.../org/apache/lucene/index/IndexWriter.java | 297 +++++++++----------
.../lucene/index/TermVectorsConsumer.java | 7 +-
8 files changed, 336 insertions(+), 375 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/AbortingException.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/AbortingException.java b/lucene/core/src/java/org/apache/lucene/index/AbortingException.java
deleted file mode 100644
index c862e82..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/AbortingException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-/** Thrown and caught internally in {@link IndexWriter} methods when an {@code IOException} would cause it to
- * lose previously indexed documents. When this happens, the {@link IndexWriter} is forcefully
- * closed, using {@link IndexWriter#rollback}). */
-class AbortingException extends Exception {
- private AbortingException(Throwable cause) {
- super(cause);
- assert cause instanceof AbortingException == false;
- }
-
- public static AbortingException wrap(Throwable t) {
- if (t instanceof AbortingException) {
- return (AbortingException) t;
- } else {
- return new AbortingException(t);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
index fd24105..010d1a6 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DefaultIndexingChain.java
@@ -115,7 +115,7 @@ final class DefaultIndexingChain extends DocConsumer {
}
@Override
- public Sorter.DocMap flush(SegmentWriteState state) throws IOException, AbortingException {
+ public Sorter.DocMap flush(SegmentWriteState state) throws IOException {
// NOTE: caller (DocumentsWriterPerThread) handles
// aborting on any exception from this method
@@ -313,15 +313,13 @@ final class DefaultIndexingChain extends DocConsumer {
@Override
public void abort() {
storedFieldsConsumer.abort();
-
try {
// E.g. close any open files in the term vectors writer:
termsHash.abort();
- } catch (Throwable t) {
+ } finally {
+ Arrays.fill(fieldHash, null);
}
-
- Arrays.fill(fieldHash, null);
- }
+ }
private void rehash() {
int newHashSize = (fieldHash.length*2);
@@ -348,26 +346,28 @@ final class DefaultIndexingChain extends DocConsumer {
/** Calls StoredFieldsWriter.startDocument, aborting the
* segment if it hits any exception. */
- private void startStoredFields(int docID) throws IOException, AbortingException {
+ private void startStoredFields(int docID) throws IOException {
try {
storedFieldsConsumer.startDocument(docID);
} catch (Throwable th) {
- throw AbortingException.wrap(th);
+ docWriter.onAbortingException(th);
+ throw th;
}
}
/** Calls StoredFieldsWriter.finishDocument, aborting the
* segment if it hits any exception. */
- private void finishStoredFields() throws IOException, AbortingException {
+ private void finishStoredFields() throws IOException {
try {
storedFieldsConsumer.finishDocument();
} catch (Throwable th) {
- throw AbortingException.wrap(th);
+ docWriter.onAbortingException(th);
+ throw th;
}
}
@Override
- public void processDocument() throws IOException, AbortingException {
+ public void processDocument() throws IOException {
// How many indexed field names we've seen (collapses
// multiple field instances by the same name):
@@ -385,17 +385,12 @@ final class DefaultIndexingChain extends DocConsumer {
termsHash.startDocument();
startStoredFields(docState.docID);
-
- boolean aborting = false;
try {
for (IndexableField field : docState.doc) {
fieldCount = processField(field, fieldGen, fieldCount);
}
- } catch (AbortingException ae) {
- aborting = true;
- throw ae;
} finally {
- if (aborting == false) {
+ if (docWriter.hasHitAbortingException() == false) {
// Finish each indexed field name seen in the document:
for (int i=0;i<fieldCount;i++) {
fields[i].finish();
@@ -409,11 +404,12 @@ final class DefaultIndexingChain extends DocConsumer {
} catch (Throwable th) {
// Must abort, on the possibility that on-disk term
// vectors are now corrupt:
- throw AbortingException.wrap(th);
+ docWriter.onAbortingException(th);
+ throw th;
}
}
- private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException, AbortingException {
+ private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException {
String fieldName = field.name();
IndexableFieldType fieldType = field.fieldType();
@@ -450,7 +446,8 @@ final class DefaultIndexingChain extends DocConsumer {
try {
storedFieldsConsumer.writeField(fp.fieldInfo, field);
} catch (Throwable th) {
- throw AbortingException.wrap(th);
+ docWriter.onAbortingException(th);
+ throw th;
}
}
}
@@ -703,7 +700,7 @@ final class DefaultIndexingChain extends DocConsumer {
/** Inverts one field for one document; first is true
* if this is the first time we are seeing this field
* name in this document. */
- public void invert(IndexableField field, boolean first) throws IOException, AbortingException {
+ public void invert(IndexableField field, boolean first) throws IOException {
if (first) {
// First time we're seeing this field (indexed) in
// this document:
@@ -795,7 +792,8 @@ final class DefaultIndexingChain extends DocConsumer {
// Document will be deleted above:
throw new IllegalArgumentException(msg, e);
} catch (Throwable th) {
- throw AbortingException.wrap(th);
+ docWriter.onAbortingException(th);
+ throw th;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/DocConsumer.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocConsumer.java b/lucene/core/src/java/org/apache/lucene/index/DocConsumer.java
index 36766f3..8ff5704 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocConsumer.java
@@ -20,7 +20,7 @@ package org.apache.lucene.index;
import java.io.IOException;
abstract class DocConsumer {
- abstract void processDocument() throws IOException, AbortingException;
- abstract Sorter.DocMap flush(final SegmentWriteState state) throws IOException, AbortingException;
+ abstract void processDocument() throws IOException;
+ abstract Sorter.DocMap flush(final SegmentWriteState state) throws IOException;
abstract void abort();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index d49c1da..05a886a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -246,7 +246,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
- final boolean flushOneDWPT() throws IOException, AbortingException {
+ final boolean flushOneDWPT() throws IOException {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFlushOneDWPT");
}
@@ -389,7 +389,7 @@ final class DocumentsWriter implements Closeable, Accountable {
flushControl.setClosed();
}
- private boolean preUpdate() throws IOException, AbortingException {
+ private boolean preUpdate() throws IOException {
ensureOpen();
boolean hasEvents = false;
@@ -409,7 +409,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return hasEvents;
}
- private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException, AbortingException {
+ private boolean postUpdate(DocumentsWriterPerThread flushingDWPT, boolean hasEvents) throws IOException {
hasEvents |= applyAllDeletes(deleteQueue);
if (flushingDWPT != null) {
hasEvents |= doFlush(flushingDWPT);
@@ -433,7 +433,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
- final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException, AbortingException {
+ final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
@@ -450,11 +450,10 @@ final class DocumentsWriter implements Closeable, Accountable {
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocuments(docs, analyzer, delNode);
- } catch (AbortingException ae) {
- flushControl.doOnAbort(perThread);
- dwpt.abort();
- throw ae;
} finally {
+ if (dwpt.isAborted()) {
+ flushControl.doOnAbort(perThread);
+ }
// We don't know how many documents were actually
// counted as indexed, so we must subtract here to
// accumulate our separate counter:
@@ -477,7 +476,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
- final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException, AbortingException {
+ final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate();
@@ -495,11 +494,10 @@ final class DocumentsWriter implements Closeable, Accountable {
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
seqNo = dwpt.updateDocument(doc, analyzer, delNode);
- } catch (AbortingException ae) {
- flushControl.doOnAbort(perThread);
- dwpt.abort();
- throw ae;
} finally {
+ if (dwpt.isAborted()) {
+ flushControl.doOnAbort(perThread);
+ }
// We don't know whether the document actually
// counted as being indexed, so we must subtract here to
// accumulate our separate counter:
@@ -522,7 +520,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return seqNo;
}
- private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
+ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
boolean hasEvents = false;
while (flushingDWPT != null) {
hasEvents = true;
@@ -645,7 +643,7 @@ final class DocumentsWriter implements Closeable, Accountable {
* is called after this method, to release the flush lock in DWFlushControl
*/
long flushAllThreads()
- throws IOException, AbortingException {
+ throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFullFlush");
@@ -713,10 +711,6 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
- public LiveIndexWriterConfig getIndexWriterConfig() {
- return config;
- }
-
void putEvent(Event event) {
events.add(event);
}
@@ -735,11 +729,15 @@ final class DocumentsWriter implements Closeable, Accountable {
}
@Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ public void process(IndexWriter writer) throws IOException {
try {
packet.apply(writer);
} catch (Throwable t) {
- writer.tragicEvent(t, "applyUpdatesPacket");
+ try {
+ writer.onTragicEvent(t, "applyUpdatesPacket");
+ } finally {
+ throw t;
+ }
}
writer.flushDeletesCount.incrementAndGet();
}
@@ -753,7 +751,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
@Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ public void process(IndexWriter writer) throws IOException {
writer.applyDeletesAndPurge(true); // we always purge!
}
}
@@ -766,7 +764,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
@Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ public void process(IndexWriter writer) throws IOException {
writer.purge(true);
}
}
@@ -779,7 +777,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
@Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ public void process(IndexWriter writer) throws IOException {
writer.flushFailed(info);
}
}
@@ -792,7 +790,7 @@ final class DocumentsWriter implements Closeable, Accountable {
}
@Override
- public void process(IndexWriter writer, boolean triggerMerge, boolean forcePurge) throws IOException {
+ public void process(IndexWriter writer) throws IOException {
writer.deleteNewFiles(files);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 3ee10d0..d5ebc60 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -56,6 +56,21 @@ class DocumentsWriterPerThread {
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;
}
+
+ private Throwable abortingException;
+
+ final void onAbortingException(Throwable throwable) {
+ assert abortingException == null: "aborting excpetion has already been set";
+ abortingException = throwable;
+ }
+
+ final boolean hasHitAbortingException() {
+ return abortingException != null;
+ }
+
+ final boolean isAborted() {
+ return aborted;
+ }
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@@ -79,10 +94,6 @@ class DocumentsWriterPerThread {
this.infoStream = infoStream;
}
- public void testPoint(String name) {
- docWriter.testPoint(name);
- }
-
public void clear() {
// don't hold onto doc nor analyzer, in case it is
// largish:
@@ -125,10 +136,9 @@ class DocumentsWriterPerThread {
}
try {
consumer.abort();
- } catch (Throwable t) {
+ } finally {
+ pendingUpdates.clear();
}
-
- pendingUpdates.clear();
} finally {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "done abort");
@@ -147,7 +157,7 @@ class DocumentsWriterPerThread {
// Updates for our still-in-RAM (to be flushed next) segment
final BufferedUpdates pendingUpdates;
final SegmentInfo segmentInfo; // Current segment we are working on
- boolean aborted = false; // True if we aborted
+ private boolean aborted = false; // True if we aborted
private final FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
@@ -218,113 +228,123 @@ class DocumentsWriterPerThread {
}
}
- public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException, AbortingException {
- testPoint("DocumentsWriterPerThread addDocument start");
- assert deleteQueue != null;
- reserveOneDoc();
- docState.doc = doc;
- docState.analyzer = analyzer;
- docState.docID = numDocsInRAM;
- if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
- }
- // Even on exception, the document is still added (but marked
- // deleted), so we don't need to un-reserve at that point.
- // Aborting exceptions will actually "lose" more than one
- // document, so the counter will be "wrong" in that case, but
- // it's very hard to fix (we can't easily distinguish aborting
- // vs non-aborting exceptions):
- boolean success = false;
+ public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
try {
+ assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
+ testPoint("DocumentsWriterPerThread addDocument start");
+ assert deleteQueue != null;
+ reserveOneDoc();
+ docState.doc = doc;
+ docState.analyzer = analyzer;
+ docState.docID = numDocsInRAM;
+ if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
+ }
+ // Even on exception, the document is still added (but marked
+ // deleted), so we don't need to un-reserve at that point.
+ // Aborting exceptions will actually "lose" more than one
+ // document, so the counter will be "wrong" in that case, but
+ // it's very hard to fix (we can't easily distinguish aborting
+ // vs non-aborting exceptions):
+ boolean success = false;
try {
- consumer.processDocument();
+ try {
+ consumer.processDocument();
+ } finally {
+ docState.clear();
+ }
+ success = true;
} finally {
- docState.clear();
+ if (!success) {
+ // mark document as deleted
+ deleteDocID(docState.docID);
+ numDocsInRAM++;
+ }
}
- success = true;
+
+ return finishDocument(deleteNode);
} finally {
- if (!success) {
- // mark document as deleted
- deleteDocID(docState.docID);
- numDocsInRAM++;
- }
+ maybeAbort("updateDocument");
}
-
- return finishDocument(deleteNode);
}
- public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException, AbortingException {
- testPoint("DocumentsWriterPerThread addDocuments start");
- assert deleteQueue != null;
- docState.analyzer = analyzer;
- if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", Thread.currentThread().getName() + " update delNode=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
- }
- int docCount = 0;
- boolean allDocsIndexed = false;
+ public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode) throws IOException {
try {
-
- for(Iterable<? extends IndexableField> doc : docs) {
- // Even on exception, the document is still added (but marked
- // deleted), so we don't need to un-reserve at that point.
- // Aborting exceptions will actually "lose" more than one
- // document, so the counter will be "wrong" in that case, but
- // it's very hard to fix (we can't easily distinguish aborting
- // vs non-aborting exceptions):
- reserveOneDoc();
- docState.doc = doc;
- docState.docID = numDocsInRAM;
- docCount++;
-
- boolean success = false;
- try {
- consumer.processDocument();
- success = true;
- } finally {
- if (!success) {
- // Incr here because finishDocument will not
- // be called (because an exc is being thrown):
- numDocsInRAM++;
+ testPoint("DocumentsWriterPerThread addDocuments start");
+ assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
+ assert deleteQueue != null;
+ docState.analyzer = analyzer;
+ if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
+ }
+ int docCount = 0;
+ boolean allDocsIndexed = false;
+ try {
+
+ for (Iterable<? extends IndexableField> doc : docs) {
+ // Even on exception, the document is still added (but marked
+ // deleted), so we don't need to un-reserve at that point.
+ // Aborting exceptions will actually "lose" more than one
+ // document, so the counter will be "wrong" in that case, but
+ // it's very hard to fix (we can't easily distinguish aborting
+ // vs non-aborting exceptions):
+ reserveOneDoc();
+ docState.doc = doc;
+ docState.docID = numDocsInRAM;
+ docCount++;
+
+ boolean success = false;
+ try {
+ consumer.processDocument();
+ success = true;
+ } finally {
+ if (!success) {
+ // Incr here because finishDocument will not
+ // be called (because an exc is being thrown):
+ numDocsInRAM++;
+ }
}
- }
- numDocsInRAM++;
- }
- allDocsIndexed = true;
-
- // Apply delTerm only after all indexing has
- // succeeded, but apply it only to docs prior to when
- // this batch started:
- long seqNo;
- if (deleteNode != null) {
- seqNo = deleteQueue.add(deleteNode, deleteSlice);
- assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
- deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
- return seqNo;
- } else {
- seqNo = deleteQueue.updateSlice(deleteSlice);
- if (seqNo < 0) {
- seqNo = -seqNo;
- deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+ numDocsInRAM++;
+ }
+ allDocsIndexed = true;
+
+ // Apply delTerm only after all indexing has
+ // succeeded, but apply it only to docs prior to when
+ // this batch started:
+ long seqNo;
+ if (deleteNode != null) {
+ seqNo = deleteQueue.add(deleteNode, deleteSlice);
+ assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
+ deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
+ return seqNo;
} else {
- deleteSlice.reset();
+ seqNo = deleteQueue.updateSlice(deleteSlice);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
+ } else {
+ deleteSlice.reset();
+ }
}
- }
- return seqNo;
+ return seqNo;
- } finally {
- if (!allDocsIndexed && !aborted) {
- // the iterator threw an exception that is not aborting
- // go and mark all docs from this block as deleted
- int docID = numDocsInRAM-1;
- final int endDocID = docID - docCount;
- while (docID > endDocID) {
- deleteDocID(docID);
- docID--;
+ } finally {
+ if (!allDocsIndexed && !aborted) {
+ // the iterator threw an exception that is not aborting
+ // go and mark all docs from this block as deleted
+ int docID = numDocsInRAM - 1;
+ final int endDocID = docID - docCount;
+ while (docID > endDocID) {
+ deleteDocID(docID);
+ docID--;
+ }
}
+ docState.clear();
}
- docState.clear();
+ } finally {
+ maybeAbort("updateDocuments");
}
}
@@ -378,14 +398,6 @@ class DocumentsWriterPerThread {
}
/**
- * Returns the number of delete terms in this {@link DocumentsWriterPerThread}
- */
- public int numDeleteTerms() {
- // public for FlushPolicy
- return pendingUpdates.numTermDeletes.get();
- }
-
- /**
* Returns the number of RAM resident documents in this {@link DocumentsWriterPerThread}
*/
public int getNumDocsInRAM() {
@@ -414,7 +426,7 @@ class DocumentsWriterPerThread {
}
/** Flush all pending docs to a new segment */
- FlushedSegment flush() throws IOException, AbortingException {
+ FlushedSegment flush() throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM);
@@ -458,11 +470,11 @@ class DocumentsWriterPerThread {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush) + " deleted docs");
infoStream.message("DWPT", "new segment has " +
- (flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
- (flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
- (flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
- (flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
- (flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
+ (flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
+ (flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
+ (flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
+ (flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
+ (flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
infoStream.message("DWPT", "flushed codec=" + codec);
}
@@ -476,27 +488,40 @@ class DocumentsWriterPerThread {
}
if (infoStream.isEnabled("DWPT")) {
- final double newSegmentSize = segmentInfoPerCommit.sizeInBytes()/1024./1024.;
- infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
- " ramUsed=" + nf.format(startMBUsed) + " MB" +
- " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
- " docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
+ final double newSegmentSize = segmentInfoPerCommit.sizeInBytes() / 1024. / 1024.;
+ infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
+ " ramUsed=" + nf.format(startMBUsed) + " MB" +
+ " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
+ " docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
}
assert segmentInfo != null;
FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
- segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush,
- sortMap);
+ segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush,
+ sortMap);
sealFlushedSegment(fs, sortMap);
if (infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0)/1000000.0) + " msec");
+ infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
}
-
return fs;
- } catch (Throwable th) {
- abort();
- throw AbortingException.wrap(th);
+ } catch (Throwable t) {
+ onAbortingException(t);
+ throw t;
+ } finally {
+ maybeAbort("flush");
+ }
+ }
+
+ private void maybeAbort(String location) {
+ if (hasHitAbortingException() && aborted == false) {
+ // if we are already aborted don't do anything here
+ try {
+ abort();
+ } finally {
+ // whatever we do here we have to fire this tragic event up.
+ indexWriter.onTragicEvent(abortingException, location);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java b/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
index d8fb249..282db03 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexFileDeleter.java
@@ -341,8 +341,8 @@ final class IndexFileDeleter implements Closeable {
void ensureOpen() throws AlreadyClosedException {
writer.ensureOpen(false);
// since we allow 'closing' state, we must still check this, we could be closing because we hit e.g. OOM
- if (writer.tragedy != null) {
- throw new AlreadyClosedException("refusing to delete any files: this IndexWriter hit an unrecoverable exception", writer.tragedy);
+ if (writer.tragedy.get() != null) {
+ throw new AlreadyClosedException("refusing to delete any files: this IndexWriter hit an unrecoverable exception", writer.tragedy.get());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 4719ec9..fa18211 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
@@ -270,7 +271,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public final static int MAX_STORED_STRING_LENGTH = ArrayUtil.MAX_ARRAY_LENGTH / UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR;
// when unrecoverable disaster strikes, we populate this with the reason that we had to close IndexWriter
- volatile Throwable tragedy;
+ final AtomicReference<Throwable> tragedy = new AtomicReference<>(null);
private final Directory directoryOrig; // original user directory
final Directory directory; // wrapped with additional checks
@@ -463,7 +464,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
flushCount.incrementAndGet();
}
- processEvents(false, true);
+ processEvents(false);
if (applyAllDeletes) {
applyAllDeletesAndUpdates();
@@ -497,7 +498,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Done: finish the full flush!
docWriter.finishFullFlush(this, success);
if (success) {
- processEvents(false, true);
+ processEvents(false);
doAfterFlush();
} else {
if (infoStream.isEnabled("IW")) {
@@ -514,13 +515,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
}
success2 = true;
- } catch (AbortingException | VirtualMachineError tragedy) {
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "getReader");
- // never reached but javac disagrees:
- return null;
+ throw tragedy;
} finally {
if (!success2) {
- IOUtils.closeWhileHandlingException(r);
+ try {
+ IOUtils.closeWhileHandlingException(r);
+ } finally {
+ maybeCloseOnTragicEvent();
+ }
}
}
return r;
@@ -894,7 +898,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*/
protected final void ensureOpen(boolean failIfClosing) throws AlreadyClosedException {
if (closed || (failIfClosing && closing)) {
- throw new AlreadyClosedException("this IndexWriter is closed", tragedy);
+ throw new AlreadyClosedException("this IndexWriter is closed", tragedy.get());
}
}
@@ -1265,7 +1269,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Ensure that only one thread actually gets to do the
// closing
if (shouldClose(true)) {
- boolean success = false;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "now flush at close");
@@ -1275,16 +1278,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
waitForMerges();
commitInternal(config.getMergePolicy());
rollbackInternal(); // ie close, since we just committed
- success = true;
- } finally {
- if (success == false) {
- // Be certain to close the index on any exception
- try {
- rollbackInternal();
- } catch (Throwable t) {
- // Suppress so we keep throwing original exception
- }
+ } catch (Throwable t) {
+ // Be certain to close the index on any exception
+ try {
+ rollbackInternal();
+ } catch (Throwable t1) {
+ t.addSuppressed(t1);
}
+ throw t;
}
}
}
@@ -1530,29 +1531,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private long updateDocuments(final DocumentsWriterDeleteQueue.Node<?> delNode, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
ensureOpen();
+ boolean success = false;
try {
- boolean success = false;
- try {
- long seqNo = docWriter.updateDocuments(docs, analyzer, delNode);
- if (seqNo < 0) {
- seqNo = -seqNo;
- processEvents(true, false);
- }
- success = true;
- return seqNo;
- } finally {
- if (!success) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "hit exception updating document");
- }
- }
+ long seqNo = docWriter.updateDocuments(docs, analyzer, delNode);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ processEvents(true);
}
- } catch (AbortingException | VirtualMachineError tragedy) {
+ success = true;
+ return seqNo;
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocuments");
-
- // dead code but javac disagrees
- return -1;
+ throw tragedy;
+ } finally {
+ if (success == false) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "hit exception updating document");
+ }
+ maybeCloseOnTragicEvent();
+ }
}
+
}
/**
@@ -1702,14 +1701,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
long seqNo = docWriter.deleteTerms(terms);
if (seqNo < 0) {
seqNo = -seqNo;
- processEvents(true, false);
+ processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Term..)");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
}
}
@@ -1739,15 +1736,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
long seqNo = docWriter.deleteQueries(queries);
if (seqNo < 0) {
seqNo = -seqNo;
- processEvents(true, false);
+ processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Query..)");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
}
}
@@ -1774,28 +1769,25 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen();
+ boolean success = false;
try {
- boolean success = false;
- try {
- long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
- if (seqNo < 0) {
- seqNo = - seqNo;
- processEvents(true, false);
- }
- success = true;
- return seqNo;
- } finally {
- if (!success) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "hit exception updating document");
- }
- }
+ long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ processEvents(true);
}
- } catch (AbortingException | VirtualMachineError tragedy) {
+ success = true;
+ return seqNo;
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
+ } finally {
+ if (success == false) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "hit exception updating document");
+ }
+ }
+ maybeCloseOnTragicEvent();
}
}
@@ -1875,14 +1867,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
long seqNo = docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value));
if (seqNo < 0) {
seqNo = -seqNo;
- processEvents(true, false);
+ processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateNumericDocValue");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
}
}
@@ -1922,14 +1912,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
long seqNo = docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value));
if (seqNo < 0) {
seqNo = -seqNo;
- processEvents(true, false);
+ processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateBinaryDocValue");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
}
}
@@ -1957,14 +1945,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
long seqNo = docWriter.updateDocValues(dvUpdates);
if (seqNo < 0) {
seqNo = -seqNo;
- processEvents(true, false);
+ processEvents(true);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocValues");
-
- // dead code but javac disagrees:
- return -1;
+ throw tragedy;
}
}
@@ -2183,8 +2169,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized(this) {
while(true) {
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy.get());
}
if (mergeExceptions.size() > 0) {
@@ -2269,8 +2255,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean running = true;
while(running) {
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMergeDeletes", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMergeDeletes", tragedy.get());
}
// Check each merge that MergePolicy asked us to
@@ -2361,7 +2347,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
// Do not start new merges if disaster struck
- if (tragedy != null) {
+ if (tragedy.get() != null) {
return false;
}
boolean newMergesFound = false;
@@ -2514,7 +2500,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Ask deleter to locate unreferenced files & remove
// them ... only when we are not experiencing a tragedy, else
// these methods throw ACE:
- if (tragedy == null) {
+ if (tragedy.get() == null) {
deleter.checkpoint(segmentInfos, false);
deleter.refresh();
deleter.close();
@@ -2533,6 +2519,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
success = true;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "rollbackInternal");
+ throw tragedy;
} finally {
if (success == false) {
// Must not hold IW's lock while closing
@@ -2616,7 +2603,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
try {
synchronized (fullFlushLock) {
docWriter.lockAndAbortAll(this);
- processEvents(false, true);
+ processEvents(false);
synchronized (this) {
try {
// Abort any running merges
@@ -2657,9 +2644,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteAll");
-
- // dead code but javac disagrees
- return -1;
+ throw tragedy;
}
}
@@ -3022,8 +3007,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(Directory...)");
- // dead code but javac disagrees:
- seqNo = -1;
+ throw tragedy;
} finally {
if (successTop) {
IOUtils.close(locks);
@@ -3200,8 +3184,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
- // dead code but javac disagrees:
- seqNo = -1;
+ throw tragedy;
}
maybeMerge();
@@ -3301,13 +3284,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
public final boolean flushNextBuffer() throws IOException {
try {
if (docWriter.flushOneDWPT()) {
- processEvents(true, false);
+ processEvents(true);
return true; // we wrote a segment
}
- } catch (AbortingException | VirtualMachineError tragedy) {
+ return false;
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "flushNextBuffer");
+ throw tragedy;
+ } finally {
+ maybeCloseOnTragicEvent();
}
- return false;
}
private long prepareCommitInternal() throws IOException {
@@ -3319,8 +3305,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", " index before flush " + segString());
}
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy.get());
}
if (pendingCommit != null) {
@@ -3355,7 +3341,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
// cannot pass triggerMerges=true here else it can lead to deadlock:
- processEvents(false, false);
+ processEvents(false);
flushSuccess = true;
@@ -3411,11 +3397,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
doAfterFlush();
}
}
- } catch (AbortingException | VirtualMachineError tragedy) {
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "prepareCommit");
-
- // dead code but javac disagrees:
- seqNo = -1;
+ throw tragedy;
+ } finally {
+ maybeCloseOnTragicEvent();
}
boolean success = false;
@@ -3584,8 +3570,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized(this) {
ensureOpen(false);
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete commit", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete commit", tragedy.get());
}
if (pendingCommit != null) {
@@ -3643,9 +3629,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
if (commitCompleted) {
tragicEvent(t, "finishCommit");
- } else {
- throw IOUtils.rethrowAlways(t);
}
+ throw t;
}
if (infoStream.isEnabled("IW")) {
@@ -3694,8 +3679,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
/** Returns true a segment was flushed or deletes were applied. */
private boolean doFlush(boolean applyAllDeletes) throws IOException {
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot flush", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot flush", tragedy.get());
}
doBeforeFlush();
@@ -3726,7 +3711,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
flushSuccess = true;
} finally {
docWriter.finishFullFlush(this, flushSuccess);
- processEvents(false, true);
+ processEvents(false);
}
}
@@ -3741,15 +3726,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
success = true;
return anyChanges;
}
- } catch (AbortingException | VirtualMachineError tragedy) {
+ } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "doFlush");
- // never hit
- return false;
+ throw tragedy;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during flush");
}
+ maybeCloseOnTragicEvent();
}
}
}
@@ -3974,8 +3959,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
testPoint("startCommitMerge");
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete merge", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete merge", tragedy.get());
}
if (infoStream.isEnabled("IW")) {
@@ -4195,6 +4180,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Important that tragicEvent is called after mergeFinish, else we hang
// waiting for our merge thread to be removed from runningMerges:
tragicEvent(t, "merge");
+ throw t;
}
if (merge.info != null && merge.isAborted() == false) {
@@ -4328,8 +4314,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
assert merge.registerDone;
assert merge.maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || merge.maxNumSegments > 0;
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot merge", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot merge", tragedy.get());
}
if (merge.info != null) {
@@ -4826,8 +4812,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
testPoint("startStartCommit");
assert pendingCommit == null;
- if (tragedy != null) {
- throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy);
+ if (tragedy.get() != null) {
+ throw new IllegalStateException("this writer hit an unrecoverable error; cannot commit", tragedy.get());
}
try {
@@ -4930,6 +4916,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "startCommit");
+ throw tragedy;
}
testPoint("finishStartCommit");
}
@@ -4948,7 +4935,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p><b>NOTE</b>: {@link #warm(LeafReader)} is called before any
* deletes have been carried over to the merged segment. */
@FunctionalInterface
- public static interface IndexReaderWarmer {
+ public interface IndexReaderWarmer {
/**
* Invoked on the {@link LeafReader} for the newly
* merged segment, before that segment is made visible
@@ -4957,50 +4944,50 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
void warm(LeafReader reader) throws IOException;
}
- void tragicEvent(Throwable tragedy, String location) throws IOException {
-
- // unbox our internal AbortingException
- if (tragedy instanceof AbortingException) {
- tragedy = tragedy.getCause();
- }
-
+ /**
+ * This method should be called on a tragic event ie. if a downstream class of the writer
+ * hits an unrecoverable exception. This method does not rethrow the tragic event exception.
+ * Note: This method will not close the writer but can be called from any location without respecting any lock order
+ */
+ final void onTragicEvent(Throwable tragedy, String location) {
// This is not supposed to be tragic: IW is supposed to catch this and
// ignore, because it means we asked the merge to abort:
assert tragedy instanceof MergePolicy.MergeAbortedException == false;
-
- // We cannot hold IW's lock here else it can lead to deadlock:
- assert Thread.holdsLock(this) == false;
-
// How can it be a tragedy when nothing happened?
assert tragedy != null;
-
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit tragic " + tragedy.getClass().getSimpleName() + " inside " + location);
}
+ this.tragedy.compareAndSet(null, tragedy); // only set it once
+ }
- synchronized (this) {
- // It's possible you could have a really bad day
- if (this.tragedy != null) {
- // Another thread is already dealing / has dealt with the tragedy:
- throw IOUtils.rethrowAlways(tragedy);
- }
-
- this.tragedy = tragedy;
+ /**
+ * This method set the tragic exception unless it's already set and closes the writer
+ * if necessary. Note this method will not rethrow the throwable passed to it.
+ */
+ private void tragicEvent(Throwable tragedy, String location) throws IOException {
+ try {
+ onTragicEvent(tragedy, location);
+ } finally {
+ maybeCloseOnTragicEvent();
}
+ }
+ private void maybeCloseOnTragicEvent() throws IOException {
+ // We cannot hold IW's lock here else it can lead to deadlock:
+ assert Thread.holdsLock(this) == false;
+ assert Thread.holdsLock(fullFlushLock) == false;
// if we are already closed (e.g. called by rollback), this will be a no-op.
- if (shouldClose(false)) {
+ if (this.tragedy.get() != null && shouldClose(false)) {
rollbackInternal();
}
-
- throw IOUtils.rethrowAlways(tragedy);
}
/** If this {@code IndexWriter} was closed as a side-effect of a tragic exception,
* e.g. disk full while flushing a new segment, this returns the root cause exception.
* Otherwise (no tragic exception has occurred) it returns null. */
public Throwable getTragicException() {
- return tragedy;
+ return tragedy.get();
}
/** Returns {@code true} if this {@code IndexWriter} is still open. */
@@ -5180,26 +5167,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
- private void processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
- processEvents(eventQueue, triggerMerge, forcePurge);
- if (triggerMerge) {
- maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
- }
- }
-
- private void processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
- if (tragedy == null) {
+ private void processEvents(boolean triggerMerge) throws IOException {
+ if (tragedy.get() == null) {
Event event;
- while ((event = queue.poll()) != null) {
- event.process(this, triggerMerge, forcePurge);
+ while ((event = eventQueue.poll()) != null) {
+ event.process(this);
}
}
+ if (triggerMerge) {
+ maybeMerge(getConfig().getMergePolicy(), MergeTrigger.SEGMENT_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
+ }
}
-
+
/**
* Interface for internal atomic events. See {@link DocumentsWriter} for details. Events are executed concurrently and no order is guaranteed.
* Each event should only rely on the serializeability within its process method. All actions that must happen before or after a certain action must be
- * encoded inside the {@link #process(IndexWriter, boolean, boolean)} method.
+ * encoded inside the {@link #process(IndexWriter)} method.
*
*/
interface Event {
@@ -5210,14 +5193,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @param writer
* the {@link IndexWriter} that executes the event.
- * @param triggerMerge
- * <code>false</code> iff this event should not trigger any segment merges
- * @param clearBuffers
- * <code>true</code> iff this event should clear all buffers associated with the event.
* @throws IOException
* if an {@link IOException} occurs
*/
- void process(IndexWriter writer, boolean triggerMerge, boolean clearBuffers) throws IOException;
+ void process(IndexWriter writer) throws IOException;
}
/** Anything that will add N docs to the index should reserve first to
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cf8c9cab/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java b/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
index 46dc63c..b5e525b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TermVectorsConsumer.java
@@ -124,11 +124,8 @@ class TermVectorsConsumer extends TermsHash {
try {
super.abort();
} finally {
- if (writer != null) {
- IOUtils.closeWhileHandlingException(writer);
- writer = null;
- }
-
+ IOUtils.closeWhileHandlingException(writer);
+ writer = null;
lastDocID = 0;
reset();
}