You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/16 04:55:03 UTC

svn commit: r1231823 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/client/solrj/embedded/ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/ core/src/java/or...

Author: markrmiller
Date: Mon Jan 16 03:55:02 2012
New Revision: 1231823

URL: http://svn.apache.org/viewvc?rev=1231823&view=rev
Log:
initial base for higher level peer recovery and per shard distrib url cmd

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Mon Jan 16 03:55:02 2012
@@ -95,19 +95,24 @@ public class JettySolrRunner {
       SocketConnector connector = new SocketConnector();
       connector.setPort(port);
       connector.setReuseAddress(true);
-      QueuedThreadPool threadPool = (QueuedThreadPool) connector.getThreadPool();
-      if (threadPool != null) {
-        threadPool.setMaxStopTimeMs(100);
+      if (!stopAtShutdown) {
+        QueuedThreadPool threadPool = (QueuedThreadPool) connector
+            .getThreadPool();
+        if (threadPool != null) {
+          threadPool.setMaxStopTimeMs(0);
+        }
       }
-      server.setConnectors(new Connector[] { connector });
+      server.setConnectors(new Connector[] {connector});
       server.setSessionIdManager(new HashSessionIdManager(new Random()));
     } else {
-      for (Connector connector : server.getConnectors()) {
-        if (connector instanceof SocketConnector) {
-          QueuedThreadPool threadPool = (QueuedThreadPool) ((SocketConnector) connector)
-              .getThreadPool();
-          if (threadPool != null) {
-            threadPool.setMaxStopTimeMs(100);
+      if (!stopAtShutdown) {
+        for (Connector connector : server.getConnectors()) {
+          if (connector instanceof SocketConnector) {
+            QueuedThreadPool threadPool = (QueuedThreadPool) ((SocketConnector) connector)
+                .getThreadPool();
+            if (threadPool != null) {
+              threadPool.setMaxStopTimeMs(0);
+            }
           }
         }
       }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Jan 16 03:55:02 2012
@@ -1,8 +1,21 @@
 package org.apache.solr.cloud;
 
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -39,23 +52,35 @@ public abstract class ElectionContext {
     this.leaderProps = leaderProps;
   }
   
-  abstract void runLeaderProcess() throws KeeperException, InterruptedException;
+  abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException;
 }
 
 final class ShardLeaderElectionContext extends ElectionContext {
   
   private final SolrZkClient zkClient;
+  private ZkStateReader zkStateReader;
+  private String shardId;
+  private String collection;
 
   public ShardLeaderElectionContext(final String shardId,
-      final String collection, final String shardZkNodeName, ZkNodeProps props, SolrZkClient zkClient) {
+      final String collection, final String shardZkNodeName, ZkNodeProps props, ZkStateReader zkStateReader) {
     super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
         + shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
         props);
-    this.zkClient = zkClient;
+    this.zkClient = zkStateReader.getZkClient();
+    this.zkStateReader = zkStateReader;
+    this.shardId = shardId;
+    this.collection = collection;
   }
 
   @Override
-  void runLeaderProcess() throws KeeperException, InterruptedException {
+  void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
+    if (weAreReplacement) {
+      if (zkClient.exists(leaderPath, true)) {
+        zkClient.delete(leaderPath, -1, true);
+      }
+      syncReplicas();
+    }
     try {
       zkClient.makePath(leaderPath,
           leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
@@ -68,6 +93,72 @@ final class ShardLeaderElectionContext e
           CreateMode.EPHEMERAL, true);
     }
   }
+
+  private void syncReplicas() {
+    try {
+      // nocommit
+//      System.out.println("I am the new Leader:" + leaderPath
+//          + " - I need to request all of my replicas to go into sync mode");
+      
+      // first sync ourselves - we are the potential leader after all
+      sync(leaderProps);
+      
+      // sync everyone else
+      // TODO: we should do this in parallel
+      List<ZkCoreNodeProps> nodes = zkStateReader.getReplicaProps(collection, shardId,
+          leaderProps.get(ZkStateReader.NODE_NAME_PROP), leaderProps.get(ZkStateReader.CORE_PROP));
+      if (nodes != null) {
+        for (ZkCoreNodeProps node : nodes) {
+          try {
+            sync(node.getNodeProps());
+          } catch(Exception exception) {
+            exception.printStackTrace();
+            //nocommit
+          }
+        }
+      }
+      
+    } catch (Exception e) {
+      // nocommit
+      e.printStackTrace();
+    }
+  }
+
+  private void sync(ZkNodeProps props) throws MalformedURLException, SolrServerException,
+      IOException {
+    List<ZkCoreNodeProps> nodes = zkStateReader.getReplicaProps(collection, shardId,
+        props.get(ZkStateReader.NODE_NAME_PROP), props.get(ZkStateReader.CORE_PROP));
+    
+    if (nodes == null) {
+      // I have no replicas
+      return;
+    }
+    
+    List<String> syncWith = new ArrayList<String>();
+    for (ZkCoreNodeProps node : nodes) {
+      syncWith.add(node.getCoreUrl());
+    }
+
+    // TODO: do we first everyone register as sync phase? get the overseer to do it?
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "getVersions",
+        Integer.toString(1000), "sync",
+        StrUtils.join(Arrays.asList(syncWith), ',')));
+    CommonsHttpSolrServer server = null;
+    
+    server = new CommonsHttpSolrServer(ZkCoreNodeProps.getCoreUrl(
+        props.get(ZkStateReader.BASE_URL_PROP),
+        props.get(ZkStateReader.CORE_PROP)));
+    
+    NamedList rsp = server.request(qr);
+  }
+  
+  public static ModifiableSolrParams params(String... params) {
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i=0; i<params.length; i+=2) {
+      msp.add(params[i], params[i+1]);
+    }
+    return msp;
+  }
 }
 
 final class OverseerElectionContext extends ElectionContext {
@@ -82,7 +173,7 @@ final class OverseerElectionContext exte
   }
 
   @Override
