You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/07/25 23:17:42 UTC

lucene-solr:branch_6x: reduce IW.infoStream noise when stalling happens due to too many total bytes flushing; only notifyAll in stall/unstall when it changes

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 501d73b4b -> 30db9e72b


reduce IW.infoStream noise when stalling happens due to too many total bytes flushing; only notifyAll in stall/unstall when it changes


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/30db9e72
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/30db9e72
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/30db9e72

Branch: refs/heads/branch_6x
Commit: 30db9e72ba3a886feb997397830882b85e930d24
Parents: 501d73b
Author: Mike McCandless <mi...@apache.org>
Authored: Mon Jul 25 19:17:28 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Mon Jul 25 19:17:28 2016 -0400

----------------------------------------------------------------------
 .../apache/lucene/index/DocumentsWriter.java    | 11 --------
 .../index/DocumentsWriterFlushControl.java      | 27 ++++++++++++++------
 .../index/DocumentsWriterStallControl.java      | 26 +++++--------------
 .../index/TestDocumentsWriterStallControl.java  |  6 ++---
 4 files changed, 28 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/30db9e72/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 2429c33..2807517 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -376,9 +376,6 @@ final class DocumentsWriter implements Closeable, Accountable {
     boolean hasEvents = false;
     if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
       // Help out flushing any queued DWPTs so we can un-stall:
-      if (infoStream.isEnabled("DW")) {
-        infoStream.message("DW", "DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
-      }
       do {
         // Try pick up pending threads here if possible
         DocumentsWriterPerThread flushingDWPT;
@@ -386,17 +383,9 @@ final class DocumentsWriter implements Closeable, Accountable {
           // Don't push the delete here since the update could fail!
           hasEvents |= doFlush(flushingDWPT);
         }
-  
-        if (infoStream.isEnabled("DW") && flushControl.anyStalledThreads()) {
-          infoStream.message("DW", "WARNING DocumentsWriter has stalled threads; waiting");
-        }
         
         flushControl.waitIfStalled(); // block if stalled
       } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
-
-      if (infoStream.isEnabled("DW")) {
-        infoStream.message("DW", "continue indexing after helping out flushing DocumentsWriter is healthy");
-      }
     }
     return hasEvents;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/30db9e72/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index a8c1dc3..a5b4b7c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -22,6 +22,7 @@ import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -74,7 +75,7 @@ final class DocumentsWriterFlushControl implements Accountable {
 
   DocumentsWriterFlushControl(DocumentsWriter documentsWriter, LiveIndexWriterConfig config, BufferedUpdatesStream bufferedUpdatesStream) {
     this.infoStream = config.getInfoStream();
-    this.stallControl = new DocumentsWriterStallControl(config);
+    this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
     this.flushPolicy = documentsWriter.flushPolicy;
     this.config = config;
@@ -230,7 +231,9 @@ final class DocumentsWriterFlushControl implements Accountable {
       }
     }
   }
-  
+
+  private long stallStartNS;
+
   private boolean updateStallState() {
     
     assert Thread.holdsLock(this);
@@ -245,6 +248,20 @@ final class DocumentsWriterFlushControl implements Accountable {
     final boolean stall = (activeBytes + flushBytes) > limit &&
       activeBytes < limit &&
       !closed;
+
+    if (infoStream.isEnabled("DWFC")) {
+      if (stall != stallControl.anyStalledThreads()) {
+        if (stall) {
+          infoStream.message("DW", String.format(Locale.ROOT, "now stalling flushes: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b",
+                                                 netBytes()/1024./1024., flushBytes()/1024./1024., fullFlush));
+          stallStartNS = System.nanoTime();
+        } else {
+          infoStream.message("DW", String.format(Locale.ROOT, "done stalling flushes for %.1f msec: netBytes: %.1f MB flushBytes: %.1f MB fullFlush: %b",
+                                                 (System.nanoTime()-stallStartNS)/1000000., netBytes()/1024./1024., flushBytes()/1024./1024., fullFlush));
+        }
+      }
+    }
+
     stallControl.updateStalled(stall);
     return stall;
   }
@@ -687,12 +704,6 @@ final class DocumentsWriterFlushControl implements Accountable {
    * checked out DWPT are available
    */
   void waitIfStalled() {
-    if (infoStream.isEnabled("DWFC")) {
-      infoStream.message("DWFC",
-          "waitIfStalled: numFlushesPending: " + flushQueue.size()
-              + " netBytes: " + netBytes() + " flushBytes: " + flushBytes()
-              + " fullFlush: " + fullFlush);
-    }
     stallControl.waitIfStalled();
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/30db9e72/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
index 84fa9af..c46e3d2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
@@ -20,7 +20,6 @@ import java.util.IdentityHashMap;
 import java.util.Map;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
@@ -44,12 +43,7 @@ final class DocumentsWriterStallControl {
   private int numWaiting; // only with assert
   private boolean wasStalled; // only with assert
   private final Map<Thread, Boolean> waiting = new IdentityHashMap<>(); // only with assert
-  private final InfoStream infoStream;
 
-  DocumentsWriterStallControl(LiveIndexWriterConfig iwc) {
-    infoStream = iwc.getInfoStream();
-  }
-  
   /**
    * Update the stalled flag status. This method will set the stalled flag to
    * <code>true</code> iff the number of flushing
@@ -59,11 +53,13 @@ final class DocumentsWriterStallControl {
    * waiting on {@link #waitIfStalled()}
    */
   synchronized void updateStalled(boolean stalled) {
-    this.stalled = stalled;
-    if (stalled) {
-      wasStalled = true;
+    if (this.stalled != stalled) {
+      this.stalled = stalled;
+      if (stalled) {
+        wasStalled = true;
+      }
+      notifyAll();
     }
-    notifyAll();
   }
   
   /**
@@ -93,13 +89,7 @@ final class DocumentsWriterStallControl {
     return stalled;
   }
   
-  long stallStartNS;
-
   private void incWaiters() {
-    stallStartNS = System.nanoTime();
-    if (infoStream.isEnabled("DW") && numWaiting == 0) {
-      infoStream.message("DW", "now stalling flushes");
-    }
     numWaiting++;
     assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
     assert numWaiting > 0;
@@ -109,10 +99,6 @@ final class DocumentsWriterStallControl {
     numWaiting--;
     assert waiting.remove(Thread.currentThread()) != null;
     assert numWaiting >= 0;
-    if (infoStream.isEnabled("DW") && numWaiting == 0) {
-      long stallEndNS = System.nanoTime();
-      infoStream.message("DW", "done stalling flushes for " + ((stallEndNS - stallStartNS)/1000000.0) + " ms");
-    }
   }
   
   synchronized boolean hasBlocked() { // for tests

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/30db9e72/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
index 2575e61..d5e643a 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
@@ -32,7 +32,7 @@ import org.apache.lucene.util.ThreadInterruptedException;
 public class TestDocumentsWriterStallControl extends LuceneTestCase {
   
   public void testSimpleStall() throws InterruptedException {
-    DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
+    DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
    
     ctrl.updateStalled(false);
     Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
@@ -54,7 +54,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
   }
   
   public void testRandom() throws InterruptedException {
-    final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
+    final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
     ctrl.updateStalled(false);
     
     Thread[] stallThreads = new Thread[atLeast(3)];
@@ -95,7 +95,7 @@ public class TestDocumentsWriterStallControl extends LuceneTestCase {
   }
   
   public void testAccquireReleaseRace() throws InterruptedException {
-    final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl(newIndexWriterConfig());
+    final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
     ctrl.updateStalled(false);
     final AtomicBoolean stop = new AtomicBoolean(false);
     final AtomicBoolean checkPoint = new AtomicBoolean(true);