You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/27 18:55:02 UTC
svn commit: r1206785 - in
/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update:
SolrCmdDistributor.java processor/DistributedUpdateProcessor.java
Author: markrmiller
Date: Sun Nov 27 17:55:01 2011
New Revision: 1206785
URL: http://svn.apache.org/viewvc?rev=1206785&view=rev
Log:
use a List<String> instead of | delim string for shard urls now that we don't pass it or expect it as an http param
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1206785&r1=1206784&r2=1206785&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sun Nov 27 17:55:01 2011
@@ -85,27 +85,27 @@ public class SolrCmdDistributor {
this.idField = req.getSchema().getUniqueKeyField();
}
- public void finish(String shardStr) {
+ public void finish(List<String> shards) {
// piggyback on any outstanding adds or deletes if possible.
- flushAdds(1, null, shardStr);
- flushDeletes(1, null, shardStr);
+ flushAdds(1, null, shards);
+ flushDeletes(1, null, shards);
checkResponses(true);
}
- public void distribDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+ public void distribDelete(DeleteUpdateCommand cmd, List<String> shards) throws IOException {
checkResponses(false);
if (cmd.id != null) {
- doDelete(cmd, shardStr);
+ doDelete(cmd, shards);
} else if (cmd.query != null) {
// TODO: query must be broadcast to all ??
- doDelete(cmd, shardStr);
+ doDelete(cmd, shards);
}
}
- public void distribAdd(AddUpdateCommand cmd, String shardStr) throws IOException {
+ public void distribAdd(AddUpdateCommand cmd, List<String> shards) throws IOException {
checkResponses(false);
@@ -116,7 +116,7 @@ public class SolrCmdDistributor {
}
// make sure any pending deletes are flushed
- flushDeletes(1, null, shardStr);
+ flushDeletes(1, null, shards);
// TODO: this is brittle
// need to make a clone since these commands may be reused
@@ -136,10 +136,10 @@ public class SolrCmdDistributor {
}
alist.add(clone);
- flushAdds(maxBufferedAddsPerServer, null, shardStr);
+ flushAdds(maxBufferedAddsPerServer, null, shards);
}
- public void distribCommit(CommitUpdateCommand cmd, String shardStr)
+ public void distribCommit(CommitUpdateCommand cmd, List<String> shards)
throws IOException {
// Wait for all outstanding repsonses to make sure that a commit
@@ -150,9 +150,9 @@ public class SolrCmdDistributor {
// piggyback on any outstanding adds or deletes if possible.
// TODO: review this
- flushAdds(1, cmd, shardStr);
+ flushAdds(1, cmd, shards);
- flushDeletes(1, cmd, shardStr);
+ flushDeletes(1, cmd, shards);
UpdateRequestExt ureq = new UpdateRequestExt();
@@ -161,7 +161,7 @@ public class SolrCmdDistributor {
}
passOnParams(ureq);
addCommit(ureq, cmd);
- submit(ureq, shardStr);
+ submit(ureq, shards);
// if (next != null && shardStr == null) next.processCommit(cmd);
@@ -185,16 +185,16 @@ public class SolrCmdDistributor {
}
}
- private void doDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+ private void doDelete(DeleteUpdateCommand cmd, List<String> shards) throws IOException {
- flushAdds(1, null, shardStr);
+ flushAdds(1, null, shards);
if (dlist == null) {
dlist = new ArrayList<DeleteUpdateCommand>(2);
}
dlist.add(clone(cmd));
- flushDeletes(maxBufferedDeletesPerServer, null, shardStr);
+ flushDeletes(maxBufferedDeletesPerServer, null, shards);
}
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
@@ -204,7 +204,7 @@ public class SolrCmdDistributor {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
}
- boolean flushAdds(int limit, CommitUpdateCommand ccmd, String shardStr) {
+ boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls) {
// check for pending deletes
if (alist == null || alist.size() < limit) return false;
@@ -222,11 +222,11 @@ public class SolrCmdDistributor {
}
alist = null;
- submit(ureq, shardStr);
+ submit(ureq, urls);
return true;
}
- boolean flushDeletes(int limit, CommitUpdateCommand ccmd, String shardStr) {
+ boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> shards) {
// check for pending deletes
if (dlist == null || dlist.size() < limit) return false;
@@ -249,7 +249,7 @@ public class SolrCmdDistributor {
}
dlist = null;
- submit(ureq, shardStr);
+ submit(ureq, shards);
return true;
}
@@ -263,16 +263,16 @@ public class SolrCmdDistributor {
static class Request {
// TODO: we may need to look at deep cloning this?
- String shard;
+ List<String> shards;
UpdateRequestExt ureq;
NamedList<Object> ursp;
int rspCode;
Exception exception;
}
- void submit(UpdateRequestExt ureq, String shardStr) {
+ void submit(UpdateRequestExt ureq, List<String> shards) {
Request sreq = new Request();
- sreq.shard = shardStr;
+ sreq.shards = shards;
sreq.ureq = ureq;
submit(sreq);
}
@@ -282,15 +282,8 @@ public class SolrCmdDistributor {
completionService = new ExecutorCompletionService<Request>(commExecutor);
pending = new HashSet<Future<Request>>();
}
- String[] shards;
- // look to see if we should send to multiple servers
- if (sreq.shard.contains("|")) {
- shards = sreq.shard.split("\\|");
- } else {
- shards = new String[1];
- shards[0] = sreq.shard;
- }
- for (final String shard : shards) {
+
+ for (final String shard : sreq.shards) {
// TODO: when we break up shards, we might forward
// to self again - makes things simple here, but we could
// also have realized this before, done the req locally, and
@@ -300,7 +293,7 @@ public class SolrCmdDistributor {
@Override
public Request call() throws Exception {
Request clonedRequest = new Request();
- clonedRequest.shard = sreq.shard;
+ clonedRequest.shards = sreq.shards;
clonedRequest.ureq = sreq.ureq;
try {
@@ -351,7 +344,7 @@ public class SolrCmdDistributor {
// use the first exception encountered
if (rsp.getException() == null) {
Exception e = sreq.exception;
- String newMsg = "shard update error (" + sreq.shard + "):"
+ String newMsg = "shard update error (" + sreq.shards + "):"
+ e.getMessage();
if (e instanceof SolrException) {
SolrException se = (SolrException) e;
@@ -365,7 +358,7 @@ public class SolrCmdDistributor {
}
SolrException.logOnce(SolrCore.log, "shard update error ("
- + sreq.shard + ")", sreq.exception);
+ + sreq.shards + ")", sreq.exception);
}
} catch (ExecutionException e) {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1206785&r1=1206784&r2=1206785&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Nov 27 17:55:01 2011
@@ -77,13 +77,14 @@ public class DistributedUpdateProcessor
private CharsRef scratch;
private boolean isLeader;
private boolean forwardToLeader;
- private volatile String shardStr;
private final SchemaField idField;
private final SolrCmdDistributor cmdDistrib;
private HashPartitioner hp;
+
+ private List<String> shards;
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -107,7 +108,9 @@ public class DistributedUpdateProcessor
cmdDistrib = new SolrCmdDistributor(req, rsp);
}
- private void setupRequest(int hash) {
+ private List<String> setupRequest(int hash) {
+ List<String> shards = null;
+
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
CloudState cloudState = req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getCloudState();
@@ -156,14 +159,15 @@ public class DistributedUpdateProcessor
isLeader = true;
// that means I want to forward onto my replicas...
// so get the replicas...
- shardStr = addReplicas(req, collection, shardId,
+ shards = getReplicaUrls(req, collection, shardId,
shardZkNodeName);
// mark that this req has been to the leader
params.set(SEEN_LEADER, true);
} else {
// I need to forward onto the leader...
- shardStr = leaderUrl;
+ shards = new ArrayList<String>(1);
+ shards.add(leaderUrl);
forwardToLeader = true;
}
req.setParams(params);
@@ -180,6 +184,8 @@ public class DistributedUpdateProcessor
e);
}
}
+
+ return shards;
}
private String getShard(int hash, String collection, CloudState cloudState) {
@@ -207,7 +213,7 @@ public class DistributedUpdateProcessor
public void processAdd(AddUpdateCommand cmd) throws IOException {
int hash = hash(cmd);
- setupRequest(hash);
+ shards = setupRequest(hash);
boolean dropCmd = false;
if (!forwardToLeader) {
@@ -219,8 +225,8 @@ public class DistributedUpdateProcessor
return;
}
- if (shardStr != null) {
- cmdDistrib.distribAdd(cmd, shardStr);
+ if (shards != null) {
+ cmdDistrib.distribAdd(cmd, shards);
} else {
super.processAdd(cmd);
}
@@ -320,7 +326,7 @@ public class DistributedUpdateProcessor
hash = hash(cmd);
}
- setupRequest(hash);
+ shards = setupRequest(hash);
boolean dropCmd = false;
if (!forwardToLeader) {
@@ -332,8 +338,8 @@ public class DistributedUpdateProcessor
return;
}
- if (shardStr != null) {
- cmdDistrib.distribDelete(cmd, shardStr);
+ if (shards != null) {
+ cmdDistrib.distribDelete(cmd, shards);
} else {
super.processDelete(cmd);
}
@@ -413,20 +419,21 @@ public class DistributedUpdateProcessor
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
// nocommit: make everyone commit?
- if (shardStr != null) {
- cmdDistrib.distribCommit(cmd, shardStr);
- } else {
+// if (shards != null) {
+// cmdDistrib.distribCommit(cmd, shards);
+// } else {
super.processCommit(cmd);
- }
+// }
}
@Override
public void finish() throws IOException {
- cmdDistrib.finish(shardStr);
- if (next != null && shardStr == null) next.finish();
+ cmdDistrib.finish(shards);
+ if (next != null && shards == null) next.finish();
}
- private String addReplicas(SolrQueryRequest req, String collection,
+ // TODO: currently this also includes the leader url
+ private List<String> getReplicaUrls(SolrQueryRequest req, String collection,
String shardId, String shardZkNodeName) {
CloudState cloudState = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getCloudState();
@@ -434,23 +441,19 @@ public class DistributedUpdateProcessor
Slice replicas = cloudState.getSlices(collection).get(shardId);
Map<String,ZkNodeProps> shardMap = replicas.getShards();
+ List<String> urls = new ArrayList<String>();
- StringBuilder replicasUrl = new StringBuilder();
for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
if (cloudState.liveNodesContain(entry.getValue().get(
ZkStateReader.NODE_NAME_PROP))) {
-
- if (replicasUrl.length() > 0) {
- replicasUrl.append("|");
- }
String replicaUrl = entry.getValue().get(ZkStateReader.URL_PROP);
- replicasUrl.append(replicaUrl);
+ urls.add(replicaUrl);
}
}
- if (replicasUrl.length() == 0) {
- throw new ZooKeeperException(ErrorCode.SERVICE_UNAVAILABLE, "No servers hosting shard " + shardId + " found");
+ if (urls.size() == 0) {
+ throw new ZooKeeperException(ErrorCode.SERVICE_UNAVAILABLE, "No available servers hosting shard " + shardId + " found");
}
- return replicasUrl.toString();
+ return urls;
}
// TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And