You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by us...@apache.org on 2012/01/29 13:18:55 UTC

svn commit: r1237259 [3/4] - in /lucene/dev/branches/lucene2858: ./ dev-tools/idea/lucene/contrib/ dev-tools/maven/solr/solrj/ lucene/ lucene/contrib/ lucene/contrib/sandbox/src/test/org/apache/lucene/sandbox/queries/regex/ lucene/src/java/org/apache/l...

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Jan 29 12:18:50 2012
@@ -118,7 +118,7 @@ final class ShardLeaderElectionContext e
         }
         // should I be leader?
         if (weAreReplacement && !shouldIBeLeader(leaderProps)) {
-          System.out.println("there is a better leader candidate it appears");
+          // System.out.println("there is a better leader candidate it appears");
           rejoinLeaderElection(leaderSeqPath, core);
           return;
         }
@@ -209,14 +209,31 @@ final class OverseerElectionContext exte
   private final ZkStateReader stateReader;
 
   public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
-    super(zkNodeName, "/overseer_elect", null, null);
+    super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
     this.zkClient = zkClient;
     this.stateReader = stateReader;
   }
 
   @Override
   void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
-    new Overseer(zkClient, stateReader);
+    
+    final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+    ZkNodeProps myProps = new ZkNodeProps("id", id);
+
+    try {
+      zkClient.makePath(leaderPath,
+          ZkStateReader.toJSON(myProps),
+          CreateMode.EPHEMERAL, true);
+    } catch (NodeExistsException e) {
+      // if a previous leader ephemeral still exists for some reason, try and
+      // remove it
+      zkClient.delete(leaderPath, -1, true);
+      zkClient.makePath(leaderPath,
+          ZkStateReader.toJSON(myProps),
+          CreateMode.EPHEMERAL, true);
+    }
+  
+    new Overseer(zkClient, stateReader, id);
   }
   
 }

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Sun Jan 29 12:18:50 2012
@@ -199,7 +199,6 @@ public  class LeaderElector {
    * watch the next lowest numbered node.
    * 
    * @param context
-   * @param SolrCore - optional - sometimes null
    * @return sequential node number
    * @throws KeeperException
    * @throws InterruptedException
@@ -256,8 +255,7 @@ public  class LeaderElector {
   /**
    * Set up any ZooKeeper nodes needed for leader election.
    * 
-   * @param shardId
-   * @param collection
+   * @param context
    * @throws InterruptedException
    * @throws KeeperException
    */

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Jan 29 12:18:50 2012
@@ -22,9 +22,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Set;
 
 import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
@@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory;
  * Cluster leader. Responsible node assignments, cluster state file?
  */
 public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+
+  private static final int STATE_UPDATE_DELAY = 500;  // delay between cloud state updates
+
+  static enum Op {
+    LeaderChange, StateChange; 
+  }
+
+  private final class CloudStateUpdateRequest {
+    
+    final Op operation;
+    final Object[] args;
+    
+     CloudStateUpdateRequest(final Op operation, final Object... args) {
+       this.operation = operation;
+       this.args = args;
+    }
+  }
   
   public static final String ASSIGNMENTS_NODE = "/node_assignments";
   public static final String STATES_NODE = "/node_states";
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
   
   private final SolrZkClient zkClient;
-  private final ZkStateReader reader;
+  
+  // pooled updates
+  private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
   
   // node stateWatches
   private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
@@ -66,12 +87,222 @@ public class Overseer implements NodeSta
   private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
   private ZkCmdExecutor zkCmdExecutor;
 
-  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
-    log.info("Constructing new Overseer");
+  private static class CloudStateUpdater implements Runnable {
+    
+    private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
+    private final ZkStateReader reader;
+    private final SolrZkClient zkClient;
+    private final String myId;
+    
+    public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
+      this.fifo = fifo;
+      this.myId = myId;
+      this.reader = reader;
+      this.zkClient = zkClient;
+    }
+      @Override
+      public void run() {
+        while (amILeader()) {
+          
+          
+          LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
+          while (!fifo.isEmpty()) {
+            // collect all queued requests
+            CloudStateUpdateRequest req;
+            req = fifo.poll();
+            if (req == null) {
+              break;
+            }
+            requests.add(req);
+          }
+
+          if (requests.size() > 0) {
+            // process updates
+            synchronized (reader.getUpdateLock()) {
+              try {
+                reader.updateCloudState(true);
+                CloudState cloudState = reader.getCloudState();
+                for (CloudStateUpdateRequest request : requests) {
+
+                  switch (request.operation) {
+                  case LeaderChange:
+                    cloudState = setShardLeader(cloudState,
+                        (String) request.args[0], (String) request.args[1],
+                        (String) request.args[2]);
+
+                    break;
+                  case StateChange:
+                    cloudState = updateState(cloudState,
+                        (String) request.args[0], (CoreState) request.args[1]);
+                    break;
+
+                  }
+                }
+
+                log.info("Announcing new cluster state");
+                zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                    ZkStateReader.toJSON(cloudState), true);
+
+              } catch (KeeperException e) {
+                // XXX stop processing, exit
+                return;
+              } catch (InterruptedException e) {
+                // XXX stop processing, exit
+                return;
+              }
+            }
+          }
+
+          try {
+            Thread.sleep(STATE_UPDATE_DELAY);
+          } catch (InterruptedException e) {
+            //
+          }
+        }
+      }
+      
+      private boolean amILeader() {
+        try {
+          ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false));
+          if(myId.equals(props.get("id"))) {
+            return true;
+          }
+        } catch (KeeperException e) {
+          // assume we're dead
+        } catch (InterruptedException e) {
+          // assume we're dead
+        }
+        log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+        return false;
+      }
+      /**
+       * Try to assign core to the cluster
+       * @throws KeeperException 
+       * @throws InterruptedException 
+       */
+      private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+        String collection = coreState.getCollectionName();
+        String zkCoreNodeName = coreState.getCoreNodeName();
+        
+          String shardId;
+          if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+            shardId = AssignShard.assignShard(collection, state);
+          } else {
+            shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+          }
+          
+          Map<String,String> props = new HashMap<String,String>();
+          for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+            props.put(entry.getKey(), entry.getValue());
+          }
+          ZkNodeProps zkProps = new ZkNodeProps(props);
+          Slice slice = state.getSlice(collection, shardId);
+          Map<String,ZkNodeProps> shardProps;
+          if (slice == null) {
+            shardProps = new HashMap<String,ZkNodeProps>();
+          } else {
+            shardProps = state.getSlice(collection, shardId).getShardsCopy();
+          }
+          shardProps.put(zkCoreNodeName, zkProps);
+
+          slice = new Slice(shardId, shardProps);
+          CloudState newCloudState = updateSlice(state, collection, slice);
+          return newCloudState;
+      }
+      
+      private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+        
+        final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+        newStates.putAll(state.getCollectionStates());
+        
+        if (!newStates.containsKey(collection)) {
+          newStates.put(collection, new LinkedHashMap<String,Slice>());
+        }
+        
+        final Map<String, Slice> slices = newStates.get(collection);
+        if (!slices.containsKey(slice.getName())) {
+          slices.put(slice.getName(), slice);
+        } else {
+          final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+          final Slice existingSlice = slices.get(slice.getName());
+          shards.putAll(existingSlice.getShards());
+          //XXX preserve existing leader
+          for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+            if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+              HashMap<String, String> newProps = new HashMap<String,String>();
+              newProps.putAll(edit.getValue().getProperties());
+              newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+              shards.put(edit.getKey(), new ZkNodeProps(newProps));
+            } else {
+              shards.put(edit.getKey(), edit.getValue());
+            }
+          }
+          final Slice updatedSlice = new Slice(slice.getName(), shards);
+          slices.put(slice.getName(), updatedSlice);
+        }
+        return new CloudState(state.getLiveNodes(), newStates);
+      }
+      
+      private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+        
+        boolean updated = false;
+        final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+        newStates.putAll(state.getCollectionStates());
+        
+        final Map<String, Slice> slices = newStates.get(collection);
+
+        if(slices==null) {
+          log.error("Could not mark shard leader for non existing collection.");
+          return state;
+        }
+        
+        if (!slices.containsKey(sliceName)) {
+          log.error("Could not mark leader for non existing slice.");
+          return state;
+        } else {
+          final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+          for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+            Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+            newShardProps.putAll(shard.getValue().getProperties());
+            
+            String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP);  //clean any previously existed flag
+
+            ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+            if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+              newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+              if (wasLeader == null) {
+                updated = true;
+              }
+            } else {
+              if (wasLeader != null) {
+                updated = true;
+              }
+            }
+            newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+          }
+          Slice slice = new Slice(sliceName, newShards);
+          slices.put(sliceName, slice);
+        }
+        if (updated) {
+          return new CloudState(state.getLiveNodes(), newStates);
+        } else {
+          return state;
+        }
+      }
+
+    }
+  
+  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
+    log.info("Constructing new Overseer id=" + id);
     this.zkClient = zkClient;
     this.zkCmdExecutor = new ZkCmdExecutor();
