You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/07 17:07:51 UTC

svn commit: r1198789 - in /lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor: DistributedUpdateProcessor.java DistributedUpdateProcessorFactory.java

Author: markrmiller
Date: Mon Nov  7 16:07:51 2011
New Revision: 1198789

URL: http://svn.apache.org/viewvc?rev=1198789&view=rev
Log:
commit some refactoring before I merge up to trunk

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1198789&r1=1198788&r2=1198789&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Nov  7 16:07:51 2011
@@ -231,10 +231,12 @@ public class DistributedUpdateProcessor 
       if (ureq.getParams() == null) {
         ureq.setParams(new ModifiableSolrParams());
       }
-      if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
-        ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
-            req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+      String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+      if (seenLeader != null) {
+        ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
       }
+      
+      // nocommit: we add the right update chain - we should add the current one?
       ureq.getParams().add("update.chain", "distrib-update-chain");
       addCommit(ureq, cmd);
       submit(slot, ureq);
@@ -324,10 +326,11 @@ public class DistributedUpdateProcessor 
     if (ureq.getParams() == null) {
       ureq.setParams(new ModifiableSolrParams());
     }
-    if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
-      ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
-          req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+    if (req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER) != null) {
+      ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER,
+          req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER));
     }
+    // nocommit: we add the right update chain - we should add the current one?
     ureq.getParams().add("update.chain", "distrib-update-chain");
     addCommit(ureq, ccmd);
     
@@ -350,10 +353,13 @@ public class DistributedUpdateProcessor 
     if (ureq.getParams() == null) {
       ureq.setParams(new ModifiableSolrParams());
     }
-    if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
-      ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
-          req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+
+    String seenLeader = req.getParams().get(DistributedUpdateProcessorFactory.SEEN_LEADER);
+    if (seenLeader != null) {
+      ureq.getParams().add(DistributedUpdateProcessorFactory.SEEN_LEADER, seenLeader);
     }
+    
+    // nocommit: we add the right update chain - we should add the current one?
     ureq.getParams().add("update.chain", "distrib-update-chain");
     addCommit(ureq, ccmd);
     for (DeleteUpdateCommand cmd : dlist) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1198789&r1=1198788&r2=1198789&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Mon Nov  7 16:07:51 2011
@@ -39,7 +39,7 @@ import org.apache.zookeeper.KeeperExcept
 
 public class DistributedUpdateProcessorFactory extends
     UpdateRequestProcessorFactory {
-  public static final String DOCVERSION = "docversion";
+  public static final String SEEN_LEADER = "leader";
   NamedList args;
   List<String> shards;
   String selfStr;
@@ -88,55 +88,42 @@ public class DistributedUpdateProcessorF
       List<String> leaderChildren;
       String collection = coreDesc.getCloudDescriptor().getCollectionName();
       String shardId = coreDesc.getCloudDescriptor().getShardId();
+      
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      
       String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
           + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
       SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
           .getZkClient();
+      
       try {
         leaderChildren = zkClient.getChildren(leaderNode, null);
         if (leaderChildren.size() > 0) {
           String leader = leaderChildren.get(0);
+          
           ZkNodeProps zkNodeProps = new ZkNodeProps();
           byte[] bytes = zkClient
               .getData(leaderNode + "/" + leader, null, null);
           zkNodeProps.load(bytes);
+          
           String leaderUrl = zkNodeProps.get("url");
           
           String nodeName = req.getCore().getCoreDescriptor()
               .getCoreContainer().getZkController().getNodeName();
           String shardZkNodeName = nodeName + "_" + req.getCore().getName();
 
-          if (params.get(DOCVERSION) != null
-              && params.get(DOCVERSION).equals("yes")) {
+          if (params.getBool(SEEN_LEADER, false)) {
             // we got a version, just go local
           } else if (shardZkNodeName.equals(leader)) {
             // that means I want to forward onto my replicas...
             
             // so get the replicas...
-            CloudState cloudState = req.getCore().getCoreDescriptor()
-                .getCoreContainer().getZkController().getCloudState();
-            Slice replicas = cloudState.getSlices(collection).get(shardId);
-            Map<String,ZkNodeProps> shardMap = replicas.getShards();
-            String self = null;
-            StringBuilder replicasUrl = new StringBuilder();
-            for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
-              if (replicasUrl.length() > 0) {
-                replicasUrl.append("|");
-              }
-              String replicaUrl = entry.getValue().get("url");
-              if (shardZkNodeName.equals(entry.getKey())) {
-                self = replicaUrl;
-              }
-              replicasUrl.append(replicaUrl);
-            }
+            addReplicasAndSelf(req, collection, shardId, params,
+                shardZkNodeName);
+            
             versionDoc(params);
-            params.add("self", self);
-            params.add("shards", replicasUrl.toString());
           } else {
             // I need to forward onto the leader...
-            // TODO: don't use leader - we need to get the real URL from the zk
-            // node
             params.add("shards", leaderUrl);
           }
           req.setParams(params);
@@ -158,8 +145,31 @@ public class DistributedUpdateProcessorF
     if (shards == null && shardStr == null) return null;
     return new DistributedUpdateProcessor(shardStr, req, rsp, this, next);
   }
+
+  private void addReplicasAndSelf(SolrQueryRequest req, String collection,
+      String shardId, ModifiableSolrParams params, String shardZkNodeName) {
+    CloudState cloudState = req.getCore().getCoreDescriptor()
+        .getCoreContainer().getZkController().getCloudState();
+    Slice replicas = cloudState.getSlices(collection).get(shardId);
+    Map<String,ZkNodeProps> shardMap = replicas.getShards();
+    String self = null;
+    StringBuilder replicasUrl = new StringBuilder();
+    for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+      if (replicasUrl.length() > 0) {
+        replicasUrl.append("|");
+      }
+      String replicaUrl = entry.getValue().get("url");
+      if (shardZkNodeName.equals(entry.getKey())) {
+        self = replicaUrl;
+      }
+      replicasUrl.append(replicaUrl);
+    }
+
+    params.add("self", self);
+    params.add("shards", replicasUrl.toString());
+  }
   
   private void versionDoc(ModifiableSolrParams params) {
-    params.set(DOCVERSION, "yes");
+    params.set(SEEN_LEADER, true);
   }
 }