-  void runLeaderProcess() throws KeeperException, InterruptedException {
+  void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
     new Overseer(zkClient, stateReader);
   }
   

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Mon Jan 16 03:55:02 2012
@@ -82,7 +82,7 @@ public  class LeaderElector {
    * @throws IOException 
    * @throws UnsupportedEncodingException
    */
-  private void checkIfIamLeader(final int seq, final ElectionContext context) throws KeeperException,
+  private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
       InterruptedException, IOException {
     // get all other numbers...
     final String holdElectionPath = context.electionPath + ELECTION_NODE;
@@ -91,7 +91,7 @@ public  class LeaderElector {
     sortSeqs(seqs);
     List<Integer> intSeqs = getSeqs(seqs);
     if (seq <= intSeqs.get(0)) {
-      runIamLeaderProcess(context);
+      runIamLeaderProcess(context, replacement);
     } else {
       // I am not the leader - watch the node below me
       int i = 1;
@@ -115,7 +115,7 @@ public  class LeaderElector {
               public void process(WatchedEvent event) {
                 // am I the next leader?
                 try {
-                  checkIfIamLeader(seq, context);
+                  checkIfIamLeader(seq, context, true);
                 } catch (KeeperException e) {
                   log.warn("", e);
                   
@@ -134,14 +134,15 @@ public  class LeaderElector {
       } catch (KeeperException e) {
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
-        checkIfIamLeader(seq, context);
+        checkIfIamLeader(seq, context, true);
       }
     }
   }
 
-  protected void runIamLeaderProcess(final ElectionContext context) throws KeeperException,
+  protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
       InterruptedException {
-    context.runLeaderProcess();
+
+    context.runLeaderProcess(weAreReplacement);
   }
   
   /**
@@ -244,7 +245,7 @@ public  class LeaderElector {
       }
     }
     int seq = getSeq(leaderSeqPath);
-    checkIfIamLeader(seq, context);
+    checkIfIamLeader(seq, context, false);
     
     return seq;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Jan 16 03:55:02 2012
@@ -48,6 +48,7 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.SolrCmdDistributor.Node;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -504,9 +505,12 @@ public final class ZkController {
 
     // we only put a subset of props into the leader node
     ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
-        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_PROP, props.get(ZkStateReader.CORE_PROP));
+        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_PROP,
+        props.get(ZkStateReader.CORE_PROP), ZkStateReader.NODE_NAME_PROP,
+        props.get(ZkStateReader.NODE_NAME_PROP));
 
-    ElectionContext context = new ShardLeaderElectionContext(shardId, collection, shardZkNodeName, leaderProps, zkClient);
+    ElectionContext context = new ShardLeaderElectionContext(shardId,
+        collection, shardZkNodeName, leaderProps, zkStateReader);
     
     leaderElector.setup(context);
     leaderElector.joinElection(context);

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Jan 16 03:55:02 2012
@@ -17,16 +17,27 @@
 
 package org.apache.solr.handler.admin;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.RecoveryStrat;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
@@ -35,29 +46,27 @@ import org.apache.solr.common.params.Mod
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.core.*;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.util.NumberUtils;
-import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.SolrPluginUtils;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.MergeIndexesCommand;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.NumberUtils;
+import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.SolrPluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Properties;
-
 /**
  *
  * @since solr 1.3
@@ -179,6 +188,11 @@ public class CoreAdminHandler extends Re
           break;
         }
         
+        case DISTRIBURL: {
+          this.handleDistribUrlAction(req, rsp);
+          break;
+        }
+        
         default: {
           doPersist = this.handleCustomAction(req, rsp);
           break;
@@ -679,6 +693,33 @@ public class CoreAdminHandler extends Re
       }
     }
   }
+  
+  protected void handleDistribUrlAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
+    SolrParams params = req.getParams();
+    
+    SolrParams required = params.required();
+    String path = required.get("path");
+    String shard = params.get("shard");
+    String collection = required.get("collection");
+    
+    SolrCore core = req.getCore();
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer()
+        .getZkController();
+    if (shard != null) {
+      List<ZkCoreNodeProps> replicas = zkController.getZkStateReader().getReplicaProps(
+          collection, shard, zkController.getNodeName(), core.getName());
+      
+      for (ZkCoreNodeProps node : replicas) {
+        CommonsHttpSolrServer server = new CommonsHttpSolrServer(node.getCoreUrl() + path);
+        QueryRequest qr = new QueryRequest();
+        server.request(qr);
+      }
+
+    }
+    
+    // nocommit: no shard?
+  }
 
   protected NamedList<Object> getCoreStatus(CoreContainer cores, String cname) throws IOException {
     NamedList<Object> info = new SimpleOrderedMap<Object>();
@@ -719,6 +760,13 @@ public class CoreAdminHandler extends Re
     return path;
   }
 
+  public static ModifiableSolrParams params(String... params) {
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i=0; i<params.length; i+=2) {
+      msp.add(params[i], params[i+1]);
+    }
+    return msp;
+  }
 
   //////////////////////// SolrInfoMBeans methods //////////////////////
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/PeerSync.java Mon Jan 16 03:55:02 2012
@@ -153,11 +153,12 @@ public class PeerSync  {
     sreq.actualShards = sreq.shards;
     sreq.params = new ModifiableSolrParams();
     sreq.params.set("qt","/get");
+    sreq.params.set("distrib",false);
     sreq.params.set("getVersions",nUpdates);
     shardHandler.submit(sreq, replica, sreq.params);
   }
 
-  private boolean  handleResponse(ShardResponse srsp) {
+  private boolean handleResponse(ShardResponse srsp) {
     if (srsp.getException() != null) {
       return false;
     }
@@ -235,6 +236,7 @@ public class PeerSync  {
     sreq.purpose = 0;
     sreq.params = new ModifiableSolrParams();
     sreq.params.set("qt","/get");
+    sreq.params.set("distrib",false);
     sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
     sreq.responses.clear();  // needs to be zeroed for correct correlation to occur
 
@@ -361,6 +363,7 @@ public class PeerSync  {
       sreq.shards = new String[]{replica};
       sreq.params = new ModifiableSolrParams();
       sreq.params.set("qt","/get");
+      sreq.params.set("distrib", false);
       sreq.params.set("getVersions",nUpdates);
       shardHandler.submit(sreq, replica, sreq.params);
     }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Jan 16 03:55:02 2012
@@ -400,17 +400,20 @@ public class SolrCmdDistributor {
     public abstract boolean checkRetry();
     public abstract String getCoreName();
     public abstract String getBaseUrl();
+    public abstract ZkCoreNodeProps getNodeProps();
   }
 
   public static class StdNode extends Node {
     protected String url;
     protected String baseUrl;
     protected String coreName;
+    private ZkCoreNodeProps nodeProps;
 
     public StdNode(ZkCoreNodeProps nodeProps) {
       this.url = nodeProps.getCoreUrl();
       this.baseUrl = nodeProps.getBaseUrl();
       this.coreName = nodeProps.getCoreName();
+      this.nodeProps = nodeProps;
     }
     
     @Override
@@ -465,5 +468,9 @@ public class SolrCmdDistributor {
       } else if (!url.equals(other.url)) return false;
       return true;
     }
+
+    public ZkCoreNodeProps getNodeProps() {
+      return nodeProps;
+    }
   }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Jan 16 03:55:02 2012
@@ -152,11 +152,10 @@ public class DistributedUpdateProcessor 
         ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
             collection, shardId));
         
-        String leaderNodeName = leaderProps.getNodeName();
-        
-        String nodeName = zkController.getNodeName();
-        
-        isLeader = nodeName.equals(leaderNodeName);
+        String leaderNodeName = leaderProps.getCoreNodeName();
+        String coreName = req.getCore().getName();
+        String coreNodeName = zkController.getNodeName() + "_" + coreName;
+        isLeader = coreNodeName.equals(leaderNodeName);
         
         if (req.getParams().getBool(SEEN_LEADER, false)) {
           // we are coming from the leader, just go local - add no urls
@@ -165,7 +164,16 @@ public class DistributedUpdateProcessor 
           // that means I want to forward onto my replicas...
           // so get the replicas...
           forwardToLeader = false;
-          nodes = getReplicaNodes(req, collection, shardId, nodeName);
+          List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+              .getReplicaProps(collection, shardId, zkController.getNodeName(),
+                  coreName);
+          if (replicaProps != null) {
+            nodes = new ArrayList<Node>(replicaProps.size());
+            for (ZkCoreNodeProps props : replicaProps) {
+              nodes.add(new StdNode(props));
+            }
+          }
+          
         } else {
           // I need to forward onto the leader...
           nodes = new ArrayList<Node>(1);
@@ -682,37 +690,7 @@ public class DistributedUpdateProcessor 
     if (next != null && nodes == null) next.finish();
   }
  
-  private List<Node> getReplicaNodes(SolrQueryRequest req, String collection,
-      String shardId, String thisNodeName) {
-    CloudState cloudState = req.getCore().getCoreDescriptor()
-        .getCoreContainer().getZkController().getCloudState();
 
-    Map<String,Slice> slices = cloudState.getSlices(collection);
-    if (slices == null) {
-      throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find collection in zk: " + cloudState);
-    }
-    
-    Slice replicas = slices.get(shardId);
-    if (replicas == null) {
-      throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
-    }
-    
-    Map<String,ZkNodeProps> shardMap = replicas.getShards();
-    List<Node> nodes = new ArrayList<Node>(shardMap.size());
-
-    for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
-      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
-      String nodeName = nodeProps.getNodeName();
-      if (cloudState.liveNodesContain(nodeName) && !nodeName.equals(thisNodeName)) {
-        nodes.add(new StdNode(nodeProps));
-      }
-    }
-    if (nodes.size() == 0) {
-      // no replicas - go local
-      return null;
-    }
-    return nodes;
-  }
   
   private List<Node> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
     CloudState cloudState = req.getCore().getCoreDescriptor()

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Mon Jan 16 03:55:02 2012
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -63,12 +64,18 @@ public class ChaosMonkey {
   private boolean expireSessions;
   private boolean causeConnectionLoss;
   private boolean killLeaders;
+  private Map<String,SolrServer> shardToLeaderClient;
+  private Map<String,CloudJettyRunner> shardToLeaderJetty;
   
   public ChaosMonkey(ZkTestServer zkServer, ZkStateReader zkStateReader,
       String collection, Map<String,List<CloudJettyRunner>> shardToJetty,
-      Map<String,List<SolrServer>> shardToClient, Random random) {
+      Map<String,List<SolrServer>> shardToClient,
+      Map<String,SolrServer> shardToLeaderClient,
+      Map<String,CloudJettyRunner> shardToLeaderJetty, Random random) {
     this.shardToJetty = shardToJetty;
     this.shardToClient = shardToClient;
+    this.shardToLeaderClient = shardToLeaderClient;
+    this.shardToLeaderJetty = shardToLeaderJetty;
     this.zkServer = zkServer;
     this.zkStateReader = zkStateReader;
     this.collection = collection;
@@ -129,23 +136,17 @@ public class ChaosMonkey {
   }
 
   public void stopJetty(JettySolrRunner jetty) throws Exception {
-    if (jetty.isRunning()) {
-      stops.incrementAndGet();
-    }
-    
     stop(jetty);
+    stops.incrementAndGet();
   }
 
   public void killJetty(JettySolrRunner jetty) throws Exception {
-    if (jetty.isRunning()) {
-      stops.incrementAndGet();
-    }
-    
     kill(jetty);
-
+    stops.incrementAndGet();
   }
   
   public static void stop(JettySolrRunner jetty) throws Exception {
+    
     // get a clean shutdown so that no dirs are left open...
     FilterHolder fh = jetty.getDispatchFilter();
     if (fh != null) {
@@ -154,10 +155,7 @@ public class ChaosMonkey {
         sdf.destroy();
       }
     }
-    
-    if (!jetty.isStopped()) {
-      jetty.stop();
-    }
+    jetty.stop();
     
     if (!jetty.isStopped()) {
       throw new RuntimeException("could not stop jetty");
@@ -165,9 +163,9 @@ public class ChaosMonkey {
   }
   
   public static void kill(JettySolrRunner jetty) throws Exception {
-    if (!jetty.isStopped()) {
-      jetty.stop();
-    }
+
+    jetty.stop();
+    
     
     FilterHolder fh = jetty.getDispatchFilter();
     if (fh != null) {
@@ -291,18 +289,31 @@ public class ChaosMonkey {
       return null;
     }
     
-    // get random shard
-    List<CloudJettyRunner> jetties = shardToJetty.get(slice);
-    int index = random.nextInt(jetties.size() - 1);
-    JettySolrRunner jetty = jetties.get(index).jetty;
-    
-    ZkNodeProps leader = zkStateReader.getLeaderProps(collection, slice);
-    
-    if (!killLeader && leader.get(ZkStateReader.NODE_NAME_PROP).equals(jetties.get(index).nodeName)) {
-      // we don't kill leaders...
-      return null;
+    int chance = random.nextInt(10);
+    JettySolrRunner jetty;
+    if (chance <= 8 && killLeader) {
+      // if killLeader, really aggressively go after leaders
+      Collection<CloudJettyRunner> leaders = shardToLeaderJetty.values();
+      List<CloudJettyRunner> leadersList = new ArrayList<CloudJettyRunner>(leaders.size());
+     
+      leadersList.addAll(leaders);
+
+      int index = random.nextInt(leadersList.size());
+      jetty = leadersList.get(index).jetty;
+    } else {
+      // get random shard
+      List<CloudJettyRunner> jetties = shardToJetty.get(slice);
+      int index = random.nextInt(jetties.size());
+      jetty = jetties.get(index).jetty;
+      
+      ZkNodeProps leader = zkStateReader.getLeaderProps(collection, slice);
+      boolean isLeader = leader.get(ZkStateReader.NODE_NAME_PROP).equals(jetties.get(index).nodeName);
+      if (!killLeader && isLeader) {
+        // we don't kill leaders...
+        return null;
+      }
     }
-    
+ 
     return jetty;
   }
   
@@ -333,8 +344,8 @@ public class ChaosMonkey {
       public void run() {
         while (!stop) {
           try {
-            Thread.sleep(500);
-            
+            Thread.sleep(300);
+ 
             if (random.nextBoolean()) {
              if (!deadPool.isEmpty()) {
                int index = random.nextInt(deadPool.size());

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Mon Jan 16 03:55:02 2012
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
  */
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
 import org.apache.solr.common.SolrInputDocument;
@@ -35,7 +37,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 
-@Ignore
 public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
   
   @BeforeClass
@@ -59,6 +60,7 @@ public class ChaosMonkeyNothingIsSafeTes
     ignoreException("org\\.mortbay\\.jetty\\.EofException");
     ignoreException("java\\.lang\\.InterruptedException");
     ignoreException("java\\.nio\\.channels\\.ClosedByInterruptException");
+    ignoreException("Failure to open existing log file \\(non fatal\\)");
     
     
     // sometimes we cannot get the same port
@@ -70,14 +72,15 @@ public class ChaosMonkeyNothingIsSafeTes
   @Override
   @After
   public void tearDown() throws Exception {
+    printLayout();
     super.tearDown();
     resetExceptionIgnores();
   }
   
   public ChaosMonkeyNothingIsSafeTest() {
     super();
-    shardCount = atLeast(9);
-    sliceCount = atLeast(3);
+    shardCount = atLeast(2);
+    sliceCount = 2;
   }
   
   @Override
@@ -101,7 +104,7 @@ public class ChaosMonkeyNothingIsSafeTes
     }
     
     FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
-        ((CommonsHttpSolrServer) clients.get(0)).getBaseURL(), i * 50000, true);
+        clients, i * 50000, true);
     threads.add(ftIndexThread);
     ftIndexThread.start();
     
@@ -130,7 +133,14 @@ public class ChaosMonkeyNothingIsSafeTes
     
     // wait until there are no recoveries...
     waitForThingsToLevelOut();
+    
+    // make sure we again have leaders for each shard
+    for (int j = 1; j < sliceCount; j++) {
+      zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+    }
 
+    commit();
+    
     checkShardConsistency(false, true);
     
     // ensure we have added more than 0 docs
@@ -181,13 +191,15 @@ public class ChaosMonkeyNothingIsSafeTes
 
   class FullThrottleStopableIndexingThread extends StopableIndexingThread {
     private volatile boolean stop = false;
-
-    private StreamingUpdateSolrServer suss;  
+    int clientIndex = 0;
+    private StreamingUpdateSolrServer suss;
+    private List<SolrServer> clients;  
     
-    public FullThrottleStopableIndexingThread(String serverUrl, int startI, boolean doDeletes) throws MalformedURLException {
+    public FullThrottleStopableIndexingThread(List<SolrServer> clients, int startI, boolean doDeletes) throws MalformedURLException {
       super(startI, doDeletes);
       setDaemon(true);
-      suss = new StreamingUpdateSolrServer(serverUrl, 10, 3);
+      this.clients = clients;
+      suss = new StreamingUpdateSolrServer(((CommonsHttpSolrServer) clients.get(0)).getBaseURL(), 10, 3);
     }
     
     @Override
@@ -205,6 +217,7 @@ public class ChaosMonkeyNothingIsSafeTes
             numDeletes++;
             suss.deleteById(Integer.toString(delete));
           } catch (Exception e) {
+            changeUrlOnError(e);
             System.err.println("REQUEST FAILED:");
             e.printStackTrace();
             fails.incrementAndGet();
@@ -213,10 +226,18 @@ public class ChaosMonkeyNothingIsSafeTes
         
         try {
           numAdds++;
-          SolrInputDocument doc = getDoc(id, i, i1, 50, tlong, 50, t1,
-              "to come to the aid of their country.");
+          SolrInputDocument doc = getDoc(
+              id,
+              i,
+              i1,
+              50,
+              tlong,
+              50,
+              t1,
+              "Saxon heptarchies that used to rip around so in old times and raise Cain.  My, you ought to seen old Henry the Eight when he was in bloom.  He WAS a blossom.  He used to marry a new wife every day, and chop off her head next morning.  And he would do it just as indifferent as if ");
           suss.add(doc);
         } catch (Exception e) {
+          changeUrlOnError(e);
           System.err.println("REQUEST FAILED:");
           e.printStackTrace();
           fails.incrementAndGet();
@@ -228,7 +249,21 @@ public class ChaosMonkeyNothingIsSafeTes
         
       }
       
-      System.err.println("added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
+      System.err.println("FT added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
+    }
+
+    private void changeUrlOnError(Exception e) {
+      if (e instanceof ConnectException) {
+        clientIndex++;
+        if (clientIndex > clients.size() - 1) {
+          clientIndex = 0;
+        }
+        try {
+          suss = new StreamingUpdateSolrServer(((CommonsHttpSolrServer) clients.get(clientIndex)).getBaseURL(), 30, 3);
+        } catch (MalformedURLException e1) {
+          e1.printStackTrace();
+        }
+      }
     }
     
     public void safeStop() {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Mon Jan 16 03:55:02 2012
@@ -94,8 +94,9 @@ public class FullSolrCloudTest extends A
   protected ChaosMonkey chaosMonkey;
   protected volatile ZkStateReader zkStateReader;
 
+  private Map<String,SolrServer> shardToLeaderClient = new HashMap<String,SolrServer>();
+  private Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<String,CloudJettyRunner>();
 
-  
   class CloudJettyRunner {
     JettySolrRunner jetty;
     String nodeName;
@@ -179,9 +180,11 @@ public class FullSolrCloudTest extends A
         zkStateReader.createClusterStateWatchersAndUpdate();
       }
       
-      chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION, shardToJetty, shardToClient, random);
+      chaosMonkey = new ChaosMonkey(zkServer, zkStateReader,
+          DEFAULT_COLLECTION, shardToJetty, shardToClient, shardToLeaderClient, shardToLeaderJetty,
+          random);
     }
-    
+
     // wait until shards have started registering...
     while(!zkStateReader.getCloudState().getCollections().contains(DEFAULT_COLLECTION)) {
       Thread.sleep(500);
@@ -294,6 +297,7 @@ public class FullSolrCloudTest extends A
             CloudSolrServerClient csc = new CloudSolrServerClient();
             csc.client = client;
             csc.shardName = shard.getValue().get(ZkStateReader.NODE_NAME_PROP);
+            boolean isLeader = shard.getValue().containsKey(ZkStateReader.LEADER_PROP);
             clientToInfo.put(csc, shard.getValue());
             List<SolrServer> list = shardToClient.get(slice.getKey());
             if (list == null) {
@@ -301,6 +305,10 @@ public class FullSolrCloudTest extends A
               shardToClient.put(slice.getKey(), list);
             }
             list.add(client);
+            
+            if (isLeader) {
+              shardToLeaderClient.put(slice.getKey(), client);
+            }
           }
         }
       }
@@ -324,11 +332,15 @@ public class FullSolrCloudTest extends A
               list = new ArrayList<CloudJettyRunner>();
               shardToJetty.put(slice.getKey(), list);
             }
+            boolean isLeader = shard.getValue().containsKey(ZkStateReader.LEADER_PROP);
             CloudJettyRunner cjr = new CloudJettyRunner();
             cjr.jetty = jetty;
             cjr.nodeName = shard.getValue().get(ZkStateReader.NODE_NAME_PROP);
             cjr.shardName = shard.getKey();
             list.add(cjr);
+            if (isLeader) {
+              shardToLeaderJetty.put(shard.getKey(), cjr);
+            }
           }
         }
       }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Mon Jan 16 03:55:02 2012
