You are viewing a plain-text version of this content. The canonical version is available on the original mailing-list archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/29 03:49:08 UTC

[1/5] lucene-solr:jira/solr-11702: SOLR-11702: Refactoring ZkShardTerms

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11702 520394fa5 -> 80044bce4


SOLR-11702: Refactoring ZkShardTerms


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/54b376dd
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/54b376dd
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/54b376dd

Branch: refs/heads/jira/solr-11702
Commit: 54b376dd776471f4b43e8fd6c0c7329360d6e700
Parents: 520394f
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 09:22:58 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 09:22:58 2017 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkShardTerms.java     | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54b376dd/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 5dc3ff1..4b970a2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -90,7 +90,7 @@ public class ZkShardTerms implements AutoCloseable{
   // package private for testing, only used by tests
   Map<String, Long> getTerms() {
     synchronized (writingLock) {
-      return new HashMap<>(terms.terms);
+      return new HashMap<>(terms.values);
     }
   }
 
@@ -152,10 +152,10 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
-    byte[] znodeData = Utils.toJSON(newTerms.terms);
+    byte[] znodeData = Utils.toJSON(newTerms.values);
     try {
       Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
-      updateTerms(new Terms(newTerms.terms, stat.getVersion()));
+      updateTerms(new Terms(newTerms.values, stat.getVersion()));
       return true;
     } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not match, retrying");
@@ -236,37 +236,37 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   static class Terms {
-    private final Map<String, Long> terms;
+    private final Map<String, Long> values;
     private final int version;
 
     public Terms () {
       this(new HashMap<>(), 0);
     }
 
-    public Terms(Map<String, Long> terms, int version) {
-      this.terms = terms;
+    public Terms(Map<String, Long> values, int version) {
+      this.values = values;
       this.version = version;
     }
 
     boolean canBecomeLeader(String coreNodeName) {
-      if (terms.isEmpty()) return true;
-      long maxTerm = Collections.max(terms.values());
-      return terms.getOrDefault(coreNodeName, 0L) == maxTerm;
+      if (values.isEmpty()) return true;
+      long maxTerm = Collections.max(values.values());
+      return values.getOrDefault(coreNodeName, 0L) == maxTerm;
     }
 
     Long getTerm(String coreNodeName) {
-      return terms.get(coreNodeName);
+      return values.get(coreNodeName);
     }
 
     Terms increaseTerms(String leader, Set<String> replicasInLowerTerms) {
-      if (!terms.containsKey(leader)) {
+      if (!values.containsKey(leader)) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
       }
 
       boolean changed = false;
       boolean foundReplicasInLowerTerms = false;
 
-      HashMap<String, Long> newValues = new HashMap<>(terms);
+      HashMap<String, Long> newValues = new HashMap<>(values);
       long leaderTerm = newValues.get(leader);
       for (String replica : newValues.keySet()) {
         if (replicasInLowerTerms.contains(replica)) foundReplicasInLowerTerms = true;
@@ -286,17 +286,17 @@ public class ZkShardTerms implements AutoCloseable{
     }
 
     Terms removeTerm(String coreNodeName) {
-      if (!terms.containsKey(coreNodeName)) return null;
+      if (!values.containsKey(coreNodeName)) return null;
 
-      HashMap<String, Long> newValues = new HashMap<>(terms);
+      HashMap<String, Long> newValues = new HashMap<>(values);
       newValues.remove(coreNodeName);
       return new Terms(newValues, version);
     }
 
     Terms registerTerm(String coreNodeName) {
-      if (terms.containsKey(coreNodeName)) return null;
+      if (values.containsKey(coreNodeName)) return null;
 
-      HashMap<String, Long> newValues = new HashMap<>(terms);
+      HashMap<String, Long> newValues = new HashMap<>(values);
       newValues.put(coreNodeName, 0L);
       return new Terms(newValues, version);
     }
@@ -304,13 +304,13 @@ public class ZkShardTerms implements AutoCloseable{
     Terms setEqualsToMax(String coreNodeName) {
       long maxTerm;
       try {
-        maxTerm = Collections.max(terms.values());
+        maxTerm = Collections.max(values.values());
       } catch (NoSuchElementException e){
         maxTerm = 0;
       }
-      if (terms.get(coreNodeName) == maxTerm) return null;
+      if (values.get(coreNodeName) == maxTerm) return null;
 
-      HashMap<String, Long> newValues = new HashMap<>(terms);
+      HashMap<String, Long> newValues = new HashMap<>(values);
       newValues.put(coreNodeName, maxTerm);
       return new Terms(newValues, version);
     }


[5/5] lucene-solr:jira/solr-11702: SOLR-11702: RecoveryStrategy should ping leader first

Posted by da...@apache.org.
SOLR-11702: RecoveryStrategy should ping leader first


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/80044bce
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/80044bce
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/80044bce

Branch: refs/heads/jira/solr-11702
Commit: 80044bce46a5980e14eb99a1e9b0dbe05cc9f2d9
Parents: 866a073
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 10:48:57 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 10:48:57 2017 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/RecoveryStrategy.java | 51 ++++++++++++++------
 1 file changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/80044bce/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 944ae4f..2482123 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -19,6 +19,10 @@ package org.apache.solr.cloud;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -516,21 +520,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
       zkController.stopReplicationFromLeader(coreName);
     }
 
+    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+        final Replica leader = checkConnectionToLeader(ourUrl, cloudDesc);
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
 
-        boolean isLeader = leaderUrl.equals(ourUrl);
+        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
@@ -548,7 +549,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         ulog.bufferUpdates();
         replayed = false;
         
-        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
             ourUrl);
         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
         
@@ -567,7 +568,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           break;
         }
 
-        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
         
         if (isClosed()) {
           LOG.info("RecoveryStrategy has been closed");
@@ -587,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // first thing we just try to sync
         if (firstTime) {
           firstTime = false; // only try sync the first time through the loop
-          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+              Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
           peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync().isSuccess();
           if (syncSuccess) {
@@ -625,7 +626,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         try {
 
-          replicate(zkController.getNodeName(), core, leaderprops);
+          replicate(zkController.getNodeName(), core, leader);
 
           if (isClosed()) {
             LOG.info("RecoveryStrategy has been closed");
@@ -747,6 +748,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  private Replica checkConnectionToLeader(String ourUrl, CloudDescriptor cloudDesc) throws InterruptedException {
+    while (true) {
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+      if (isClosed()) {
+        return leaderReplica;
+      }
+      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+      try (Socket socket = new Socket()) {
+        URL url = new URL(leaderReplica.getBaseUrl());
+        socket.connect(new InetSocketAddress(url.getHost(), url.getPort()), 2000);
+        return leaderReplica;
+      } catch (IOException e) {
+        LOG.info("Failed to connect leader {}, try again", leaderReplica.getBaseUrl());
+      }
+    }
+  }
+
   public static Runnable testing_beforeReplayBufferingUpdates;
 
   final private Future<RecoveryInfo> replay(SolrCore core)


[4/5] lucene-solr:jira/solr-11702: SOLR-11702: Log message when core is closed

Posted by da...@apache.org.
SOLR-11702: Log message when core is closed


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/866a073d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/866a073d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/866a073d

Branch: refs/heads/jira/solr-11702
Commit: 866a073d9c27048e48edf8193c67968fd2353d63
Parents: d8ab1d6
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 10:23:04 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 10:23:04 2017 +0700

----------------------------------------------------------------------
 .../src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/866a073d/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index bbff584..59f4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -44,7 +44,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
         solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
       }
     } catch (NullPointerException e) {
-      // Expected
+      log.info("NPE when getting coreNodeName, hence do not start recovery process");
     }
     return true;
   }


[3/5] lucene-solr:jira/solr-11702: SOLR-11702: CreateCollectionCmd should handle InterruptedException properly

Posted by da...@apache.org.
SOLR-11702: CreateCollectionCmd should handle InterruptedException properly


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/d8ab1d65
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/d8ab1d65
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/d8ab1d65

Branch: refs/heads/jira/solr-11702
Commit: d8ab1d65a57be0c827152d3c244bbb46d4849200
Parents: 7291501
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 10:20:28 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 10:20:28 2017 +0700

----------------------------------------------------------------------
 .../src/java/org/apache/solr/cloud/CreateCollectionCmd.java     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d8ab1d65/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index b63e91d..63b1b5a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -407,7 +407,10 @@ public class CreateCollectionCmd implements Cmd {
         }
         stateManager.removeData(termsPath, -1);
       }
-    } catch (KeeperException | InterruptedException | IOException | BadVersionException e) {
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+    } catch (KeeperException | IOException | BadVersionException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
     }
     try {


[2/5] lucene-solr:jira/solr-11702: SOLR-11702: Clarify ZkShardTerms.removeTerm

Posted by da...@apache.org.
SOLR-11702: Clarify ZkShardTerms.removeTerm


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/72915011
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/72915011
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/72915011

Branch: refs/heads/jira/solr-11702
Commit: 72915011714342cff2785b4d8084922606a4b2e7
Parents: 54b376d
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 09:27:55 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 09:27:55 2017 +0700

----------------------------------------------------------------------
 solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/72915011/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 4b970a2..deadf2a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -101,8 +101,8 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
-   * Remove the coreNodeName from terms map and also remove any listeners relate to the core
-   * @return Return true if this object should be closed
+   * Remove the coreNodeName from terms map and also remove any expired listeners
+   * @return Return true if this object should not be reused
    */
   boolean removeTerm(CoreDescriptor cd) {
     int numListeners;