You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2018/05/04 10:16:54 UTC
[2/2] lucene-solr:branch_7x: LUCENE-8293: Ensure only hard deletes
are carried over in a merge
LUCENE-8293: Ensure only hard deletes are carried over in a merge
Today we carry over hard deletes based on the SegmentReaders liveDocs.
This is not correct if soft-deletes are used especially with retention
policies. If a soft delete is added while a segment is merged the document
might end up hard deleted in the target segment. This isn't necessarily a
correctness issue but causes unnecessary writes of hard-deletes. The biggest
issue here is that we assert that previously deleted documents are still deleted
in the live-docs we apply and that might be violated by the retention policy.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/dad48603
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/dad48603
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/dad48603
Branch: refs/heads/branch_7x
Commit: dad48603aec715063fdcb71e11fe73599d63c3a2
Parents: 0e45b39
Author: Simon Willnauer <si...@apache.org>
Authored: Thu May 3 09:29:12 2018 +0200
Committer: Simon Willnauer <si...@apache.org>
Committed: Fri May 4 12:16:39 2018 +0200
----------------------------------------------------------------------
.../org/apache/lucene/index/IndexWriter.java | 118 +++++++++++--------
.../org/apache/lucene/index/MergePolicy.java | 2 +
.../org/apache/lucene/index/PendingDeletes.java | 5 +-
.../apache/lucene/index/PendingSoftDeletes.java | 11 ++
.../apache/lucene/index/ReadersAndUpdates.java | 21 +++-
.../lucene/index/TestPendingSoftDeletes.java | 5 +
.../TestSoftDeletesRetentionMergePolicy.java | 93 +++++++++++++++
7 files changed, 204 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 3846124..ead53b3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
@@ -3623,64 +3624,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
SegmentCommitInfo info = sourceSegments.get(i);
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
final int maxDoc = info.info.maxDoc();
- final Bits prevLiveDocs = merge.readers.get(i).getLiveDocs();
final ReadersAndUpdates rld = getPooledInstance(info, false);
// We hold a ref, from when we opened the readers during mergeInit, so it better still be in the pool:
assert rld != null: "seg=" + info.info.name;
- final Bits currentLiveDocs = rld.getLiveDocs();
MergeState.DocMap segDocMap = mergeState.docMaps[i];
MergeState.DocMap segLeafDocMap = mergeState.leafDocMaps[i];
- if (prevLiveDocs != null) {
-
- // If we had deletions on starting the merge we must
- // still have deletions now:
- assert currentLiveDocs != null;
- assert prevLiveDocs.length() == maxDoc;
- assert currentLiveDocs.length() == maxDoc;
-
- // There were deletes on this segment when the merge
- // started. The merge has collapsed away those
- // deletes, but, if new deletes were flushed since
- // the merge started, we must now carefully keep any
- // newly flushed deletes but mapping them to the new
- // docIDs.
-
- // Since we copy-on-write, if any new deletes were
- // applied after merging has started, we can just
- // check if the before/after liveDocs have changed.
- // If so, we must carefully merge the liveDocs one
- // doc at a time:
- if (currentLiveDocs != prevLiveDocs) {
- // This means this segment received new deletes
- // since we started the merge, so we
- // must merge them:
- for (int j = 0; j < maxDoc; j++) {
- if (prevLiveDocs.get(j) == false) {
- // if the document was deleted before, it better still be deleted!
- assert currentLiveDocs.get(j) == false;
- } else if (currentLiveDocs.get(j) == false) {
- // the document was deleted while we were merging:
- mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
- }
- }
- }
- } else if (currentLiveDocs != null) {
- assert currentLiveDocs.length() == maxDoc;
- // This segment had no deletes before but now it
- // does:
- for (int j = 0; j < maxDoc; j++) {
- if (currentLiveDocs.get(j) == false) {
- mergedDeletesAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
- }
- }
- }
+ carryOverHardDeletes(mergedDeletesAndUpdates, maxDoc, mergeState.liveDocs[i], merge.hardLiveDocs.get(i), rld.getHardLiveDocs(),
+ segDocMap, segLeafDocMap);
// Now carry over all doc values updates that were resolved while we were merging, remapping the docIDs to the newly merged docIDs.
// We only carry over packets that finished resolving; if any are still running (concurrently) they will detect that our merge completed
// and re-resolve against the newly merged segment:
-
Map<String,List<DocValuesFieldUpdates>> mergingDVUpdates = rld.getMergingDVUpdates();
for (Map.Entry<String,List<DocValuesFieldUpdates>> ent : mergingDVUpdates.entrySet()) {
@@ -3759,6 +3715,69 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return mergedDeletesAndUpdates;
}
+ /**
+ * This method carries over hard-deleted documents that are applied to the source segment during a merge.
+ */
+ private static void carryOverHardDeletes(ReadersAndUpdates mergedReadersAndUpdates, int maxDoc,
+ Bits mergeLiveDocs, // the liveDocs used to build the segDocMaps
+ Bits prevHardLiveDocs, // the hard deletes when the merge reader was pulled
+ Bits currentHardLiveDocs, // the current hard deletes
+ MergeState.DocMap segDocMap, MergeState.DocMap segLeafDocMap) throws IOException {
+
+ assert mergeLiveDocs == null || mergeLiveDocs.length() == maxDoc;
+ // if we mix soft and hard deletes we need to make sure that we only carry over deletes
+ // that were not deleted before. Otherwise the segDocMap doesn't contain a mapping.
+ // Yet this is also required if any MergePolicy modifies the liveDocs since this is
+ // what the segDocMap is built on.
+ final IntPredicate carryOverDelete = mergeLiveDocs == null || mergeLiveDocs == prevHardLiveDocs
+ ? docId -> currentHardLiveDocs.get(docId) == false
+ : docId -> mergeLiveDocs.get(docId) && currentHardLiveDocs.get(docId) == false;
+ if (prevHardLiveDocs != null) {
+ // If we had deletions on starting the merge we must
+ // still have deletions now:
+ assert currentHardLiveDocs != null;
+ assert mergeLiveDocs != null;
+ assert prevHardLiveDocs.length() == maxDoc;
+ assert currentHardLiveDocs.length() == maxDoc;
+
+ // There were deletes on this segment when the merge
+ // started. The merge has collapsed away those
+ // deletes, but, if new deletes were flushed since
+ // the merge started, we must now carefully keep any
+ // newly flushed deletes but mapping them to the new
+ // docIDs.
+
+ // Since we copy-on-write, if any new deletes were
+ // applied after merging has started, we can just
+ // check if the before/after liveDocs have changed.
+ // If so, we must carefully merge the liveDocs one
+ // doc at a time:
+ if (currentHardLiveDocs != prevHardLiveDocs) {
+ // This means this segment received new deletes
+ // since we started the merge, so we
+ // must merge them:
+ for (int j = 0; j < maxDoc; j++) {
+ if (prevHardLiveDocs.get(j) == false) {
+ // if the document was deleted before, it better still be deleted!
+ assert currentHardLiveDocs.get(j) == false;
+ } else if (carryOverDelete.test(j)) {
+ // the document was deleted while we were merging:
+ mergedReadersAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
+ }
+ }
+ }
+ } else if (currentHardLiveDocs != null) {
+ assert currentHardLiveDocs.length() == maxDoc;
+ // This segment had no deletes before but now it
+ // does:
+ for (int j = 0; j < maxDoc; j++) {
+ if (carryOverDelete.test(j)) {
+ mergedReadersAndUpdates.delete(segDocMap.get(segLeafDocMap.get(j)));
+ }
+ }
+ }
+ }
+
@SuppressWarnings("try")
synchronized private boolean commitMerge(MergePolicy.OneMerge merge, MergeState mergeState) throws IOException {
@@ -4238,6 +4257,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
merge.readers = new ArrayList<>(sourceSegments.size());
+ merge.hardLiveDocs = new ArrayList<>(sourceSegments.size());
// This is try/finally to make sure merger's readers are
// closed:
@@ -4253,13 +4273,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final ReadersAndUpdates rld = getPooledInstance(info, true);
rld.setIsMerging();
- SegmentReader reader = rld.getReaderForMerge(context);
+ ReadersAndUpdates.MergeReader mr = rld.getReaderForMerge(context);
+ SegmentReader reader = mr.reader;
int delCount = reader.numDeletedDocs();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "seg=" + segString(info) + " reader=" + reader);
}
+ merge.hardLiveDocs.add(mr.hardLiveDocs);
merge.readers.add(reader);
assert delCount <= info.info.maxDoc(): "delCount=" + delCount + " info.maxDoc=" + info.info.maxDoc() + " rld.pendingDeleteCount=" + rld.getPendingDeleteCount() + " info.getDelCount()=" + info.getDelCount();
segUpto++;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 029cca9..36ea33b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
/**
@@ -205,6 +206,7 @@ public abstract class MergePolicy {
volatile long totalMergeBytes;
List<SegmentReader> readers; // used by IndexWriter
+ List<Bits> hardLiveDocs; // used by IndexWriter
/** Segments to be merged. */
public final List<SegmentCommitInfo> segments;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java
index 1878665..2dc0858 100644
--- a/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java
+++ b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java
@@ -145,7 +145,6 @@ class PendingDeletes {
liveDocs = reader.getLiveDocs();
assert liveDocs == null || assertCheckLiveDocs(liveDocs, info.info.maxDoc(), info.getDelCount());
liveDocsShared = true;
-
}
liveDocsInitialized = true;
}
@@ -245,4 +244,8 @@ class PendingDeletes {
int numDeletesToMerge(MergePolicy policy, IOSupplier<CodecReader> readerIOSupplier) throws IOException {
return policy.numDeletesToMerge(info, numPendingDeletes(), readerIOSupplier);
}
+
+ Bits getHardLiveDocs() {
+ return liveDocs;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/java/org/apache/lucene/index/PendingSoftDeletes.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/PendingSoftDeletes.java b/lucene/core/src/java/org/apache/lucene/index/PendingSoftDeletes.java
index eae25e0..41eebd2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/PendingSoftDeletes.java
+++ b/lucene/core/src/java/org/apache/lucene/index/PendingSoftDeletes.java
@@ -25,6 +25,7 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.MutableBits;
@@ -193,4 +194,14 @@ final class PendingSoftDeletes extends PendingDeletes {
return fisFormat.read(dir, segInfo, segmentSuffix, IOContext.READONCE);
}
}
+
+ Bits getHardLiveDocs() {
+ return hardDeletes.getHardLiveDocs();
+ }
+
+ @Override
+ void liveDocsShared() {
+ super.liveDocsShared();
+ hardDeletes.liveDocsShared();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
index 6e074ee..b33ac4b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java
@@ -274,6 +274,13 @@ final class ReadersAndUpdates {
return pendingDeletes.getLiveDocs();
}
+ /**
+ * Returns the live-docs bits excluding documents that are not live due to soft-deletes
+ */
+ public synchronized Bits getHardLiveDocs() {
+ return pendingDeletes.getHardLiveDocs();
+ }
+
public synchronized void dropChanges() {
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
@@ -687,8 +694,18 @@ final class ReadersAndUpdates {
return isMerging;
}
+ final static class MergeReader {
+ final SegmentReader reader;
+ final Bits hardLiveDocs;
+
+ MergeReader(SegmentReader reader, Bits hardLiveDocs) {
+ this.reader = reader;
+ this.hardLiveDocs = hardLiveDocs;
+ }
+ }
+
/** Returns a reader for merge, with the latest doc values updates and deletions. */
- synchronized SegmentReader getReaderForMerge(IOContext context) throws IOException {
+ synchronized MergeReader getReaderForMerge(IOContext context) throws IOException {
// We must carry over any still-pending DV updates because they were not
// successfully written, e.g. because there was a hole in the delGens,
@@ -715,7 +732,7 @@ final class ReadersAndUpdates {
markAsShared();
assert verifyDocCounts();
- return reader;
+ return new MergeReader(reader, pendingDeletes.getHardLiveDocs());
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java b/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
index 7ddf0be..aeb5819 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestPendingSoftDeletes.java
@@ -70,6 +70,7 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
assertTrue(pendingSoftDeletes.getLiveDocs().get(0));
assertFalse(pendingSoftDeletes.getLiveDocs().get(1));
assertTrue(pendingSoftDeletes.getLiveDocs().get(2));
+ assertNull(pendingSoftDeletes.getHardLiveDocs());
// pass reader again
Bits liveDocs = pendingSoftDeletes.getLiveDocs();
pendingSoftDeletes.liveDocsShared();
@@ -91,6 +92,10 @@ public class TestPendingSoftDeletes extends TestPendingDeletes {
assertFalse(pendingSoftDeletes.getLiveDocs().get(0));
assertFalse(pendingSoftDeletes.getLiveDocs().get(1));
assertTrue(pendingSoftDeletes.getLiveDocs().get(2));
+ assertNotNull(pendingSoftDeletes.getHardLiveDocs());
+ assertFalse(pendingSoftDeletes.getHardLiveDocs().get(0));
+ assertTrue(pendingSoftDeletes.getHardLiveDocs().get(1));
+ assertTrue(pendingSoftDeletes.getHardLiveDocs().get(2));
IOUtils.close(reader, writer, dir);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dad48603/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
index 5f1ba6c..c9b22a5 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestSoftDeletesRetentionMergePolicy.java
@@ -445,4 +445,97 @@ public class TestSoftDeletesRetentionMergePolicy extends LuceneTestCase {
writer.close();
dir.close();
}
+
+ public void testSoftDeleteWhileMergeSurvives() throws IOException {
+ Directory dir = newDirectory();
+ String softDelete = "soft_delete";
+ IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(softDelete);
+ AtomicBoolean update = new AtomicBoolean(true);
+ config.setReaderPooling(true);
+ config.setMergePolicy(new SoftDeletesRetentionMergePolicy("soft_delete", () -> new DocValuesFieldExistsQuery("keep"),
+ new LogDocMergePolicy()));
+ IndexWriter writer = new IndexWriter(dir, config);
+ writer.getConfig().setMergedSegmentWarmer(sr -> {
+ if (update.compareAndSet(true, false)) {
+ try {
+ writer.softUpdateDocument(new Term("id", "0"), new Document(),
+ new NumericDocValuesField(softDelete, 1), new NumericDocValuesField("keep", 1));
+ writer.commit();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+ });
+
+ boolean preExistingDeletes = random().nextBoolean();
+ for (int i = 0; i < 2; i++) {
+ Document d = new Document();
+ d.add(new StringField("id", Integer.toString(i), Field.Store.YES));
+ if (preExistingDeletes && random().nextBoolean()) {
+ writer.addDocument(d); // randomly add a preexisting hard-delete that we don't carry over
+ writer.deleteDocuments(new Term("id", Integer.toString(i)));
+ d.add(new NumericDocValuesField("keep", 1));
+ writer.addDocument(d);
+ } else {
+ d.add(new NumericDocValuesField("keep", 1));
+ writer.addDocument(d);
+ }
+ writer.flush();
+ }
+ writer.forceMerge(1);
+ writer.commit();
+ assertFalse(update.get());
+ DirectoryReader open = DirectoryReader.open(dir);
+ assertEquals(0, open.numDeletedDocs());
+ assertEquals(3, open.maxDoc());
+ IOUtils.close(open, writer, dir);
+ }
+
+ /*
+ * This test is trying to hard-delete a particular document while the segment is merged which is already soft-deleted
+ * This requires special logic inside IndexWriter#carryOverHardDeletes since docMaps are not created for this document.
+ */
+ public void testDeleteDocWhileMergeThatIsSoftDeleted() throws IOException {
+ Directory dir = newDirectory();
+ String softDelete = "soft_delete";
+ IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(softDelete);
+ AtomicBoolean delete = new AtomicBoolean(true);
+ config.setReaderPooling(true);
+ config.setMergePolicy(new LogDocMergePolicy());
+ IndexWriter writer = new IndexWriter(dir, config);
+ Document d = new Document();
+ d.add(new StringField("id", "0", Field.Store.YES));
+ writer.addDocument(d);
+ d = new Document();
+ d.add(new StringField("id", "1", Field.Store.YES));
+ writer.addDocument(d);
+ if (random().nextBoolean()) {
+ // randomly run with a preexisting hard delete
+ d = new Document();
+ d.add(new StringField("id", "2", Field.Store.YES));
+ writer.addDocument(d);
+ writer.deleteDocuments(new Term("id", "2"));
+ }
+
+ writer.flush();
+ DirectoryReader reader = writer.getReader();
+ writer.softUpdateDocument(new Term("id", "0"), new Document(),
+ new NumericDocValuesField(softDelete, 1));
+ writer.flush();
+ writer.getConfig().setMergedSegmentWarmer(sr -> {
+ if (delete.compareAndSet(true, false)) {
+ try {
+ long seqNo = writer.tryDeleteDocument(reader, 0);
+ assertTrue("seqId was -1", seqNo != -1);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+ });
+ writer.forceMerge(1);
+ assertEquals(2, writer.numDocs());
+ assertEquals(2, writer.maxDoc());
+ assertFalse(delete.get());
+ IOUtils.close(reader, writer, dir);
+ }
}