You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2013/01/04 01:01:16 UTC

svn commit: r1428677 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/update/PeerSync.java core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Author: yonik
Date: Fri Jan  4 00:01:16 2013
New Revision: 1428677

URL: http://svn.apache.org/viewvc?rev=1428677&view=rev
Log:
SOLR-4257: PeerSync updates and Log Replay updates should not wait for ZK connection

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Jan  4 00:01:16 2013
@@ -435,6 +435,9 @@ Bug Fixes
 * SOLR-4253: Misleading resource loading warning from Carrot2 clustering 
   component fixed (Stanisław Osiński)
 
+* SOLR-4257: PeerSync updates and Log Replay updates should not wait for
+  a ZooKeeper connection in order to proceed. (yonik)
+
 
 Other Changes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Fri Jan  4 00:01:16 2013
@@ -443,10 +443,11 @@ public class PeerSync  {
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
-    // params.set("peersync",true); // debugging
+    params.set("peersync",true); // debugging
     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
     SolrQueryResponse rsp = new SolrQueryResponse();
 
+    // TODO: use the standard update processor chain now that it has support to skip processors before the DistributedUpdateProcessor?
     RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
     DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
     runFac.init(new NamedList());

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri Jan  4 00:01:16 2013
@@ -140,6 +140,8 @@ public class DistributedUpdateProcessor 
 
   private int numNodes;
 
+  private UpdateCommand updateCommand;  // the current command this processor is working on.
+
   
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -184,6 +186,12 @@ public class DistributedUpdateProcessor 
     // if we are in zk mode...
     if (zkEnabled) {
 
+      if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+        isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
+        forwardToLeader = false;
+        return nodes;
+      }
+
       String coreName = req.getCore().getName();
       String coreNodeName = zkController.getNodeName() + "_" + coreName;
 
@@ -272,10 +280,12 @@ public class DistributedUpdateProcessor 
 
 
   private void doDefensiveChecks(String shardId, DistribPhase phase) {
+    boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+    if (isReplayOrPeersync) return;
+
     String from = req.getParams().get("distrib.from");
-    boolean logReplay = req.getParams().getBool(LOG_REPLAY, false);
     boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
-    if (!logReplay && DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+    if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
       log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
       throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
     }
@@ -326,6 +336,8 @@ public class DistributedUpdateProcessor 
 
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
+    updateCommand = cmd;
+
     if (zkEnabled) {
       zkCheck();
       nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
@@ -493,8 +505,8 @@ public class DistributedUpdateProcessor 
       }
     }
 
-    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
-    boolean leaderLogic = isLeader && !isReplay;
+    boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+    boolean leaderLogic = isLeader && !isReplayOrPeersync;
 
 
     VersionBucket bucket = vinfo.bucket(bucketHash);
@@ -690,6 +702,8 @@ public class DistributedUpdateProcessor 
   
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    updateCommand = cmd;
+
     if (!cmd.isDeleteById()) {
       doDeleteByQuery(cmd);
       return;
@@ -848,8 +862,8 @@ public class DistributedUpdateProcessor 
     }
     versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
 
-    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
-    boolean leaderLogic = isLeader && !isReplay;
+    boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+    boolean leaderLogic = isLeader && !isReplayOrPeersync;
 
     if (!leaderLogic && versionOnUpdate==0) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@@ -912,6 +926,11 @@ public class DistributedUpdateProcessor 
 
 
   private void zkCheck() {
+    if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+      // for log replay or peer sync, we don't need to be connected to ZK
+      return;
+    }
+
     if (zkController.isConnected()) {
       return;
     }
@@ -955,8 +974,8 @@ public class DistributedUpdateProcessor 
     long signedVersionOnUpdate = versionOnUpdate;
     versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
 
-    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
-    boolean leaderLogic = isLeader && !isReplay;
+    boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+    boolean leaderLogic = isLeader && !isReplayOrPeersync;
 
     if (!leaderLogic && versionOnUpdate==0) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@@ -1026,6 +1045,8 @@ public class DistributedUpdateProcessor 
 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    updateCommand = cmd;
+
     if (zkEnabled) {
       zkCheck();
     }