You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/07 17:07:51 UTC
svn commit: r1198789 - in
/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor:
DistributedUpdateProcessor.java DistributedUpdateProcessorFactory.java
Author: markrmiller
Date: Mon Nov 7 16:07:51 2011
New Revision: 1198789
URL: http://svn.apache.org/viewvc?rev=1198789&view=rev
Log:
commit some refactoring before I merge up to trunk
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1198789&r1=1198788&r2=1198789&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov 7 16:07:51 2011
@@ -231,10 +231,12 @@ public class DistributedUpdateProcessor
if (ureq.getParams() == null) {
ureq.setParams(new ModifiableSolrParams());
}
- if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
- ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
- req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+ String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+ if (seenLeader != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
}
+
+ // nocommit: the update chain name is hard-coded here - should we pass along the current one instead?
ureq.getParams().add("update.chain", "distrib-update-chain");
addCommit(ureq, cmd);
submit(slot, ureq);
@@ -324,10 +326,11 @@ public class DistributedUpdateProcessor
if (ureq.getParams() == null) {
ureq.setParams(new ModifiableSolrParams());
}
- if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
- ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
- req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+ if (req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER) != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER,
+ req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER));
}
+ // nocommit: the update chain name is hard-coded here - should we pass along the current one instead?
ureq.getParams().add("update.chain", "distrib-update-chain");
addCommit(ureq, ccmd);
@@ -350,10 +353,13 @@ public class DistributedUpdateProcessor
if (ureq.getParams() == null) {
ureq.setParams(new ModifiableSolrParams());
}
- if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
- ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
- req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+
+ String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+ if (seenLeader != null) {
+ ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
}
+
+ // nocommit: the update chain name is hard-coded here - should we pass along the current one instead?
ureq.getParams().add("update.chain", "distrib-update-chain");
addCommit(ureq, ccmd);
for (DeleteUpdateCommand cmd : dlist) {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1198789&r1=1198788&r2=1198789&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Mon Nov 7 16:07:51 2011
@@ -39,7 +39,7 @@ import org.apache.zookeeper.KeeperExcept
public class DistributedUpdateProcessorFactory extends
UpdateRequestProcessorFactory {
- public static final String DOCVERSION = "docversion";
+ public static final String SEEN_LEADER = "leader";
NamedList args;
List<String> shards;
String selfStr;
@@ -88,55 +88,42 @@ public class DistributedUpdateProcessorF
List<String> leaderChildren;
String collection = coreDesc.getCloudDescriptor().getCollectionName();
String shardId = coreDesc.getCloudDescriptor().getShardId();
+
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+
String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+ ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
.getZkClient();
+
try {
leaderChildren = zkClient.getChildren(leaderNode, null);
if (leaderChildren.size() > 0) {
String leader = leaderChildren.get(0);
+
ZkNodeProps zkNodeProps = new ZkNodeProps();
byte[] bytes = zkClient
.getData(leaderNode + "/" + leader, null, null);
zkNodeProps.load(bytes);
+
String leaderUrl = zkNodeProps.get("url");
String nodeName = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getNodeName();
String shardZkNodeName = nodeName + "_" + req.getCore().getName();
- if (params.get(DOCVERSION) != null
- && params.get(DOCVERSION).equals("yes")) {
+ if (params.getBool(SEEN_LEADER, false)) {
// SEEN_LEADER is set on the request, so just go local
} else if (shardZkNodeName.equals(leader)) {
// that means I want to forward onto my replicas...
// so get the replicas...
- CloudState cloudState = req.getCore().getCoreDescriptor()
- .getCoreContainer().getZkController().getCloudState();
- Slice replicas = cloudState.getSlices(collection).get(shardId);
- Map<String,ZkNodeProps> shardMap = replicas.getShards();
- String self = null;
- StringBuilder replicasUrl = new StringBuilder();
- for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
- if (replicasUrl.length() > 0) {
- replicasUrl.append("|");
- }
- String replicaUrl = entry.getValue().get("url");
- if (shardZkNodeName.equals(entry.getKey())) {
- self = replicaUrl;
- }
- replicasUrl.append(replicaUrl);
- }
+ addReplicasAndSelf(req, collection, shardId, params,
+ shardZkNodeName);
+
versionDoc(params);
- params.add("self", self);
- params.add("shards", replicasUrl.toString());
} else {
// I need to forward onto the leader...
- // TODO: don't use leader - we need to get the real URL from the zk
- // node
params.add("shards", leaderUrl);
}
req.setParams(params);
@@ -158,8 +145,31 @@ public class DistributedUpdateProcessorF
if (shards == null && shardStr == null) return null;
return new DistributedUpdateProcessor(shardStr, req, rsp, this, next);
}
+
+ private void addReplicasAndSelf(SolrQueryRequest req, String collection,
+ String shardId, ModifiableSolrParams params, String shardZkNodeName) {
+ CloudState cloudState = req.getCore().getCoreDescriptor()
+ .getCoreContainer().getZkController().getCloudState();
+ Slice replicas = cloudState.getSlices(collection).get(shardId);
+ Map<String,ZkNodeProps> shardMap = replicas.getShards();
+ String self = null;
+ StringBuilder replicasUrl = new StringBuilder();
+ for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+ if (replicasUrl.length() > 0) {
+ replicasUrl.append("|");
+ }
+ String replicaUrl = entry.getValue().get("url");
+ if (shardZkNodeName.equals(entry.getKey())) {
+ self = replicaUrl;
+ }
+ replicasUrl.append(replicaUrl);
+ }
+
+ params.add("self", self);
+ params.add("shards", replicasUrl.toString());
+ }
private void versionDoc(ModifiableSolrParams params) {
- params.set(DOCVERSION, "yes");
+ params.set(SEEN_LEADER, true);
}
}