You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/02/08 20:06:24 UTC

lucene-solr git commit: fix some nocommits; fix one stress test failure

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/lucene-5438-nrt-replication f0f42780a -> 9afa56005


fix some nocommits; fix one stress test failure


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9afa5600
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9afa5600
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9afa5600

Branch: refs/heads/jira/lucene-5438-nrt-replication
Commit: 9afa560054aad72b8cc623b130745bc8778d1d5c
Parents: f0f4278
Author: Mike McCandless <mi...@apache.org>
Authored: Mon Feb 8 14:07:04 2016 -0500
Committer: Mike McCandless <mi...@apache.org>
Committed: Mon Feb 8 14:07:04 2016 -0500

----------------------------------------------------------------------
 .../lucene/replicator/nrt/ReplicaNode.java      | 21 ++++++--
 .../org/apache/lucene/replicator/nrt/Jobs.java  | 18 +++++--
 .../lucene/replicator/nrt/NodeProcess.java      | 24 +++------
 .../replicator/nrt/SimplePrimaryNode.java       |  1 +
 .../replicator/nrt/SimpleReplicaNode.java       |  5 +-
 .../replicator/nrt/TestNRTReplication.java      |  6 +--
 .../nrt/TestStressNRTReplication.java           | 53 +++++++++++++-------
 .../lucene/replicator/nrt/ThreadPumper.java     |  4 +-
 lucene/replicator/test.cmd                      |  4 +-
 .../lucene/store/MockDirectoryWrapper.java      |  1 +
 10 files changed, 84 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
