You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2013/01/04 01:01:16 UTC
svn commit: r1428677 - in /lucene/dev/trunk/solr: CHANGES.txt
core/src/java/org/apache/solr/update/PeerSync.java
core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Author: yonik
Date: Fri Jan 4 00:01:16 2013
New Revision: 1428677
URL: http://svn.apache.org/viewvc?rev=1428677&view=rev
Log:
SOLR-4257: PeerSync updates and Log Replay updates should not wait for ZK connection
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Jan 4 00:01:16 2013
@@ -435,6 +435,9 @@ Bug Fixes
* SOLR-4253: Misleading resource loading warning from Carrot2 clustering
component fixed (Stanisław Osiński)
+* SOLR-4257: PeerSync updates and Log Replay updates should not wait for
+ a ZooKeeper connection in order to proceed. (yonik)
+
Other Changes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/PeerSync.java Fri Jan 4 00:01:16 2013
@@ -443,10 +443,11 @@ public class PeerSync {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
- // params.set("peersync",true); // debugging
+ params.set("peersync",true); // debugging
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
SolrQueryResponse rsp = new SolrQueryResponse();
+ // TODO: use the standard update processor chain now that it has support to skip processors before the DistributedUpdateProcessor?
RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
runFac.init(new NamedList());
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1428677&r1=1428676&r2=1428677&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri Jan 4 00:01:16 2013
@@ -140,6 +140,8 @@ public class DistributedUpdateProcessor
private int numNodes;
+ private UpdateCommand updateCommand; // the current command this processor is working on.
+
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -184,6 +186,12 @@ public class DistributedUpdateProcessor
// if we are in zk mode...
if (zkEnabled) {
+ if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+ isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
+ forwardToLeader = false;
+ return nodes;
+ }
+
String coreName = req.getCore().getName();
String coreNodeName = zkController.getNodeName() + "_" + coreName;
@@ -272,10 +280,12 @@ public class DistributedUpdateProcessor
private void doDefensiveChecks(String shardId, DistribPhase phase) {
+ boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+ if (isReplayOrPeersync) return;
+
String from = req.getParams().get("distrib.from");
- boolean logReplay = req.getParams().getBool(LOG_REPLAY, false);
boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
- if (!logReplay && DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+ if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
}
@@ -326,6 +336,8 @@ public class DistributedUpdateProcessor
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
+ updateCommand = cmd;
+
if (zkEnabled) {
zkCheck();
nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
@@ -493,8 +505,8 @@ public class DistributedUpdateProcessor
}
}
- boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
- boolean leaderLogic = isLeader && !isReplay;
+ boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+ boolean leaderLogic = isLeader && !isReplayOrPeersync;
VersionBucket bucket = vinfo.bucket(bucketHash);
@@ -690,6 +702,8 @@ public class DistributedUpdateProcessor
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ updateCommand = cmd;
+
if (!cmd.isDeleteById()) {
doDeleteByQuery(cmd);
return;
@@ -848,8 +862,8 @@ public class DistributedUpdateProcessor
}
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
- boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
- boolean leaderLogic = isLeader && !isReplay;
+ boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+ boolean leaderLogic = isLeader && !isReplayOrPeersync;
if (!leaderLogic && versionOnUpdate==0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@@ -912,6 +926,11 @@ public class DistributedUpdateProcessor
private void zkCheck() {
+ if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+ // for log replay or peer sync, we don't need to be connected to ZK
+ return;
+ }
+
if (zkController.isConnected()) {
return;
}
@@ -955,8 +974,8 @@ public class DistributedUpdateProcessor
long signedVersionOnUpdate = versionOnUpdate;
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
- boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
- boolean leaderLogic = isLeader && !isReplay;
+ boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
+ boolean leaderLogic = isLeader && !isReplayOrPeersync;
if (!leaderLogic && versionOnUpdate==0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
@@ -1026,6 +1045,8 @@ public class DistributedUpdateProcessor
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ updateCommand = cmd;
+
if (zkEnabled) {
zkCheck();
}