You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/27 18:55:02 UTC

svn commit: r1206785 - in /lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update: SolrCmdDistributor.java processor/DistributedUpdateProcessor.java

Author: markrmiller
Date: Sun Nov 27 17:55:01 2011
New Revision: 1206785

URL: http://svn.apache.org/viewvc?rev=1206785&view=rev
Log:
Use a List<String> instead of a '|'-delimited string for shard URLs, now that we no longer pass it or expect it as an HTTP param.

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1206785&r1=1206784&r2=1206785&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sun Nov 27 17:55:01 2011
@@ -85,27 +85,27 @@ public class SolrCmdDistributor {
     this.idField = req.getSchema().getUniqueKeyField();
   }
   
-  public void finish(String shardStr) {
+  public void finish(List<String> shards) {
 
     // piggyback on any outstanding adds or deletes if possible.
-    flushAdds(1, null, shardStr);
-    flushDeletes(1, null, shardStr);
+    flushAdds(1, null, shards);
+    flushDeletes(1, null, shards);
 
     checkResponses(true);
   }
   
-  public void distribDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+  public void distribDelete(DeleteUpdateCommand cmd, List<String> shards) throws IOException {
     checkResponses(false);
     
     if (cmd.id != null) {
-      doDelete(cmd, shardStr);
+      doDelete(cmd, shards);
     } else if (cmd.query != null) {
       // TODO: query must be broadcast to all ??
-      doDelete(cmd, shardStr);
+      doDelete(cmd, shards);
     }
   }
   
-  public void distribAdd(AddUpdateCommand cmd, String shardStr) throws IOException {
+  public void distribAdd(AddUpdateCommand cmd, List<String> shards) throws IOException {
     
     checkResponses(false);
     
@@ -116,7 +116,7 @@ public class SolrCmdDistributor {
     }
     
     // make sure any pending deletes are flushed
-    flushDeletes(1, null, shardStr);
+    flushDeletes(1, null, shards);
     
     // TODO: this is brittle
     // need to make a clone since these commands may be reused
@@ -136,10 +136,10 @@ public class SolrCmdDistributor {
     }
     alist.add(clone);
     
-    flushAdds(maxBufferedAddsPerServer, null, shardStr);
+    flushAdds(maxBufferedAddsPerServer, null, shards);
   }
   
-  public void distribCommit(CommitUpdateCommand cmd, String shardStr)
+  public void distribCommit(CommitUpdateCommand cmd, List<String> shards)
       throws IOException {
     
     // Wait for all outstanding repsonses to make sure that a commit
@@ -150,9 +150,9 @@ public class SolrCmdDistributor {
     
     // piggyback on any outstanding adds or deletes if possible.
     // TODO: review this
-    flushAdds(1, cmd, shardStr);
+    flushAdds(1, cmd, shards);
     
-    flushDeletes(1, cmd, shardStr);
+    flushDeletes(1, cmd, shards);
     
     UpdateRequestExt ureq = new UpdateRequestExt();
 
@@ -161,7 +161,7 @@ public class SolrCmdDistributor {
     }
     passOnParams(ureq);
     addCommit(ureq, cmd);
-    submit(ureq, shardStr);
+    submit(ureq, shards);
     
     // if (next != null && shardStr == null) next.processCommit(cmd);
     
@@ -185,16 +185,16 @@ public class SolrCmdDistributor {
     }
   }
   
-  private void doDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+  private void doDelete(DeleteUpdateCommand cmd, List<String> shards) throws IOException {
     
-    flushAdds(1, null, shardStr);
+    flushAdds(1, null, shards);
     
     if (dlist == null) {
       dlist = new ArrayList<DeleteUpdateCommand>(2);
     }
     dlist.add(clone(cmd));
     
-    flushDeletes(maxBufferedDeletesPerServer, null, shardStr);
+    flushDeletes(maxBufferedDeletesPerServer, null, shards);
   }
   
   void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
@@ -204,7 +204,7 @@ public class SolrCmdDistributor {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
   }
   
-  boolean flushAdds(int limit, CommitUpdateCommand ccmd, String shardStr) {
+  boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls) {
     // check for pending deletes
     if (alist == null || alist.size() < limit) return false;
     
@@ -222,11 +222,11 @@ public class SolrCmdDistributor {
     }
     
     alist = null;
-    submit(ureq, shardStr);
+    submit(ureq, urls);
     return true;
   }
   
-  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, String shardStr) {
+  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> shards) {
     // check for pending deletes
     if (dlist == null || dlist.size() < limit) return false;
     
@@ -249,7 +249,7 @@ public class SolrCmdDistributor {
     }
     
     dlist = null;
-    submit(ureq, shardStr);
+    submit(ureq, shards);
     return true;
   }
   
@@ -263,16 +263,16 @@ public class SolrCmdDistributor {
   
   static class Request {
     // TODO: we may need to look at deep cloning this?
-    String shard;
+    List<String> shards;
     UpdateRequestExt ureq;
     NamedList<Object> ursp;
     int rspCode;
     Exception exception;
   }
   
-  void submit(UpdateRequestExt ureq, String shardStr) {
+  void submit(UpdateRequestExt ureq, List<String> shards) {
     Request sreq = new Request();
-    sreq.shard = shardStr;
+    sreq.shards = shards;
     sreq.ureq = ureq;
     submit(sreq);
   }
@@ -282,15 +282,8 @@ public class SolrCmdDistributor {
       completionService = new ExecutorCompletionService<Request>(commExecutor);
       pending = new HashSet<Future<Request>>();
     }
-    String[] shards;
-    // look to see if we should send to multiple servers
-    if (sreq.shard.contains("|")) {
-      shards = sreq.shard.split("\\|");
-    } else {
-      shards = new String[1];
-      shards[0] = sreq.shard;
-    }
-    for (final String shard : shards) {
+
+    for (final String shard : sreq.shards) {
       // TODO: when we break up shards, we might forward
       // to self again - makes things simple here, but we could
       // also have realized this before, done the req locally, and
@@ -300,7 +293,7 @@ public class SolrCmdDistributor {
         @Override
         public Request call() throws Exception {
           Request clonedRequest = new Request();
-          clonedRequest.shard = sreq.shard;
+          clonedRequest.shards = sreq.shards;
           clonedRequest.ureq = sreq.ureq;
           
           try {
@@ -351,7 +344,7 @@ public class SolrCmdDistributor {
             // use the first exception encountered
             if (rsp.getException() == null) {
               Exception e = sreq.exception;
-              String newMsg = "shard update error (" + sreq.shard + "):"
+              String newMsg = "shard update error (" + sreq.shards + "):"
                   + e.getMessage();
               if (e instanceof SolrException) {
                 SolrException se = (SolrException) e;
@@ -365,7 +358,7 @@ public class SolrCmdDistributor {
             }
             
             SolrException.logOnce(SolrCore.log, "shard update error ("
-                + sreq.shard + ")", sreq.exception);
+                + sreq.shards + ")", sreq.exception);
           }
           
         } catch (ExecutionException e) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1206785&r1=1206784&r2=1206785&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Nov 27 17:55:01 2011
@@ -77,13 +77,14 @@ public class DistributedUpdateProcessor 
   private CharsRef scratch;
   private boolean isLeader;
   private boolean forwardToLeader;
-  private volatile String shardStr;
 
   private final SchemaField idField;
   
   private final SolrCmdDistributor cmdDistrib;
 
   private HashPartitioner hp;
+
+  private List<String> shards;
   
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -107,7 +108,9 @@ public class DistributedUpdateProcessor 
     cmdDistrib = new SolrCmdDistributor(req, rsp);
   }
 
-  private void setupRequest(int hash) {
+  private List<String> setupRequest(int hash) {
+    List<String> shards = null;
+    
     CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
     
     CloudState cloudState = req.getCore().getCoreDescriptor().getCoreContainer().getZkController().getCloudState();
@@ -156,14 +159,15 @@ public class DistributedUpdateProcessor 
             isLeader = true;
             // that means I want to forward onto my replicas...
             // so get the replicas...
-            shardStr = addReplicas(req, collection, shardId,
+            shards = getReplicaUrls(req, collection, shardId,
                 shardZkNodeName);
             
             // mark that this req has been to the leader
             params.set(SEEN_LEADER, true);
           } else {
             // I need to forward onto the leader...
-            shardStr = leaderUrl;
+            shards = new ArrayList<String>(1);
+            shards.add(leaderUrl);
             forwardToLeader  = true;
           }
           req.setParams(params);
@@ -180,6 +184,8 @@ public class DistributedUpdateProcessor 
             e);
       }
     }
+    
+    return shards;
   }
   
   private String getShard(int hash, String collection, CloudState cloudState) {
@@ -207,7 +213,7 @@ public class DistributedUpdateProcessor 
   public void processAdd(AddUpdateCommand cmd) throws IOException {
     int hash = hash(cmd);
     
-    setupRequest(hash);
+    shards = setupRequest(hash);
     
     boolean dropCmd = false;
     if (!forwardToLeader) {
@@ -219,8 +225,8 @@ public class DistributedUpdateProcessor 
       return;
     }
     
-    if (shardStr != null) {
-      cmdDistrib.distribAdd(cmd, shardStr);
+    if (shards != null) {
+      cmdDistrib.distribAdd(cmd, shards);
     } else {
       super.processAdd(cmd);
     }
@@ -320,7 +326,7 @@ public class DistributedUpdateProcessor 
       hash = hash(cmd);
     }
     
-    setupRequest(hash);
+    shards = setupRequest(hash);
     
     boolean dropCmd = false;
     if (!forwardToLeader) {
@@ -332,8 +338,8 @@ public class DistributedUpdateProcessor 
       return;
     }
 
-    if (shardStr != null) {
-      cmdDistrib.distribDelete(cmd, shardStr);
+    if (shards != null) {
+      cmdDistrib.distribDelete(cmd, shards);
     } else {
       super.processDelete(cmd);
     }
@@ -413,20 +419,21 @@ public class DistributedUpdateProcessor 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
     // nocommit: make everyone commit?
-    if (shardStr != null) {
-      cmdDistrib.distribCommit(cmd, shardStr);
-    } else {
+//    if (shards != null) {
+//      cmdDistrib.distribCommit(cmd, shards);
+//    } else {
       super.processCommit(cmd);
-    }
+//    }
   }
   
   @Override
   public void finish() throws IOException {
-    cmdDistrib.finish(shardStr);
-    if (next != null && shardStr == null) next.finish();
+    cmdDistrib.finish(shards);
+    if (next != null && shards == null) next.finish();
   }
   
-  private String addReplicas(SolrQueryRequest req, String collection,
+  // TODO: currently this also includes the leader url
+  private List<String> getReplicaUrls(SolrQueryRequest req, String collection,
       String shardId, String shardZkNodeName) {
     CloudState cloudState = req.getCore().getCoreDescriptor()
         .getCoreContainer().getZkController().getCloudState();
@@ -434,23 +441,19 @@ public class DistributedUpdateProcessor 
     Slice replicas = cloudState.getSlices(collection).get(shardId);
     
     Map<String,ZkNodeProps> shardMap = replicas.getShards();
+    List<String> urls = new ArrayList<String>();
 
-    StringBuilder replicasUrl = new StringBuilder();
     for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
       if (cloudState.liveNodesContain(entry.getValue().get(
           ZkStateReader.NODE_NAME_PROP))) {
-        
-        if (replicasUrl.length() > 0) {
-          replicasUrl.append("|");
-        }
         String replicaUrl = entry.getValue().get(ZkStateReader.URL_PROP);
-        replicasUrl.append(replicaUrl);
+        urls.add(replicaUrl);
       }
     }
-    if (replicasUrl.length() == 0) {
-      throw new ZooKeeperException(ErrorCode.SERVICE_UNAVAILABLE, "No servers hosting shard " + shardId + " found");
+    if (urls.size() == 0) {
+      throw new ZooKeeperException(ErrorCode.SERVICE_UNAVAILABLE, "No available servers hosting shard " + shardId + " found");
     }
-    return replicasUrl.toString();
+    return urls;
   }
   
   // TODO: move this to AddUpdateCommand/DeleteUpdateCommand and cache it? And