-    this.reader = reader;
     createWatches();
+    
+    //launch cluster state updater thread
+    ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
+    Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+    updaterThread.setDaemon(true);
+    updaterThread.start();
   }
   
   public synchronized void createWatches()
@@ -267,41 +498,6 @@ public class Overseer implements NodeSta
     }
   }
   
-  /**
-   * Try to assign core to the cluster
-   * @throws KeeperException 
-   * @throws InterruptedException 
-   */
-  private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
-    String collection = coreState.getCollectionName();
-    String zkCoreNodeName = coreState.getCoreNodeName();
-    
-      String shardId;
-      if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
-        shardId = AssignShard.assignShard(collection, state);
-      } else {
-        shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
-      }
-      
-      Map<String,String> props = new HashMap<String,String>();
-      for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
-        props.put(entry.getKey(), entry.getValue());
-      }
-      ZkNodeProps zkProps = new ZkNodeProps(props);
-      Slice slice = state.getSlice(collection, shardId);
-      Map<String,ZkNodeProps> shardProps;
-      if (slice == null) {
-        shardProps = new HashMap<String,ZkNodeProps>();
-      } else {
-        shardProps = state.getSlice(collection, shardId).getShardsCopy();
-      }
-      shardProps.put(zkCoreNodeName, zkProps);
-
-      slice = new Slice(shardId, shardProps);
-      CloudState newCloudState = updateSlice(state, collection, slice);
-      return newCloudState;
-  }
-  
   private Set<String> complement(Collection<String> next,
       Collection<String> prev) {
     Set<String> downCollections = new HashSet<String>();
@@ -311,23 +507,11 @@ public class Overseer implements NodeSta
   }
 
   @Override
-  public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException  {
-    log.debug("Cores changed: " + nodeName + " states:" + states);
-    synchronized(reader.getUpdateLock()) {
-      reader.updateCloudState(true);
-      CloudState cloudState = reader.getCloudState();
-      for (CoreState state : states) {
-        cloudState = updateState(cloudState, nodeName, state);
-      }
-
-      try {
-        zkClient.setData(ZkStateReader.CLUSTER_STATE,
-            ZkStateReader.toJSON(cloudState), true);  
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Interrupted while publishing new state", e);
-      }
+  public void coreChanged(final String nodeName, final Set<CoreState> states)
+      throws KeeperException, InterruptedException {
+    log.info("Core change pooled: " + nodeName + " states:" + states);
+    for (CoreState state : states) {
+      fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
     }
   }
   
@@ -340,111 +524,11 @@ public class Overseer implements NodeSta
     ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
     zkCmdExecutor.ensureExists(node, zkClient);
   }
-  
-  private CloudState updateSlice(CloudState state, String collection, Slice slice) {
-    
-    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-    newStates.putAll(state.getCollectionStates());
-    
-    if (!newStates.containsKey(collection)) {
-      newStates.put(collection, new LinkedHashMap<String,Slice>());
-    }
-    
-    final Map<String, Slice> slices = newStates.get(collection);
-    if (!slices.containsKey(slice.getName())) {
-      slices.put(slice.getName(), slice);
-    } else {
-      final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
-      final Slice existingSlice = slices.get(slice.getName());
-      shards.putAll(existingSlice.getShards());
-      //XXX preserve existing leader
-      for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
-        if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
-          HashMap<String, String> newProps = new HashMap<String,String>();
-          newProps.putAll(edit.getValue().getProperties());
-          newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
-          shards.put(edit.getKey(), new ZkNodeProps(newProps));
-        } else {
-          shards.put(edit.getKey(), edit.getValue());
-        }
-      }
-      final Slice updatedSlice = new Slice(slice.getName(), shards);
-      slices.put(slice.getName(), updatedSlice);
-    }
-    return new CloudState(state.getLiveNodes(), newStates);
-  }
-
-  private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
-    
-    boolean updated = false;
-    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-    newStates.putAll(state.getCollectionStates());
-    
-    final Map<String, Slice> slices = newStates.get(collection);
-
-    if(slices==null) {
-      log.error("Could not mark shard leader for non existing collection.");
-      return state;
-    }
-    
-    if (!slices.containsKey(sliceName)) {
-      log.error("Could not mark leader for non existing slice.");
-      return state;
-    } else {
-      final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
-      for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
-        Map<String, String> newShardProps = new LinkedHashMap<String,String>();
-        newShardProps.putAll(shard.getValue().getProperties());
-        
-        String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP);  //clean any previously existed flag
-
-        ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
-        if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
-          newShardProps.put(ZkStateReader.LEADER_PROP,"true");
-          if (wasLeader == null) {
-            updated = true;
-          }
-        } else {
-          if (wasLeader != null) {
-            updated = true;
-          }
-        }
-        newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
-      }
-      Slice slice = new Slice(sliceName, newShards);
-      slices.put(sliceName, slice);
-    }
-    if (updated) {
-      return new CloudState(state.getLiveNodes(), newStates);
-    } else {
-      return state;
-    }
-  }
 
   @Override
   public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
-    synchronized (reader.getUpdateLock()) {
-      try {
-        reader.updateCloudState(true); // get fresh copy of the state
-      final CloudState state = reader.getCloudState();
-      final CloudState newState = setShardLeader(state, collection, shardId,
-          props.getCoreUrl());
-        if (state != newState) { // if same instance was returned no need to
-                                 // update state
-          log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
-          zkClient.setData(ZkStateReader.CLUSTER_STATE,
-              ZkStateReader.toJSON(newState), true);
-          
-        } else {
-          log.debug("State was not changed.");
-        }
-      } catch (KeeperException e) {
-        log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      }
-    }
+    log.info("Leader change pooled.");
+    fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl()));
   }
   
 }