@@ -44,7 +44,7 @@ public class LeaderElectionTest extends 
   static final int TIMEOUT = 30000;
   private ZkTestServer server;
   private SolrZkClient zkClient;
-  
+  ZkStateReader zkStateReader;
   private Map<Integer,Thread> seqToThread;
   
   private volatile boolean stopStress = false;
@@ -72,6 +72,7 @@ public class LeaderElectionTest extends 
     AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
     AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
     zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+    zkStateReader = new ZkStateReader(zkClient);
     seqToThread = Collections.synchronizedMap(new HashMap<Integer,Thread>());
   }
   
@@ -82,10 +83,12 @@ public class LeaderElectionTest extends 
     private volatile boolean stop;
     private volatile boolean electionDone = false;
     private final ZkNodeProps props;
+
     
     public ClientThread(int nodeNumber) throws Exception {
       super("Thread-" + nodeNumber);
       zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+    
       this.nodeNumber = nodeNumber;
       props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_PROP, "");
     }
@@ -96,7 +99,7 @@ public class LeaderElectionTest extends 
         LeaderElector elector = new LeaderElector(zkClient);
         
         ElectionContext context = new ShardLeaderElectionContext("shard1",
-            "collection1", Integer.toString(nodeNumber), props, zkClient);
+            "collection1", Integer.toString(nodeNumber), props, zkStateReader);
         
         try {
           elector.setup(context);
@@ -139,7 +142,7 @@ public class LeaderElectionTest extends 
   public void testBasic() throws Exception {
     LeaderElector elector = new LeaderElector(zkClient);
     ZkNodeProps props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr/", ZkStateReader.CORE_PROP, "");
-    ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", props, zkClient);
+    ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", props, zkStateReader);
     elector.setup(context);
     elector.joinElection(context);
     assertEquals("http://127.0.0.1/solr/", getLeaderUrl("collection1", "shard2"));
@@ -198,9 +201,12 @@ public class LeaderElectionTest extends 
     ((ClientThread) seqToThread.get(1)).close();
     ((ClientThread) seqToThread.get(3)).close();
     
-    leaderThread = getLeaderThread();
-    
     // whoever the leader is, should be the n_2 seq
+    
+    // nocommit
+    Thread.sleep(1000);
+    
+    leaderThread = getLeaderThread();
     assertEquals(2, threads.get(leaderThread).seq);
     
     // kill n_5, 2, 6, 7, and 8
@@ -210,6 +216,8 @@ public class LeaderElectionTest extends 
     ((ClientThread) seqToThread.get(7)).close();
     ((ClientThread) seqToThread.get(8)).close();
     
+    // nocommit
+    Thread.sleep(1000);
     leaderThread = getLeaderThread();
     
     // whoever the leader is, should be the n_9 seq
@@ -343,11 +351,8 @@ public class LeaderElectionTest extends 
 
     printLayout(server.getZkAddress());
     
-    
-    System.out.println("leader thread:" + getLeaderThread());
+
     int seq = threads.get(getLeaderThread()).getSeq();
-    System.out.println("Seq:" + seq);
-    System.out.println("Node:" + threads.get(getLeaderThread()).getNodeNumber());
     
     assertFalse("seq is -1 and we may have a zombie leader", seq == -1);
     

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java Mon Jan 16 03:55:02 2012
@@ -62,5 +62,13 @@ public class ZkCoreNodeProps {
     return nodeProps.toString();
   }
 
+  public String getCoreNodeName() {
+    return getNodeName() + "_" + getCoreName();
+  }
+
+  public ZkNodeProps getNodeProps() {
+    return nodeProps;
+  }
+
 
 }

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon Jan 16 03:55:02 2012
@@ -18,9 +18,12 @@ package org.apache.solr.common.cloud;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +39,7 @@ import org.apache.noggit.JSONParser;
 import org.apache.noggit.JSONWriter;
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -408,4 +412,44 @@ public class ZkStateReader {
         : "");
   }
   