index e191caf..62827e8 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaNode.java
@@ -462,12 +462,19 @@ abstract class ReplicaNode extends Node {
 
   /** Call this to notify this replica node that a new NRT infos is available on the primary.
    *  We kick off a job (runs in the background) to copy files across, and open a new reader once that's done. */
-  public synchronized CopyJob newNRTPoint(long version) throws IOException {
+  public synchronized CopyJob newNRTPoint(long newPrimaryGen, long version) throws IOException {
 
     if (isClosed()) {
       throw new AlreadyClosedException("this replica is closed: state=" + state);
     }
 
+    // Cut over (possibly) to the new primary first, so that we discard any pre-copied merged segments up front, before checking
+    // which files need copying.  The pre-copied merged segments could still be useful to us if the new primary is either the same
+    // primary (e.g. it was just rebooted) or a promoted replica whose newer NRT point already included them.  However, relying
+    // solely on checksum/file length to catch any difference is a bit risky, so we defensively discard them here and re-copy in
+    // that case:
+    maybeNewPrimary(newPrimaryGen);
+
     // Caller should not "publish" us until we have finished .start():
     assert mgr != null;
 
@@ -520,9 +527,9 @@ abstract class ReplicaNode extends Node {
       return null;
     }
 
+    assert newPrimaryGen == job.getCopyState().primaryGen;
+
     Collection<String> newNRTFiles = job.getFileNames();
-    long newPrimaryGen = job.getCopyState().primaryGen;
-    maybeNewPrimary(newPrimaryGen);
 
     message("top: newNRTPoint: job files=" + newNRTFiles);
 
@@ -608,9 +615,15 @@ abstract class ReplicaNode extends Node {
   }
 
   /** Called when the primary changed */
-  protected synchronized void maybeNewPrimary(long newPrimaryGen) {
+  protected synchronized void maybeNewPrimary(long newPrimaryGen) throws IOException {
     if (newPrimaryGen != lastPrimaryGen) {
       message("top: now change lastPrimaryGen from " + lastPrimaryGen + " to " + newPrimaryGen + " pendingMergeFiles=" + pendingMergeFiles);
+
+      message("top: delete if no ref pendingMergeFiles=" + pendingMergeFiles);
+      for(String fileName : pendingMergeFiles) {
+        deleter.deleteIfNoRef(fileName);
+      }
+
       assert newPrimaryGen > lastPrimaryGen: "newPrimaryGen=" + newPrimaryGen + " vs lastPrimaryGen=" + lastPrimaryGen;
       lastPrimaryGen = newPrimaryGen;
       pendingMergeFiles.clear();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
index a0b9535..f75a027 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/Jobs.java
@@ -80,7 +80,12 @@ class Jobs extends Thread implements Closeable {
         } else {
           node.message("AlreadyClosedException during job.visit job=" + topJob + "; now cancel");
         }
-        topJob.cancel("unexpected exception in visit", t);
+        try {
+          topJob.cancel("unexpected exception in visit", t);
+        } catch (Throwable t2) {
+          node.message("ignore exception calling cancel: " + t2);
+          t2.printStackTrace(System.out);
+        }
         try {
           topJob.onceDone.run(topJob);
         } catch (Throwable t2) {
@@ -101,6 +106,7 @@ class Jobs extends Thread implements Closeable {
           topJob.onceDone.run(topJob);
         } catch (Throwable t) {
           node.message("ignore exception calling OnceDone: " + t);
+          t.printStackTrace(System.out);
         }
       }
     }
@@ -112,11 +118,17 @@ class Jobs extends Thread implements Closeable {
       while (queue.isEmpty() == false) {
         SimpleCopyJob job = (SimpleCopyJob) queue.poll();
         node.message("top: Jobs: now cancel job=" + job);
-        job.cancel("jobs closing", null);
+        try {
+          job.cancel("jobs closing", null);
+        } catch (Throwable t) {
+          node.message("ignore exception calling cancel");
+          t.printStackTrace(System.out);
+        }
         try {
           job.onceDone.run(job);
         } catch (Throwable t) {
-          node.message("ignore exception calling OnceDone: " + t);
+          node.message("ignore exception calling OnceDone");
+          t.printStackTrace(System.out);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 9d8b764..a0bfb78 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -86,7 +86,7 @@ class NodeProcess implements Closeable {
     }
   }
 
-  public boolean commit() {
+  public boolean commit() throws IOException {
     try (Connection c = new Connection(tcpPort)) {
       c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
       c.flush();
@@ -95,36 +95,22 @@ class NodeProcess implements Closeable {
         throw new RuntimeException("commit failed");
       }
       return true;
-    } catch (Throwable t) {
-      // nocommit throw this
-      // Something wrong with this replica; skip it:
-      System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
-      return false;
     }
   }
 
-  public void commitAsync() {
+  public void commitAsync() throws IOException {
     try (Connection c = new Connection(tcpPort)) {
       c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
       c.flush();
-    } catch (Throwable t) {
-      // nocommit throw this
-      // Something wrong with this replica; skip it:
-      System.out.println("PARENT: top: hit SocketException during commit with R" + id + ": " + t + "; skipping");
     }
   }
 
-  public long getSearchingVersion() {
+  public long getSearchingVersion() throws IOException {
     try (Connection c = new Connection(tcpPort)) {
       c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION);
       c.flush();
       c.s.shutdownOutput();
       return c.in.readVLong();
-    } catch (Throwable t) {
-      // nocommit throw this
-      // Something wrong with this replica; skip it:
-      System.out.println("PARENT: top: hit SocketException during getSearchingVersion with R" + id + "; skipping");
-      return -1L;
     }
   }
 
@@ -162,6 +148,7 @@ class NodeProcess implements Closeable {
           }
         } catch (Throwable t) {
           System.out.println("top: shutdown failed; ignoring");
+          t.printStackTrace(System.out);
         }
         try {
           p.waitFor();
@@ -178,10 +165,11 @@ class NodeProcess implements Closeable {
     }
   }
 
-  public void newNRTPoint(long version, int primaryTCPPort) throws IOException {
+  public void newNRTPoint(long version, long primaryGen, int primaryTCPPort) throws IOException {
     try (Connection c = new Connection(tcpPort)) {
       c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
       c.out.writeVLong(version);
+      c.out.writeVLong(primaryGen);
       c.out.writeInt(primaryTCPPort);
       c.flush();
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index 7f5634c..fe14234 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -304,6 +304,7 @@ class SimplePrimaryNode extends PrimaryNode {
           message("send NEW_NRT_POINT to R" + replicaID + " at tcpPort=" + replicaTCPPorts[i]);
           c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
           c.out.writeVLong(version);
+          c.out.writeVLong(primaryGen);
           c.out.writeInt(tcpPort);
           c.flush();
           // TODO: we should use multicast to broadcast files out to replicas

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
index 4868338..9658ad1 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimpleReplicaNode.java
@@ -176,10 +176,11 @@ class SimpleReplicaNode extends ReplicaNode {
       case CMD_NEW_NRT_POINT:
         {
           long version = in.readVLong();
+          long newPrimaryGen = in.readVLong();
           Thread.currentThread().setName("recv-" + version);
           curPrimaryTCPPort = in.readInt();
-          message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort);
-          newNRTPoint(version);
+          message("newNRTPoint primaryTCPPort=" + curPrimaryTCPPort + " version=" + version + " newPrimaryGen=" + newPrimaryGen);
+          newNRTPoint(newPrimaryGen, version);
         }
         break;
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 2c66994..262e68e 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -356,7 +356,7 @@ public class TestNRTReplication extends LuceneTestCase {
     assertVersionAndHits(replica, 0, 0);
 
     // Ask replica to sync:
-    replica.newNRTPoint(primaryVersion1, primary.tcpPort);
+    replica.newNRTPoint(primaryVersion1, 0, primary.tcpPort);
     waitForVersionAndHits(replica, primaryVersion1, 10);
 
     replica.close();
@@ -461,7 +461,7 @@ public class TestNRTReplication extends LuceneTestCase {
     assertVersionAndHits(replica, primaryVersion1, 10);
 
     // Now ask replica to sync:
-    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
+    replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort);
 
     waitForVersionAndHits(replica, primaryVersion2, 20);
 
@@ -736,7 +736,7 @@ public class TestNRTReplication extends LuceneTestCase {
     sendReplicasToPrimary(primary, replica);
 
     // Now ask replica to sync:
-    replica.newNRTPoint(primaryVersion2, primary.tcpPort);
+    replica.newNRTPoint(primaryVersion2, 0, primary.tcpPort);
 
     // Make sure it sees all docs that were indexed while it was down:
     assertVersionAndHits(primary, primaryVersion2, 110);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
index a765f11..28b15f8 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestStressNRTReplication.java
@@ -60,14 +60,8 @@ import com.carrotsearch.randomizedtesting.SeedUtils;
 
 // nocommit why so many "hit SocketException during commit with R0"?
 
-// nocommit why so much time when so many nodes are down
-
-// nocommit indexing is too fast?  (xlog replay fails to finish before primary crashes itself)
-
 // nocommit why all these NodeCommunicationExcs?
 
-// nocommit the sockets are a pita on jvm crashing ...
-
 /*
   TODO
     - fangs
@@ -161,8 +155,7 @@ public class TestStressNRTReplication extends LuceneTestCase {
   static final boolean DO_BIT_FLIPS_DURING_COPY = true;
 
   /** Set to a non-null value to force exactly that many nodes; else, it's random. */
-  // nocommit
-  static final Integer NUM_NODES = 2;
+  static final Integer NUM_NODES = null;
 
   final AtomicBoolean failed = new AtomicBoolean();
 
@@ -214,9 +207,6 @@ public class TestStressNRTReplication extends LuceneTestCase {
     // Silly bootstrapping:
     versionToTransLogLocation.put(0L, 0L);
 
-    // nocommit why also 1?
-    //versionToTransLogLocation.put(1L, 0L);
-
     versionToMarker.put(0L, 0);
 
     int numNodes;
@@ -334,10 +324,15 @@ public class TestStressNRTReplication extends LuceneTestCase {
       {
         NodeProcess node = nodes[random().nextInt(nodes.length)];
         if (node != null && node.nodeIsClosing.get() == false) {
-          // TODO: if this node is primary, it means we committed a "partial" version (not exposed as an NRT point)... not sure it matters.
+          // TODO: if this node is primary, it means we committed an unpublished version (not exposed as an NRT point)... not sure it matters.
           // maybe we somehow allow IW to commit a specific sis (the one we just flushed)?
           message("top: now commit node=" + node);
-          node.commitAsync();
+          try {
+            node.commitAsync();
+          } catch (Throwable t) {
+            message("top: hit exception during commit with R" + node.id + "; skipping");
+            t.printStackTrace(System.out);
+          }
         }
       }
     }
@@ -400,7 +395,14 @@ public class TestStressNRTReplication extends LuceneTestCase {
     for (NodeProcess node : nodes) {
       if (node != null) {
         message("ask " + node + " for its current searching version");
-        long searchingVersion = node.getSearchingVersion();
+        long searchingVersion;
+        try {
+          searchingVersion = node.getSearchingVersion();
+        } catch (Throwable t) {
+          message("top: hit SocketException during getSearchingVersion with R" + node.id + "; skipping");
+          t.printStackTrace(System.out);
+          continue;
+        }
         message(node + " has searchingVersion=" + searchingVersion);
         if (searchingVersion > maxSearchingVersion) {
           maxSearchingVersion = searchingVersion;
@@ -415,8 +417,12 @@ public class TestStressNRTReplication extends LuceneTestCase {
     }
 
     message("top: promote " + replicaToPromote + " version=" + maxSearchingVersion + "; now commit");
-    if (replicaToPromote.commit() == false) {
-      message("top: commit failed; skipping primary promotion");
+    try {
+      replicaToPromote.commit();
+    } catch (Throwable t) {
+      // Something wrong with this replica; skip it:
+      message("top: hit exception during commit with R" + replicaToPromote.id + "; skipping");
+      t.printStackTrace(System.out);
       return;
     }
 
@@ -478,8 +484,9 @@ public class TestStressNRTReplication extends LuceneTestCase {
     try {
       transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc);
     } catch (IOException ioe) {
-      // nocommit what if primary node is still running here, and we failed for some other reason?
-      message("top: replay xlog failed; abort");
+      message("top: replay xlog failed; shutdown new primary");
+      ioe.printStackTrace(System.out);
+      newPrimary.shutdown();
       return;
     }
 
@@ -1182,4 +1189,14 @@ public class TestStressNRTReplication extends LuceneTestCase {
                                      Thread.currentThread().getName(),
                                      message));
   }
+
+  static void message(String message, long localStartNS) {
+    long now = System.nanoTime();
+    System.out.println(String.format(Locale.ROOT,
+                                     "%5.3fs %5.1fs:     parent [%11s] %s",
+                                     (now-Node.globalStartNS)/1000000000.,
+                                     (now-localStartNS)/1000000000.,
+                                     Thread.currentThread().getName(),
+                                     message));
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
----------------------------------------------------------------------
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
index d74e170..73f3908 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java
@@ -31,6 +31,7 @@ class ThreadPumper {
         @Override
         public void run() {
           try {
+            long startTimeNS = System.nanoTime();
             Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*");
             String line;
             while ((line = from.readLine()) != null) {
@@ -42,7 +43,7 @@ class ThreadPumper {
                 // Already a well-formed log output:
                 System.out.println(line);
               } else {
-                TestNRTReplication.message(line);
+                TestStressNRTReplication.message(line, startTimeNS);
               }
               if (line.contains("now force close server socket after")) {
                 nodeClosing.set(true);
@@ -60,4 +61,3 @@ class ThreadPumper {
     return t;
   }
 }
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/replicator/test.cmd
----------------------------------------------------------------------
diff --git a/lucene/replicator/test.cmd b/lucene/replicator/test.cmd
index f636a61..18045ce 100644
--- a/lucene/replicator/test.cmd
+++ b/lucene/replicator/test.cmd
@@ -1,3 +1 @@
-python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3
-
-#  -mult 4 -nightly
+python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir /l/logs TestStressNRTReplication -jvms 3 -verbose -mult 4 -nightly

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9afa5600/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
index a36d6d4..aa89209 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
@@ -852,6 +852,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
           
         // TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles
         if (assertNoUnreferencedFilesOnClose) {
+          System.out.println("MDW: now assert no unref'd files at close");
 
           // now look for unreferenced files: discount ones that we tried to delete but could not
           Set<String> allFiles = new HashSet<>(Arrays.asList(listAll()));