\ No newline at end of file

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Sun Jan 29 12:18:50 2012
@@ -20,10 +20,8 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
@@ -38,10 +36,8 @@ import org.apache.solr.core.RequestHandl
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +63,7 @@ public class RecoveryStrategy extends Th
   public RecoveryStrategy(SolrCore core) {
     this.core = core;
     this.coreName = core.getName();
-    
+    setName("RecoveryThread");
     zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Jan 29 12:18:50 2012
@@ -46,7 +46,8 @@ public class SyncStrategy {
   
   public boolean sync(ZkController zkController, SolrCore core,
       ZkNodeProps leaderProps) {
-    zkController.publish(core, ZkStateReader.SYNC);
+    // TODO: look at our state usage of sync
+    // zkController.publish(core, ZkStateReader.SYNC);
     
     // solrcloud_debug
     // System.out.println("SYNC UP");

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Jan 29 12:18:50 2012
@@ -130,16 +130,15 @@ public final class ZkController {
     zkServer.stop();
   }
 
-
   /**
-   * @param coreContainer if null, recovery will not be enabled
+   * @param cc if null, recovery will not be enabled
    * @param zkServerAddress
    * @param zkClientTimeout
    * @param zkClientConnectTimeout
    * @param localHost
    * @param locaHostPort
    * @param localHostContext
-   * @param numShards 
+   * @param registerOnReconnect
    * @throws InterruptedException
    * @throws TimeoutException
    * @throws IOException
@@ -437,13 +436,14 @@ public final class ZkController {
   }
 
 
+
   /**
    * Register shard with ZooKeeper.
    * 
    * @param coreName
-   * @param cloudDesc
-   * @return
-   * @throws Exception 
+   * @param desc
+   * @return the shardId for the SolrCore
+   * @throws Exception
    */
   public String register(String coreName, final CoreDescriptor desc) throws Exception {  
     return register(coreName, desc, false);
@@ -456,7 +456,7 @@ public final class ZkController {
    * @param coreName
    * @param desc
    * @param recoverReloadedCores
-   * @return
+   * @return the shardId for the SolrCore
    * @throws Exception
    */
   public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {  
@@ -508,9 +508,33 @@ public final class ZkController {
       try {
         core = cc.getCore(desc.getName());
 
-        boolean startRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+        if (isLeader) {
+          // recover from local transaction log and wait for it to complete before
+          // going active
+          // TODO: should this be moved to another thread? To recoveryStrat?
+          // TODO: should this actually be done earlier, before (or as part of)
+          // leader election perhaps?
+          // TODO: ensure that a replica that is trying to recover waits until I'm
+          // active (or don't make me the
+          // leader until my local replay is done. But this replay is only needed
+          // on the leader - replicas
+          // will do recovery anyway
+          
+          UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+          if (!core.isReloaded() && ulog != null) {
+            Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+                .getUpdateLog().recoverFromLog();
+            if (recoveryFuture != null) {
+              recoveryFuture.get(); // NOTE: this could potentially block for
+                                    // minutes or more!
+              // TODO: public as recovering in the mean time?
+            }
+          }
+        }
+        
+        boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
             collection, coreZkNodeName, shardId, leaderProps, core, cc);
-        if (!startRecovery) {
+        if (!didRecovery) {
           publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
         }
       } finally {
@@ -539,6 +563,24 @@ public final class ZkController {
   }
 
 
+  /**
+   * @param coreName
+   * @param desc
+   * @param recoverReloadedCores
+   * @param isLeader
+   * @param cloudDesc
+   * @param collection
+   * @param shardZkNodeName
+   * @param shardId
+   * @param leaderProps
+   * @param core
+   * @param cc
+   * @return whether or not a recovery was started
+   * @throws InterruptedException
+   * @throws KeeperException
+   * @throws IOException
+   * @throws ExecutionException
+   */
   private boolean checkRecovery(String coreName, final CoreDescriptor desc,
       boolean recoverReloadedCores, final boolean isLeader,
       final CloudDescriptor cloudDesc, final String collection,
@@ -546,46 +588,18 @@ public final class ZkController {
       SolrCore core, CoreContainer cc) throws InterruptedException,
       KeeperException, IOException, ExecutionException {
 
-    
     boolean doRecovery = true;
-
-
-    if (isLeader) {
-      doRecovery = false;
-      
-      // recover from local transaction log and wait for it to complete before
-      // going active
-      // TODO: should this be moved to another thread? To recoveryStrat?
-      // TODO: should this actually be done earlier, before (or as part of)
-      // leader election perhaps?
-      // TODO: ensure that a replica that is trying to recover waits until I'm
-      // active (or don't make me the
-      // leader until my local replay is done. But this replay is only needed
-      // on the leader - replicas
-      // will do recovery anyway
-      
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      if (!core.isReloaded() && ulog != null) {
-        Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
-            .getUpdateLog().recoverFromLog();
-        if (recoveryFuture != null) {
-          recoveryFuture.get(); // NOTE: this could potentially block for
-                                // minutes or more!
-          // TODO: public as recovering in the mean time?
-        }
-      }
-      return false;
-    } else {
+    if (!isLeader) {
       
       if (core.isReloaded() && !recoverReloadedCores) {
         doRecovery = false;
       }
-    }
-    
-    if (doRecovery && !SKIP_AUTO_RECOVERY) {
-      log.info("Core needs to recover:" + core.getName());
-      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
-      return true;
+      
+      if (doRecovery && !SKIP_AUTO_RECOVERY) {
+        log.info("Core needs to recover:" + core.getName());
+        core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+        return true;
+      }
     }
     
     return false;

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/CoreContainer.java Sun Jan 29 12:18:50 2012
@@ -433,9 +433,9 @@ public class CoreContainer 
       try {
         for (SolrCore core : cores.values()) {
           try {
-            if (!core.isClosed()) {
-              core.close();
-            }
+             core.close();
+             // make sure we wait for any recoveries to stop
+             core.getUpdateHandler().getSolrCoreState().cancelRecovery();
           } catch (Throwable t) {
             SolrException.log(log, "Error shutting down core", t);
           }
@@ -491,6 +491,9 @@ public class CoreContainer 
     
     SolrCore old = null;
     synchronized (cores) {
+      if (isShutDown) {
+        throw new IllegalStateException("This CoreContainer has been shutdown");
+      }
       old = cores.put(name, core);
       /*
       * set both the name of the descriptor and the name of the

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/core/SolrCore.java Sun Jan 29 12:18:50 2012
@@ -732,8 +732,17 @@ public final class SolrCore implements S
       if (!searcherExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         log.error("Timeout waiting for searchExecutor to terminate");
       }
+    } catch (InterruptedException e) {
+      searcherExecutor.shutdownNow();
+      try {
+        if (!searcherExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+          log.error("Timeout waiting for searchExecutor to terminate");
+        }
+      } catch (InterruptedException e2) {
+        SolrException.log(log, e2);
+      }
     } catch (Exception e) {
-      SolrException.log(log,e);
+      SolrException.log(log, e);
     }
     try {
       // Since we waited for the searcherExecutor to shut down,
@@ -744,7 +753,7 @@ public final class SolrCore implements S
       // then the searchExecutor will throw an exception when getSearcher()
       // tries to use it, and the exception handling code should close it.
       closeSearcher();
-    } catch (Exception e) {
+    } catch (Throwable e) {
       SolrException.log(log,e);
     }
 
@@ -1053,14 +1062,13 @@ public final class SolrCore implements S
 
     openSearcherLock.lock();
     try {
-      String newIndexDir = null;
+      String newIndexDir = getNewIndexDir();
       File indexDirFile = null;
       File newIndexDirFile = null;
 
       // if it's not a normal near-realtime update, check that paths haven't changed.
       if (!nrt) {
         indexDirFile = new File(getIndexDir()).getCanonicalFile();
-        newIndexDir = getNewIndexDir();
         newIndexDirFile = new File(newIndexDir).getCanonicalFile();
       }
 

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Jan 29 12:18:50 2012
@@ -288,7 +288,7 @@ public class SnapPuller {
         return true;
       }
       
-      if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
+      if (!force && commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
         //master and slave are already in sync just return
         LOG.info("Slave in sync with master.");
         successfulInstall = true;

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/request/UnInvertedField.java Sun Jan 29 12:18:50 2012
@@ -175,7 +175,7 @@ public class UnInvertedField extends Doc
     final String prefix = TrieField.getMainValuePrefix(searcher.getSchema().getFieldType(field));
     this.searcher = searcher;
     try {
-      uninvert(searcher.getIndexReader(), prefix == null ? null : new BytesRef(prefix));
+      uninvert(new SlowMultiReaderWrapper(searcher.getIndexReader()), prefix == null ? null : new BytesRef(prefix));
     } catch (IllegalStateException ise) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, ise.getMessage());
     }
@@ -227,7 +227,7 @@ public class UnInvertedField extends Doc
       int startTerm = 0;
       int endTerm = numTermsInField;  // one past the end
 
-      TermsEnum te = getOrdTermsEnum(searcher.getIndexReader());
+      TermsEnum te = getOrdTermsEnum(new SlowMultiReaderWrapper(searcher.getIndexReader()));
       if (prefix != null && prefix.length() > 0) {
         final BytesRef prefixBr = new BytesRef(prefix);
         if (te.seekCeil(prefixBr, true) == TermsEnum.SeekStatus.END) {
@@ -497,7 +497,7 @@ public class UnInvertedField extends Doc
     final int[] index = this.index;
     final int[] counts = new int[numTermsInField];//keep track of the number of times we see each word in the field for all the documents in the docset
 
-    TermsEnum te = getOrdTermsEnum(searcher.getIndexReader());
+    TermsEnum te = getOrdTermsEnum(new SlowMultiReaderWrapper(searcher.getIndexReader()));
 
     boolean doNegative = false;
     if (finfo.length == 0) {

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java Sun Jan 29 12:18:50 2012
@@ -23,8 +23,10 @@ import java.util.*;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.util.Base64;
 import org.apache.solr.common.util.FastWriter;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
@@ -130,20 +132,29 @@ public abstract class TextResponseWriter
       else {
         writeStr(name, f.stringValue(), true);
       }
-    } else if (val instanceof Integer) {
-      writeInt(name, val.toString());
+    } else if (val instanceof Number) {
+      if (val instanceof Integer) {
+        writeInt(name, val.toString());
+      } else if (val instanceof Long) {
+        writeLong(name, val.toString());
+      } else if (val instanceof Float) {
+        // we pass the float instead of using toString() because
+        // it may need special formatting. same for double.
+        writeFloat(name, ((Float)val).floatValue());
+      } else if (val instanceof Double) {
+        writeDouble(name, ((Double)val).doubleValue());        
+      } else if (val instanceof Short) {
+        writeInt(name, val.toString());
+      } else if (val instanceof Byte) {
+        writeInt(name, val.toString());
+      } else {
+        // default... for debugging only
+        writeStr(name, val.getClass().getName() + ':' + val.toString(), true);
+      }
     } else if (val instanceof Boolean) {
       writeBool(name, val.toString());
-    } else if (val instanceof Long) {
-      writeLong(name, val.toString());
     } else if (val instanceof Date) {
       writeDate(name,(Date)val);
-    } else if (val instanceof Float) {
-      // we pass the float instead of using toString() because
-      // it may need special formatting. same for double.
-      writeFloat(name, ((Float)val).floatValue());
-    } else if (val instanceof Double) {
-      writeDouble(name, ((Double)val).doubleValue());
     } else if (val instanceof Document) {
       SolrDocument doc = toSolrDocument( (Document)val );
       DocTransformer transformer = returnFields.getTransformer();
@@ -181,6 +192,12 @@ public abstract class TextResponseWriter
       writeArray(name,(Object[])val);
     } else if (val instanceof Iterator) {
       writeArray(name,(Iterator)val);
+    } else if (val instanceof byte[]) {
+      byte[] arr = (byte[])val;
+      writeByteArr(name, arr, 0, arr.length);
+    } else if (val instanceof BytesRef) {
+      BytesRef arr = (BytesRef)val;
+      writeByteArr(name, arr.bytes, arr.offset, arr.length);
     } else {
       // default... for debugging only
       writeStr(name, val.getClass().getName() + ':' + val.toString(), true);
@@ -334,4 +351,7 @@ public abstract class TextResponseWriter
   /** if this form of the method is called, val is the Solr ISO8601 based date format */
   public abstract void writeDate(String name, String val) throws IOException;
 
+  public void writeByteArr(String name, byte[] buf, int offset, int len) throws IOException {
+    writeStr(name, Base64.byteArrayToBase64(buf, offset, len), false);
+  }
 }

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Sun Jan 29 12:18:50 2012
@@ -21,11 +21,15 @@ import java.io.IOException;
 
 import org.apache.lucene.index.IndexWriter;
 import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class DefaultSolrCoreState extends SolrCoreState {
- 
+  public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
+  
   private final Object recoveryLock = new Object();
   private int refCnt = 1;
   private SolrIndexWriter indexWriter = null;
@@ -62,10 +66,14 @@ public final class DefaultSolrCoreState 
     synchronized (this) {
       refCnt--;
       if (refCnt == 0) {
-        if (closer != null) {
-          closer.closeWriter(indexWriter);
-        } else if (indexWriter != null) {
-          indexWriter.close();
+        try {
+          if (closer != null) {
+            closer.closeWriter(indexWriter);
+          } else if (indexWriter != null) {
+            indexWriter.close();
+          }
+        } catch (Throwable t) {
+          SolrException.log(log, t);
         }
         directoryFactory.close();
         closed = true;

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/TransactionLog.java Sun Jan 29 12:18:50 2012
@@ -55,6 +55,8 @@ import java.util.concurrent.atomic.Atomi
  */
 public class TransactionLog {
   public static Logger log = LoggerFactory.getLogger(TransactionLog.class);
+  final boolean debug = log.isDebugEnabled();
+  final boolean trace = log.isTraceEnabled();
 
   public final static String END_MESSAGE="SOLR_TLOG_END";
 
@@ -71,7 +73,6 @@ public class TransactionLog {
   AtomicInteger refcount = new AtomicInteger(1);
   Map<String,Integer> globalStringMap = new HashMap<String, Integer>();
   List<String> globalStringList = new ArrayList<String>();
-  final boolean debug = log.isDebugEnabled();
 
   long snapshot_size;
   int snapshot_numRecords;
@@ -156,6 +157,9 @@ public class TransactionLog {
           addGlobalStrings(globalStrings);
         }
       } else {
+        if (start > 0) {
+          log.error("New transaction log already exists:" + tlogFile + " size=" + raf.length());
+        }
         assert start==0;
         if (start > 0) {
           raf.setLength(0);
@@ -543,8 +547,8 @@ public class TransactionLog {
 
 
       synchronized (TransactionLog.this) {
-        if (debug) {
-          log.debug("Reading log record.  pos="+pos+" currentSize="+fos.size());
+        if (trace) {
+          log.trace("Reading log record.  pos="+pos+" currentSize="+fos.size());
         }
 
         if (pos >= fos.size()) {

Modified: lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/java/org/apache/solr/update/UpdateLog.java Sun Jan 29 12:18:50 2012
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
 public class UpdateLog implements PluginInfoInitialized {
   public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
   public boolean debug = log.isDebugEnabled();
+  public boolean trace = log.isTraceEnabled();
 
 
   public enum SyncLevel { NONE, FLUSH, FSYNC }
@@ -141,6 +142,9 @@ public class UpdateLog implements Plugin
     this.uhandler = uhandler;
 
     if (dataDir.equals(lastDataDir)) {
+      if (debug) {
+        log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id, " this is a reopen... nothing else to do.");
+      }
       // on a normal reopen, we currently shouldn't have to do anything
       return;
     }
@@ -150,6 +154,10 @@ public class UpdateLog implements Plugin
     tlogFiles = getLogList(tlogDir);
     id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
 
+    if (debug) {
+      log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
+    }
+    
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
@@ -247,8 +255,8 @@ public class UpdateLog implements Plugin
         map.put(cmd.getIndexedId(), ptr);
       }
 
-      if (debug) {
-        log.debug("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      if (trace) {
+        log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
       }
     }
   }
@@ -274,8 +282,8 @@ public class UpdateLog implements Plugin
         oldDeletes.put(br, ptr);
       }
 
-      if (debug) {
-        log.debug("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      if (trace) {
+        log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
       }
     }
   }
@@ -312,8 +320,8 @@ public class UpdateLog implements Plugin
 
       LogPtr ptr = new LogPtr(pos, cmd.getVersion());
 
-      if (debug) {
-        log.debug("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+      if (trace) {
+        log.trace("TLOG: added deleteByQuery " + cmd.query + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
       }
     }
   }
@@ -385,6 +393,7 @@ public class UpdateLog implements Plugin
 
   public void preSoftCommit(CommitUpdateCommand cmd) {
     debug = log.isDebugEnabled(); // refresh our view of debugging occasionally
+    trace = log.isTraceEnabled();
 
     synchronized (this) {
 
@@ -562,7 +571,7 @@ public class UpdateLog implements Plugin
 
   private void ensureLog() {
     if (tlog == null) {
-      String newLogName = String.format("%s.%019d", TLOG_NAME, id);
+      String newLogName = String.format(Locale.ENGLISH, "%s.%019d", TLOG_NAME, id);
       try {
         tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
       } catch (IOException e) {

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Jan 29 12:18:50 2012
@@ -102,7 +102,7 @@ public abstract class AbstractZkTestCase
   private static void putConfig(SolrZkClient zkClient, final String name)
       throws Exception {
     zkClient.makePath("/configs/conf1/" + name, getFile("solr"
-        + File.separator + "conf" + File.separator + name), false, false);  
+        + File.separator + "conf" + File.separator + name), false, true);  
   }
 
   @Override

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Sun Jan 29 12:18:50 2012
@@ -242,11 +242,7 @@ public class ChaosMonkey {
   
   public JettySolrRunner getRandomJetty(String slice, boolean aggressivelyKillLeaders) throws KeeperException, InterruptedException {
     
-    // get latest cloud state
-    zkStateReader.updateCloudState(true);
-    
-    Slice theShards = zkStateReader.getCloudState().getSlices(collection)
-        .get(slice);
+
     int numRunning = 0;
     int numRecovering = 0;
     int numActive = 0;
@@ -254,6 +250,12 @@ public class ChaosMonkey {
     for (CloudJettyRunner cloudJetty : shardToJetty.get(slice)) {
       boolean running = true;
       
+      // get latest cloud state
+      zkStateReader.updateCloudState(true);
+      
+      Slice theShards = zkStateReader.getCloudState().getSlices(collection)
+          .get(slice);
+      
       ZkNodeProps props = theShards.getShards().get(cloudJetty.coreNodeName);
       if (props == null) {
         throw new RuntimeException("shard name " + cloudJetty.coreNodeName + " not found in " + theShards.getShards().keySet());

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Sun Jan 29 12:18:50 2012
@@ -77,78 +77,88 @@ public class ChaosMonkeyNothingIsSafeTes
   
   @Override
   public void doTest() throws Exception {
-    
-    handle.clear();
-    handle.put("QTime", SKIPVAL);
-    handle.put("timestamp", SKIPVAL);
-    
-    // we cannot do delete by query
-    // as it's not supported for recovery
-    //del("*:*");
-    
-    List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
-    int threadCount = 1;
-    int i = 0;
-    for (i = 0; i < threadCount; i++) {
-      StopableIndexingThread indexThread = new StopableIndexingThread(i * 50000, true);
-      threads.add(indexThread);
-      indexThread.start();
-    }
-    
-    FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
-        clients, i * 50000, true);
-    threads.add(ftIndexThread);
-    ftIndexThread.start();
-    
-    chaosMonkey.startTheMonkey(true, 1500);
+    boolean testsSuccesful = false;
     try {
-      Thread.sleep(atLeast(6000));
+      handle.clear();
+      handle.put("QTime", SKIPVAL);
+      handle.put("timestamp", SKIPVAL);
+      
+      // we cannot do delete by query
+      // as it's not supported for recovery
+      // del("*:*");
+      
+      List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
+      int threadCount = 1;
+      int i = 0;
+      for (i = 0; i < threadCount; i++) {
+        StopableIndexingThread indexThread = new StopableIndexingThread(
+            i * 50000, true);
+        threads.add(indexThread);
+        indexThread.start();
+      }
+      
+      FullThrottleStopableIndexingThread ftIndexThread = new FullThrottleStopableIndexingThread(
+          clients, i * 50000, true);
+      threads.add(ftIndexThread);
+      ftIndexThread.start();
+      
+      chaosMonkey.startTheMonkey(true, 1500);
+      try {
+        Thread.sleep(atLeast(6000));
+      } finally {
+        chaosMonkey.stopTheMonkey();
+      }
+      
+      for (StopableIndexingThread indexThread : threads) {
+        indexThread.safeStop();
+      }
+      
+      // wait for stop...
+      for (StopableIndexingThread indexThread : threads) {
+        indexThread.join();
+      }
+      
+      // fails will happen...
+      // for (StopableIndexingThread indexThread : threads) {
+      // assertEquals(0, indexThread.getFails());
+      // }
+      
+      // try and wait for any replications and what not to finish...
+      
+      Thread.sleep(2000);
+      
+      // wait until there are no recoveries...
+      waitForThingsToLevelOut();
+      
+      // make sure we again have leaders for each shard
+      for (int j = 1; j < sliceCount; j++) {
+        zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+      }
+      
+      commit();
+      
+      // TODO: assert we didnt kill everyone
+      
+      zkStateReader.updateCloudState(true);
+      assertTrue(zkStateReader.getCloudState().getLiveNodes().size() > 0);
+      
+      checkShardConsistency(false, true);
+      
+      // ensure we have added more than 0 docs
+      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
+          .getResults().getNumFound();
+      
+      assertTrue(cloudClientDocs > 0);
+      
+      if (VERBOSE) System.out.println("control docs:"
+          + controlClient.query(new SolrQuery("*:*")).getResults()
+              .getNumFound() + "\n\n");
+      testsSuccesful = true;
     } finally {
-      chaosMonkey.stopTheMonkey();
-    }
-    
-    for (StopableIndexingThread indexThread : threads) {
-      indexThread.safeStop();
-    }
-    
-    // wait for stop...
-    for (StopableIndexingThread indexThread : threads) {
-      indexThread.join();
-    }
-    
-    
-    // fails will happen...
-//    for (StopableIndexingThread indexThread : threads) {
-//      assertEquals(0, indexThread.getFails());
-//    }
-    
-    // try and wait for any replications and what not to finish...
-    
-    Thread.sleep(2000);
-    
-    // wait until there are no recoveries...
-    waitForThingsToLevelOut();
-    
-    // make sure we again have leaders for each shard
-    for (int j = 1; j < sliceCount; j++) {
-      zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+      if (!testsSuccesful) {
+        printLayout();
+      }
     }
-
-    commit();
-    
-    // TODO: assert we didnt kill everyone
-    
-    zkStateReader.updateCloudState(true);
-    assertTrue(zkStateReader.getCloudState().getLiveNodes().size() > 0);
-    
-    checkShardConsistency(false, false);
-    
-    // ensure we have added more than 0 docs
-    long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
-
-    assertTrue(cloudClientDocs > 0);
-    
-    if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
   }
 
   private void waitForThingsToLevelOut() throws KeeperException,
@@ -158,7 +168,11 @@ public class ChaosMonkeyNothingIsSafeTes
     do {
       waitForRecoveriesToFinish(VERBOSE);
       
-      commit();
+      try {
+        commit();
+      } catch (Exception e) {
+        // we don't care if this commit fails on some nodes
+      }
       
       updateMappingsFromZk(jettys, clients);
       

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Sun Jan 29 12:18:50 2012
@@ -30,7 +30,9 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 
+@Ignore
 public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
   
   @BeforeClass

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java Sun Jan 29 12:18:50 2012
@@ -45,7 +45,6 @@ import org.junit.Ignore;
 /**
  * Super basic testing, no shard restarting or anything.
  */
-@Ignore
 public class FullSolrCloudDistribCmdsTest extends FullSolrCloudTest {
   
   

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sun Jan 29 12:18:50 2012
@@ -51,6 +51,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 
 /**
  * 
@@ -64,6 +65,8 @@ public class FullSolrCloudTest extends A
   
   protected static final String DEFAULT_COLLECTION = "collection1";
   
+  private boolean printLayoutOnTearDown = false;
+  
   String t1 = "a_t";
   String i1 = "a_si";
   String nint = "n_i";
@@ -223,13 +226,25 @@ public class FullSolrCloudTest extends A
     System.clearProperty("collection");
     controlClient = createNewSolrServer(controlJetty.getLocalPort());
     
-    createJettys(numServers);
+    createJettys(numServers, true);
     
   }
   
-  private List<JettySolrRunner> createJettys(int numJettys) throws Exception,
-      InterruptedException, TimeoutException, IOException, KeeperException,
-      URISyntaxException {
+  private List<JettySolrRunner> createJettys(int numJettys) throws Exception {
+    return createJettys(numJettys, false);
+  }
+  
+
+  /**
+   * @param numJettys
+   * @param checkCreatedVsState
+   *          if true, make sure the number created (numJettys) matches the
+   *          number in the cluster state - if you add more jetties this may not
+   *          be the case
+   * @return
+   * @throws Exception
+   */
+  private List<JettySolrRunner> createJettys(int numJettys, boolean checkCreatedVsState) throws Exception {
     List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
     List<SolrServer> clients = new ArrayList<SolrServer>();
     StringBuilder sb = new StringBuilder();
@@ -247,6 +262,28 @@ public class FullSolrCloudTest extends A
     this.jettys.addAll(jettys);
     this.clients.addAll(clients);
     
+    if (checkCreatedVsState) {
+      // now wait until we see that the number of shards in the cluster state
+      // matches what we expect
+      int numShards = getNumShards(DEFAULT_COLLECTION);
+      int retries = 0;
+      while (numShards != shardCount) {
+        numShards = getNumShards(DEFAULT_COLLECTION);
+        if (numShards == shardCount) break;
+        if (retries++ == 20) {
+          printLayoutOnTearDown = true;
+          fail("Shards in the state does not match what we set:" + numShards
+              + " vs " + shardCount);
+        }
+        Thread.sleep(500);
+      }
+
+      // also make sure we have a leader for each shard
+      for (int i = 1; i <= sliceCount; i++) {
+        zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + i, 10000);
+      }
+    }
+
     updateMappingsFromZk(this.jettys, this.clients);
     
     // build the shard string
@@ -261,6 +298,16 @@ public class FullSolrCloudTest extends A
     
     return jettys;
   }
+
+  private int getNumShards(String defaultCollection) {
+    Map<String,Slice> slices = this.zkStateReader.getCloudState().getSlices(defaultCollection);
+    int cnt = 0;
+    for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+      cnt += entry.getValue().getShards().size();
+    }
+    
+    return cnt;
+  }
   
   public JettySolrRunner createJetty(String dataDir, String shardList,
       String solrConfigOverride) throws Exception {
@@ -467,71 +514,81 @@ public class FullSolrCloudTest extends A
    */
   @Override
   public void doTest() throws Exception {
-    handle.clear();
-    handle.put("QTime", SKIPVAL);
-    handle.put("timestamp", SKIPVAL);
-    
-    indexr(id, 1, i1, 100, tlong, 100, t1, "now is the time for all good men",
-        "foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
-    
-    // make sure we are in a steady state...
-    waitForRecoveriesToFinish(false);
-    
-    commit();
-    
-    assertDocCounts(false);
-    
-    indexAbunchOfDocs();
-    
-    commit();
-    
-    assertDocCounts(VERBOSE);
-    checkQueries();
-    
-    assertDocCounts(VERBOSE);
-    
-    query("q", "*:*", "sort", "n_tl1 desc");
-    
-    brindDownShardIndexSomeDocsAndRecover();
-    
-    query("q", "*:*", "sort", "n_tl1 desc");
-    
-    // test adding another replica to a shard - it should do a
-    // recovery/replication to pick up the index from the leader
-    addNewReplica();
-    
-    long docId = testUpdateAndDelete();
-    
-    // index a bad doc...
+    boolean testFinished = false; // stays false on failure so tearDown prints the cluster layout
     try {
-      indexr(t1, "a doc with no id");
-      fail("this should fail");
-    } catch (SolrException e) {
-      // expected
+      handle.clear();
+      handle.put("QTime", SKIPVAL);
+      handle.put("timestamp", SKIPVAL);
+      
+      indexr(id, 1, i1, 100, tlong, 100, t1,
+          "now is the time for all good men", "foo_f", 1.414f, "foo_b", "true",
+          "foo_d", 1.414d);
+      
+      // make sure we are in a steady state...
+      waitForRecoveriesToFinish(false);
+      
+      commit();
+      
+      assertDocCounts(false);
+      
+      indexAbunchOfDocs();
+      
+      commit();
+      
+      assertDocCounts(VERBOSE);
+      checkQueries();
+      
+      assertDocCounts(VERBOSE);
+      
+      query("q", "*:*", "sort", "n_tl1 desc");
+      
+      brindDownShardIndexSomeDocsAndRecover();
+      
+      query("q", "*:*", "sort", "n_tl1 desc");
+      
+      // test adding another replica to a shard - it should do a
+      // recovery/replication to pick up the index from the leader
+      addNewReplica();
+      
+      long docId = testUpdateAndDelete();
+      
+      // index a doc with no id - the update must be rejected with a SolrException
+      try {
+        indexr(t1, "a doc with no id");
+        fail("this should fail");
+      } catch (SolrException e) {
+        // expected
+      }
+      
+      // TODO: bring this to its own method?
+      // try indexing to a leader that has no replicas up
+      ZkNodeProps leaderProps = zkStateReader.getLeaderProps(
+          DEFAULT_COLLECTION, SHARD2);
+      
+      String nodeName = leaderProps.get(ZkStateReader.NODE_NAME_PROP);
+      chaosMonkey.stopShardExcept(SHARD2, nodeName);
+      
+      SolrServer client = getClient(nodeName);
+      
+      index_specific(client, "id", docId + 1, t1, "what happens here?");
+      
+      // expire the session of the first shard1 jetty, then index and check shard1 stays consistent
+      CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
+      chaosMonkey.expireSession(cloudJetty.jetty);
+      
+      indexr("id", docId + 1, t1, "slip this doc in");
+      
+      waitForRecoveriesToFinish(false);
+      
+      checkShardConsistency("shard1");
+      
+      testFinished = true;
+    } finally {
+      if (!testFinished) {
+        printLayoutOnTearDown = true;
+      }
     }
     
-    // TODO: bring this to it's own method?
-    // try indexing to a leader that has no replicas up
-    ZkNodeProps leaderProps = zkStateReader.getLeaderProps(DEFAULT_COLLECTION,
-        SHARD2);
-    
-    String nodeName = leaderProps.get(ZkStateReader.NODE_NAME_PROP);
-    chaosMonkey.stopShardExcept(SHARD2, nodeName);
-    
-    SolrServer client = getClient(nodeName);
-    
-    index_specific(client, "id", docId + 1, t1, "what happens here?");
-    
-    // expire a session...
-    CloudJettyRunner cloudJetty = shardToJetty.get("shard1").get(0);
-    chaosMonkey.expireSession(cloudJetty.jetty);
-    
-    indexr("id", docId + 1, t1, "slip this doc in");
-    
-    waitForRecoveriesToFinish(false);
-    
-    checkShardConsistency("shard1");
-    
   }
   
   private long testUpdateAndDelete() throws Exception, SolrServerException,
@@ -1182,7 +1239,7 @@ public class FullSolrCloudTest extends A
   @Override
   @After
   public void tearDown() throws Exception {
-    if (VERBOSE) {
+    if (VERBOSE || printLayoutOnTearDown) {
       super.printLayout();
     }
     ((CommonsHttpSolrServer) controlClient).shutdown();
@@ -1222,7 +1279,7 @@ public class FullSolrCloudTest extends A
           + DEFAULT_COLLECTION;
       CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
       s.setConnectionTimeout(100); // 1/10th sec
-      s.setSoTimeout(30000);
+      s.setSoTimeout(45000);
       s.setDefaultMaxConnectionsPerHost(100);
       s.setMaxTotalConnections(100);
       return s;

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Sun Jan 29 12:18:50 2012
@@ -34,7 +34,6 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreContainer.Initializer;
-import org.apache.solr.core.SolrConfig;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -101,7 +100,7 @@ public class LeaderElectionIntegrationTe
         AbstractZkTestCase.TIMEOUT);
     
     reader = new ZkStateReader(zkClient); 
-
+    reader.createClusterStateWatchersAndUpdate();
     log.info("####SETUP_END " + getName());
     
   }
@@ -150,12 +149,12 @@ public class LeaderElectionIntegrationTe
       //printLayout(zkServer.getZkAddress());
       
       // poll until leader change is visible
-      for (int j = 0; j < 30; j++) {
+      for (int j = 0; j < 90; j++) {
         String currentLeader = getLeader();
         if(!leader.equals(currentLeader)) {
           break;
         }
-        Thread.sleep(100);
+        Thread.sleep(500);
       }
       
       leader = getLeader();
@@ -216,23 +215,11 @@ public class LeaderElectionIntegrationTe
     //Thread.sleep(100000);
   }
   
-  private String getLeader() throws InterruptedException {
-    String leader = null;
-    int tries = 30;
-    while (tries-- > 0) {
-      ZkNodeProps props;
-      try {
-        reader.updateCloudState(true);
-        props = reader.getLeaderProps("collection1", "shard1", 500);
-        leader = props.get(ZkStateReader.NODE_NAME_PROP);
-        if (leader != null) {
-          break;
-        }
-      } catch (KeeperException e) {
-        // ignore
-      }
-      Thread.sleep(200);
-    }
+  private String getLeader() throws InterruptedException, KeeperException {
+    // node name (NODE_NAME_PROP) of the collection1/shard1 leader; timeout 30000, presumably ms
+    ZkNodeProps props = reader.getLeaderProps("collection1", "shard1", 30000);
+    String leader = props.get(ZkStateReader.NODE_NAME_PROP);
+    
     return leader;
   }
   

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sun Jan 29 12:18:50 2012
@@ -32,15 +32,12 @@ import org.apache.solr.common.cloud.Solr
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.SolrConfig;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
 public class LeaderElectionTest extends SolrTestCaseJ4 {
   
   static final int TIMEOUT = 30000;
@@ -58,8 +55,7 @@ public class LeaderElectionTest extends 
   
   @AfterClass
   public static void afterClass() throws InterruptedException {
-    // wait just a bit for any zk client threads to outlast timeout
-    Thread.sleep(2000);
+    // intentionally empty: the former 2s sleep for lingering zk client threads was dropped
   }
   
   @Override
@@ -90,10 +86,18 @@ public class LeaderElectionTest extends 
     
     public ClientThread(int nodeNumber) throws Exception {
       super("Thread-" + nodeNumber);
+      boolean created = false; // if still false in finally, close zkClient so it does not leak
       this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-      this.zkStateReader = new ZkStateReader(zkClient);
-      this.nodeNumber = nodeNumber;
-      props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
+      try {
+        this.zkStateReader = new ZkStateReader(zkClient);
+        this.nodeNumber = nodeNumber;
+        props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
+        created = true;
+      } finally {
+        if (!created) {
+          zkClient.close();
+        }
+      }
     }
     
     @Override
@@ -156,7 +160,7 @@ public class LeaderElectionTest extends 
   
   private String getLeaderUrl(final String collection, final String slice)
       throws KeeperException, InterruptedException {
-    int iterCount = 30;
+    int iterCount = 60;
     while (iterCount-- > 0)
       try {
         byte[] data = zkClient.getData(
@@ -166,7 +170,7 @@ public class LeaderElectionTest extends 
             ZkNodeProps.load(data));
         return leaderProps.getCoreUrl();
       } catch (NoNodeException e) {
-        Thread.sleep(100);
+        Thread.sleep(500);
       }
     throw new RuntimeException("Could not get leader props");
   }
@@ -284,13 +288,13 @@ public class LeaderElectionTest extends 
     threads.add(thread1);
     scheduler.schedule(thread1, 0, TimeUnit.MILLISECONDS);
     
-    Thread.sleep(4000);
+    Thread.sleep(2000);
 
     Thread scheduleThread = new Thread() {
       @Override
       public void run() {
-        
-        for (int i = 1; i < atLeast(15); i++) {
+        int count = atLeast(5);
+        for (int i = 1; i < count; i++) {
           int launchIn = random.nextInt(500);
           ClientThread thread = null;
           try {
@@ -365,7 +369,7 @@ public class LeaderElectionTest extends 
     connLossThread.start();
     killThread.start();
     
-    Thread.sleep(6000);
+    Thread.sleep(4000);
     
     stopStress = true;
     
@@ -374,14 +378,14 @@ public class LeaderElectionTest extends 
     killThread.interrupt();
     
     scheduleThread.join();
+    scheduler.shutdownNow();
+    
     connLossThread.join();
     killThread.join();
     
-    scheduler.shutdownNow();
-
     int seq = threads.get(getLeaderThread()).getSeq();
     
-    assertFalse("seq is -1 and we may have a zombie leader", seq == -1);
+    // we have a leader we know, TODO: lets check some other things
     
     // cleanup any threads still running
     for (ClientThread thread : threads) {

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Sun Jan 29 12:18:50 2012
@@ -146,7 +146,7 @@ public class OverseerTest extends SolrTe
     SolrZkClient zkClient = null;
     ZkStateReader reader = null;
     final ZkController[] controllers = new ZkController[nodeCount];
-
+    final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
     try {
       server.run();
       AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
@@ -174,7 +174,6 @@ public class OverseerTest extends SolrTe
           .getAbsolutePath());
 
       
-      final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
       for (int i = 0; i < nodeCount; i++) {
         nodeExecutors[i] = Executors.newFixedThreadPool(1);
       }
@@ -232,7 +231,7 @@ public class OverseerTest extends SolrTe
       }
 
       // make sure all cores have been returned a id
-      for (int i = 0; i < 150; i++) {
+      for (int i = 0; i < 90; i++) {
         int assignedCount = 0;
         for (int j = 0; j < coreCount; j++) {
           if (ids[j] != null) {
@@ -242,7 +241,7 @@ public class OverseerTest extends SolrTe
         if (coreCount == assignedCount) {
           break;
         }
-        Thread.sleep(200);
+        Thread.sleep(500);
       }
       
       final HashMap<String, AtomicInteger> counters = new HashMap<String,AtomicInteger>();
@@ -289,6 +288,9 @@ public class OverseerTest extends SolrTe
           controllers[i].close();
         }
       server.shutdown();
+      for (int i = 0; i < nodeCount; i++) {
+        nodeExecutors[i].shutdownNow();
+      }
     }
     
     System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Jan 29 12:18:50 2012
@@ -102,6 +102,7 @@ public class ZkControllerTest extends So
 
     ZkTestServer server = new ZkTestServer(zkDir);
     ZkController zkController = null;
+    boolean testFinished = false;
     try {
       server.run();
 
@@ -127,8 +128,12 @@ public class ZkControllerTest extends So
       if (DEBUG) {
         zkController.printLayoutToStdOut();
       }
-
+      testFinished = true;
     } finally {
+      if (testFinished) {
+        zkController.getZkClient().printLayoutToStdOut();
+      }
+      
       if (zkController != null) {
         zkController.close();
       }

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/JSONWriterTest.java Sun Jan 29 12:18:50 2012
@@ -41,7 +41,7 @@ public class JSONWriterTest extends Solr
   }    
   
   @Test
-  public void testNaNInf() throws IOException {
+  public void testTypes() throws IOException {
     SolrQueryRequest req = req("dummy");
     SolrQueryResponse rsp = new SolrQueryResponse();
     QueryResponseWriter w = new PythonResponseWriter();
@@ -77,8 +77,12 @@ public class JSONWriterTest extends Solr
     nl.add(null, 42);
     rsp.add("nl", nl);
 
+    rsp.add("byte", Byte.valueOf((byte)-3));
+    rsp.add("short", Short.valueOf((short)-4));
+    rsp.add("bytes", "abc".getBytes("UTF-8"));
+
     w.write(buf, req, rsp);
-    assertEquals("{\"nl\":[[\"data1\",\"he\\u2028llo\\u2029!\"],[null,42]]}", buf.toString());
+    assertEquals("{\"nl\":[[\"data1\",\"he\\u2028llo\\u2029!\"],[null,42]],\"byte\":-3,\"short\":-4,\"bytes\":\"YWJj\"}", buf.toString());
     req.close();
   }
   

Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/request/TestFaceting.java Sun Jan 29 12:18:50 2012
@@ -21,6 +21,7 @@ import java.util.Locale;
 import java.util.Random;
 
 import org.apache.lucene.index.DocTermOrds;
+import org.apache.lucene.index.SlowMultiReaderWrapper;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.util.BytesRef;
@@ -80,7 +81,7 @@ public class TestFaceting extends SolrTe
 
     assertEquals(size, uif.getNumTerms());
 
-    TermsEnum te = uif.getOrdTermsEnum(req.getSearcher().getIndexReader());
+    TermsEnum te = uif.getOrdTermsEnum(new SlowMultiReaderWrapper(req.getSearcher().getIndexReader()));
     assertEquals(size == 0, te == null);
 
     Random r = new Random(size);

Modified: lucene/dev/branches/lucene2858/solr/solrj/build.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/build.xml?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/build.xml (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/build.xml Sun Jan 29 12:18:50 2012
@@ -31,6 +31,7 @@
     <mkdir  dir="${dist}/solrj-lib" />
     <copy todir="${dist}/solrj-lib">
       <fileset dir="${common-solr.dir}/lib">
+        <include name="apache-solr-noggit-*.jar"/>
         <include name="commons-codec-*.jar"/>
         <include name="commons-io-*.jar"/>
         <include name="commons-httpclient-*.jar"/>

Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java Sun Jan 29 12:18:50 2012
@@ -475,10 +475,10 @@ public class CommonsHttpSolrServer exten
       return processor.processResponse(respBody, charset);
     }
     catch (HttpException e) {
-      throw new SolrServerException( e );
+      throw new SolrServerException(getBaseURL(), e);
     }
     catch (IOException e) {
-      throw new SolrServerException( e );
+      throw new SolrServerException(getBaseURL(), e);
     }
     finally {
       method.releaseConnection();

Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/HashPartitioner.java Sun Jan 29 12:18:50 2012
@@ -40,7 +40,7 @@ public class HashPartitioner {
    * works up to 65537 before requested num of ranges is one short
    * 
    * @param partitions
-   * @return
+   * @return Range for each partition
    */
   public List<Range> partitionRange(int partitions) {
     // some hokey code to partition the int space

Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sun Jan 29 12:18:50 2012
@@ -34,7 +34,6 @@ import javax.xml.transform.stream.Stream
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
 import org.apache.zookeeper.CreateMode;
@@ -119,7 +118,6 @@ public class SolrZkClient {
   public SolrZkClient(String zkServerAddress, int zkClientTimeout,
       ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
       TimeoutException, IOException {
-    numOpens.incrementAndGet();
     connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
         + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
     strat.connect(zkServerAddress, zkClientTimeout, connManager,
@@ -142,6 +140,7 @@ public class SolrZkClient {
           }
         });
     connManager.waitForConnected(clientConnectTimeout);
+    numOpens.incrementAndGet();
   }
 
   /**
@@ -644,9 +643,6 @@ public class SolrZkClient {
    * @throws InterruptedException
    */
   public void close() throws InterruptedException {
-    if (isClosed) {
-      throw new AlreadyClosedException("This client has already been closed");
-    }
     isClosed = true;
     keeper.close();
     numCloses.incrementAndGet();

Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Sun Jan 29 12:18:50 2012
@@ -54,7 +54,6 @@ public class ZkCmdExecutor {
   /**
    * Perform the given operation, retrying if the connection fails
    * 
-   * @return
    * @throws IOException 
    */
   @SuppressWarnings("unchecked")

Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java?rev=1237259&r1=1237258&r2=1237259&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/common/cloud/ZkOperation.java Sun Jan 29 12:18:50 2012
@@ -22,8 +22,7 @@ import java.io.IOException;
 import org.apache.zookeeper.KeeperException;
 
 /**
- * A callback object which can be used for implementing retry-able operations in the 
- * {@link org.apache.solr.common.cloud.ZkCmdExecutor.lock.ProtocolSupport} class
+ * A callback object which can be used for implementing retry-able operations.
  *
  */
 public abstract class ZkOperation {