You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@lucene.apache.org by GitBox <gi...@apache.org> on 2021/05/31 15:17:50 UTC

[GitHub] [lucene] mikemccand commented on a change in pull request #128: LUCENE-9662: CheckIndex should be concurrent - parallelizing index check across segments

mikemccand commented on a change in pull request #128:
URL: https://github.com/apache/lucene/pull/128#discussion_r642550067



##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -605,209 +680,103 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
     result.newSegments.clear();
     result.maxSegmentName = -1;
 
-    for (int i = 0; i < numSegments; i++) {
-      final SegmentCommitInfo info = sis.info(i);
-      long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
-      if (segmentName > result.maxSegmentName) {
-        result.maxSegmentName = segmentName;
-      }
-      if (onlySegments != null && !onlySegments.contains(info.info.name)) {
-        continue;
-      }
-      Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
-      result.segmentInfos.add(segInfoStat);
-      msg(
-          infoStream,
-          "  "
-              + (1 + i)
-              + " of "
-              + numSegments
-              + ": name="
-              + info.info.name
-              + " maxDoc="
-              + info.info.maxDoc());
-      segInfoStat.name = info.info.name;
-      segInfoStat.maxDoc = info.info.maxDoc();
-
-      final Version version = info.info.getVersion();
-      if (info.info.maxDoc() <= 0) {
-        throw new RuntimeException("illegal number of documents: maxDoc=" + info.info.maxDoc());
-      }
-
-      int toLoseDocCount = info.info.maxDoc();
-
-      SegmentReader reader = null;
-
-      try {
-        msg(infoStream, "    version=" + (version == null ? "3.0" : version));
-        msg(infoStream, "    id=" + StringHelper.idToString(info.info.getId()));
-        final Codec codec = info.info.getCodec();
-        msg(infoStream, "    codec=" + codec);
-        segInfoStat.codec = codec;
-        msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
-        segInfoStat.compound = info.info.getUseCompoundFile();
-        msg(infoStream, "    numFiles=" + info.files().size());
-        Sort indexSort = info.info.getIndexSort();
-        if (indexSort != null) {
-          msg(infoStream, "    sort=" + indexSort);
-        }
-        segInfoStat.numFiles = info.files().size();
-        segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
-        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
-        Map<String, String> diagnostics = info.info.getDiagnostics();
-        segInfoStat.diagnostics = diagnostics;
-        if (diagnostics.size() > 0) {
-          msg(infoStream, "    diagnostics = " + diagnostics);
+    // checks segments sequentially
+    if (executorService == null) {

Review comment:
       Maybe we should make a single threaded executor so we don't have to bifurcate the code?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -843,6 +812,258 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
     return result;
   }
 
+  private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
+    long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
+    if (segmentName > result.maxSegmentName) {
+      result.maxSegmentName = segmentName;
+    }
+  }
+
+  private void processSegmentInfoStatusResult(
+      Status result, SegmentCommitInfo info, Status.SegmentInfoStatus segmentInfoStatus) {
+    result.segmentInfos.add(segmentInfoStatus);
+    if (segmentInfoStatus.error != null) {
+      result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
+      result.numBadSegments++;
+    } else {
+      // Keeper
+      result.newSegments.add(info.clone());
+    }
+  }
+
+  private <R> CompletableFuture<R> runAsyncSegmentCheck(
+      Callable<R> asyncCallable, ExecutorService executorService) {
+    return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), executorService);
+  }
+
+  private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
+    return () -> {
+      try {
+        return callable.call();
+      } catch (RuntimeException | Error e) {
+        throw e;
+      } catch (Throwable e) {
+        throw new CompletionException(e);
+      }
+    };
+  }
+
+  private Status.SegmentInfoStatus testSegment(
+      SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws IOException {
+    Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
+    segInfoStat.name = info.info.name;
+    segInfoStat.maxDoc = info.info.maxDoc();
+
+    final Version version = info.info.getVersion();
+    if (info.info.maxDoc() <= 0) {
+      throw new CheckIndexException(" illegal number of documents: maxDoc=" + info.info.maxDoc());
+    }
+
+    int toLoseDocCount = info.info.maxDoc();

Review comment:
       Hmm, sometimes this seems to be `maxDoc` (here) and other times `numDocs` (accounting for deleted documents properly) -- let's try to be consistent with what it was before (I think `numDocs`)?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -450,6 +479,14 @@ public void setChecksumsOnly(boolean v) {
 
   private boolean checksumsOnly;
 
+  /** Set threadCount used for parallelizing index integrity checking. */
+  public void setThreadCount(int tc) {
+    threadCount = tc;
+  }
+
+  // capped threadCount at 4
+  private int threadCount = Math.min(Runtime.getRuntime().availableProcessors(), 4);

Review comment:
       Whoa, why `4` :)  Could we maybe use java's `Runtime.getRuntime().availableProcessorts()`?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -843,6 +812,258 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
     return result;
   }
 