+  public List<ZkCoreNodeProps> getReplicaProps(String collection,
+      String shardId, String thisNodeName, String coreName) {
+    CloudState cloudState = this.cloudState;
+    if (cloudState == null) {
+      return null;
+    }
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    if (slices == null) {
+      throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
+          "Could not find collection in zk: " + collection + " "
+              + cloudState.getCollections());
+    }
+    
+    Slice replicas = slices.get(shardId);
+    if (replicas == null) {
+      throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
+    }
+    
+    Map<String,ZkNodeProps> shardMap = replicas.getShards();
+    List<ZkCoreNodeProps> nodes = new ArrayList<ZkCoreNodeProps>(shardMap.size());
+
+    for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
+      String coreNodeName = nodeProps.getNodeName() + "_" + coreName;
+      if (cloudState.liveNodesContain(thisNodeName) && !coreNodeName.equals(thisNodeName + "_" + coreName)) {
+        nodes.add(nodeProps);
+      }
+    }
+    if (nodes.size() == 0) {
+      // no replicas - go local
+      return null;
+    }
+
+    return nodes;
+  }
+
+  public SolrZkClient getZkClient() {
+    return zkClient;
+  }
+  
 }

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1231823&r1=1231822&r2=1231823&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon Jan 16 03:55:02 2012
@@ -88,7 +88,8 @@ public interface CoreAdminParams 
     ALIAS,
     MERGEINDEXES,
     PREPRECOVERY, 
-    REQUESTRECOVERY;
+    REQUESTRECOVERY, 
+    DISTRIBURL;
     
     public static CoreAdminAction get( String p )
     {