+  private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
+    long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
+    if (segmentName > result.maxSegmentName) {
+      result.maxSegmentName = segmentName;
+    }
+  }
+
+  private void processSegmentInfoStatusResult(
+      Status result, SegmentCommitInfo info, Status.SegmentInfoStatus segmentInfoStatus) {
+    result.segmentInfos.add(segmentInfoStatus);
+    if (segmentInfoStatus.error != null) {
+      result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
+      result.numBadSegments++;
+    } else {
+      // Keeper
+      result.newSegments.add(info.clone());
+    }
+  }
+
+  private <R> CompletableFuture<R> runAsyncSegmentCheck(
+      Callable<R> asyncCallable, ExecutorService executorService) {
+    return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), executorService);
+  }
+
+  private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
+    return () -> {
+      try {
+        return callable.call();
+      } catch (RuntimeException | Error e) {
+        throw e;
+      } catch (Throwable e) {
+        throw new CompletionException(e);
+      }
+    };
+  }
+
+  private Status.SegmentInfoStatus testSegment(
+      SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws IOException {
+    Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
+    segInfoStat.name = info.info.name;
+    segInfoStat.maxDoc = info.info.maxDoc();
+
+    final Version version = info.info.getVersion();
+    if (info.info.maxDoc() <= 0) {
+      throw new CheckIndexException(" illegal number of documents: maxDoc=" + info.info.maxDoc());
+    }
+
+    int toLoseDocCount = info.info.maxDoc();
+
+    SegmentReader reader = null;
+
+    try {
+      msg(infoStream, "    version=" + (version == null ? "3.0" : version));
+      msg(infoStream, "    id=" + StringHelper.idToString(info.info.getId()));
+      final Codec codec = info.info.getCodec();
+      msg(infoStream, "    codec=" + codec);
+      segInfoStat.codec = codec;
+      msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
+      segInfoStat.compound = info.info.getUseCompoundFile();
+      msg(infoStream, "    numFiles=" + info.files().size());
+      Sort indexSort = info.info.getIndexSort();
+      if (indexSort != null) {
+        msg(infoStream, "    sort=" + indexSort);
+      }
+      segInfoStat.numFiles = info.files().size();
+      segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
+      // nf#format is not thread-safe, and would generate random non valid results in concurrent
+      // setting
+      synchronized (nf) {
+        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
+      }
+      Map<String, String> diagnostics = info.info.getDiagnostics();
+      segInfoStat.diagnostics = diagnostics;
+      if (diagnostics.size() > 0) {
+        msg(infoStream, "    diagnostics = " + diagnostics);
+      }
+
+      if (!info.hasDeletions()) {
+        msg(infoStream, "    no deletions");
+        segInfoStat.hasDeletions = false;
+      } else {
+        msg(infoStream, "    has deletions [delGen=" + info.getDelGen() + "]");
+        segInfoStat.hasDeletions = true;
+        segInfoStat.deletionsGen = info.getDelGen();
+      }
+
+      long startOpenReaderNS = System.nanoTime();
+      if (infoStream != null) infoStream.print("    test: open reader.........");
+      reader = new SegmentReader(info, sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT);
+      msg(
+          infoStream,
+          String.format(
+              Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - startOpenReaderNS)));
+
+      segInfoStat.openReaderPassed = true;
+
+      long startIntegrityNS = System.nanoTime();
+      if (infoStream != null) infoStream.print("    test: check integrity.....");
+      reader.checkIntegrity();
+      msg(
+          infoStream,
+          String.format(
+              Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - startIntegrityNS)));
+
+      if (reader.maxDoc() != info.info.maxDoc()) {
+        throw new CheckIndexException(
+            "SegmentReader.maxDoc() "
+                + reader.maxDoc()
+                + " != SegmentInfo.maxDoc "
+                + info.info.maxDoc());
+      }
+
+      final int numDocs = reader.numDocs();
+      toLoseDocCount = numDocs;

Review comment:
       Here is it `numDocs` (taking deletions into account).

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -181,6 +193,9 @@
       /** True if we were able to open a CodecReader on this segment. */
       public boolean openReaderPassed;
 
+      /** doc count in this segment */

Review comment:
       Once we decide whether this is `maxDoc` or `docCount` (taking deletions into account) let's update this javadoc?

##########
File path: lucene/core/src/java/org/apache/lucene/index/CheckIndex.java
##########
@@ -605,209 +680,103 @@ public Status checkIndex(List<String> onlySegments) throws IOException {
     result.newSegments.clear();
     result.maxSegmentName = -1;
 
-    for (int i = 0; i < numSegments; i++) {
-      final SegmentCommitInfo info = sis.info(i);
-      long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
-      if (segmentName > result.maxSegmentName) {
-        result.maxSegmentName = segmentName;
-      }
-      if (onlySegments != null && !onlySegments.contains(info.info.name)) {
-        continue;
-      }
-      Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
-      result.segmentInfos.add(segInfoStat);
-      msg(
-          infoStream,
-          "  "
-              + (1 + i)
-              + " of "
-              + numSegments
-              + ": name="
-              + info.info.name
-              + " maxDoc="
-              + info.info.maxDoc());
-      segInfoStat.name = info.info.name;
-      segInfoStat.maxDoc = info.info.maxDoc();
-
-      final Version version = info.info.getVersion();
-      if (info.info.maxDoc() <= 0) {
-        throw new RuntimeException("illegal number of documents: maxDoc=" + info.info.maxDoc());
-      }
-
-      int toLoseDocCount = info.info.maxDoc();
-
-      SegmentReader reader = null;
-
-      try {
-        msg(infoStream, "    version=" + (version == null ? "3.0" : version));
-        msg(infoStream, "    id=" + StringHelper.idToString(info.info.getId()));
-        final Codec codec = info.info.getCodec();
-        msg(infoStream, "    codec=" + codec);
-        segInfoStat.codec = codec;
-        msg(infoStream, "    compound=" + info.info.getUseCompoundFile());
-        segInfoStat.compound = info.info.getUseCompoundFile();
-        msg(infoStream, "    numFiles=" + info.files().size());
-        Sort indexSort = info.info.getIndexSort();
-        if (indexSort != null) {
-          msg(infoStream, "    sort=" + indexSort);
-        }
-        segInfoStat.numFiles = info.files().size();
-        segInfoStat.sizeMB = info.sizeInBytes() / (1024. * 1024.);
-        msg(infoStream, "    size (MB)=" + nf.format(segInfoStat.sizeMB));
-        Map<String, String> diagnostics = info.info.getDiagnostics();
-        segInfoStat.diagnostics = diagnostics;
-        if (diagnostics.size() > 0) {
-          msg(infoStream, "    diagnostics = " + diagnostics);
+    // checks segments sequentially
+    if (executorService == null) {
+      for (int i = 0; i < numSegments; i++) {
+        final SegmentCommitInfo info = sis.info(i);
+        updateMaxSegmentName(result, info);
+        if (onlySegments != null && !onlySegments.contains(info.info.name)) {
+          continue;
         }
 
-        if (!info.hasDeletions()) {
-          msg(infoStream, "    no deletions");
-          segInfoStat.hasDeletions = false;
-        } else {
-          msg(infoStream, "    has deletions [delGen=" + info.getDelGen() + "]");
-          segInfoStat.hasDeletions = true;
-          segInfoStat.deletionsGen = info.getDelGen();
-        }
-
-        long startOpenReaderNS = System.nanoTime();
-        if (infoStream != null) infoStream.print("    test: open reader.........");
-        reader = new SegmentReader(info, sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT);
         msg(
             infoStream,
-            String.format(
-                Locale.ROOT, "OK [took %.3f sec]", nsToSec(System.nanoTime() - startOpenReaderNS)));
+            (1 + i)
+                + " of "
+                + numSegments
+                + ": name="
+                + info.info.name
+                + " maxDoc="
+                + info.info.maxDoc());
+        Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream);
+
+        processSegmentInfoStatusResult(result, info, segmentInfoStatus);
+      }
+    } else {
+      ByteArrayOutputStream[] outputs = new ByteArrayOutputStream[numSegments];
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      CompletableFuture<Status.SegmentInfoStatus>[] futures = new CompletableFuture[numSegments];
+
+      // checks segments concurrently
+      for (int i = 0; i < numSegments; i++) {
+        final SegmentCommitInfo info = sis.info(i);
+        updateMaxSegmentName(result, info);
+        if (onlySegments != null && !onlySegments.contains(info.info.name)) {
+          continue;
+        }
 
-        segInfoStat.openReaderPassed = true;
+        SegmentInfos finalSis = sis;
 
-        long startIntegrityNS = System.nanoTime();
-        if (infoStream != null) infoStream.print("    test: check integrity.....");
-        reader.checkIntegrity();
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrintStream stream;
+        if (i > 0) {
+          // buffer the messages for segment starting from the 2nd one so that they can later be
+          // printed in order
+          stream = new PrintStream(output, true, IOUtils.UTF_8);
+        } else {
+          // optimize for first segment to print real-time

Review comment:
       I think it would be better to 1) buffer all segment's output, and 2) print each segment's full output once it is done.  This way the tiny segments which finish quickly would produce their output, and the large segments would be the long poles, finally finishing and printing theirs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org