You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2012/01/29 12:21:25 UTC

svn commit: r1237242 - in /lucene/dev/branches/lucene3661: ./ solr/ solr/core/ solr/core/src/java/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/update/ solr/core/src/test/ solr/core/src/test/org/apache/solr/cloud/ solr/t...

Author: rmuir
Date: Sun Jan 29 11:21:24 2012
New Revision: 1237242

URL: http://svn.apache.org/viewvc?rev=1237242&view=rev
Log:
merge trunk (1237128:1237241)

Modified:
    lucene/dev/branches/lucene3661/   (props changed)
    lucene/dev/branches/lucene3661/solr/   (props changed)
    lucene/dev/branches/lucene3661/solr/core/   (props changed)
    lucene/dev/branches/lucene3661/solr/core/src/java/   (props changed)
    lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
    lucene/dev/branches/lucene3661/solr/core/src/test/   (props changed)
    lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
    lucene/dev/branches/lucene3661/solr/test-framework/   (props changed)
    lucene/dev/branches/lucene3661/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java

Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Jan 29 11:21:24 2012
@@ -209,14 +209,31 @@ final class OverseerElectionContext exte
   private final ZkStateReader stateReader;
 
   public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
-    super(zkNodeName, "/overseer_elect", null, null);
+    super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
     this.zkClient = zkClient;
     this.stateReader = stateReader;
   }
 
   @Override
   void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
-    new Overseer(zkClient, stateReader);
+    
+    final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+    ZkNodeProps myProps = new ZkNodeProps("id", id);
+
+    try {
+      zkClient.makePath(leaderPath,
+          ZkStateReader.toJSON(myProps),
+          CreateMode.EPHEMERAL, true);
+    } catch (NodeExistsException e) {
+      // if a previous leader ephemeral still exists for some reason, try and
+      // remove it
+      zkClient.delete(leaderPath, -1, true);
+      zkClient.makePath(leaderPath,
+          ZkStateReader.toJSON(myProps),
+          CreateMode.EPHEMERAL, true);
+    }
+  
+    new Overseer(zkClient, stateReader, id);
   }
   
 }

Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Jan 29 11:21:24 2012
@@ -22,9 +22,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Set;
 
 import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
@@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory;
  * Cluster leader. Responsible node assignments, cluster state file?
  */
 public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+
+  private static final int STATE_UPDATE_DELAY = 500;  // delay between cloud state updates
+
+  static enum Op {
+    LeaderChange, StateChange; 
+  }
+
+  private final class CloudStateUpdateRequest {
+    
+    final Op operation;
+    final Object[] args;
+    
+     CloudStateUpdateRequest(final Op operation, final Object... args) {
+       this.operation = operation;
+       this.args = args;
+    }
+  }
   
   public static final String ASSIGNMENTS_NODE = "/node_assignments";
   public static final String STATES_NODE = "/node_states";
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
   
   private final SolrZkClient zkClient;
-  private final ZkStateReader reader;
+  
+  // pooled updates
+  private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
   
   // node stateWatches
   private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
@@ -66,12 +87,222 @@ public class Overseer implements NodeSta
   private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
   private ZkCmdExecutor zkCmdExecutor;
 
-  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
-    log.info("Constructing new Overseer");
+  private static class CloudStateUpdater implements Runnable {
+    
+    private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
+    private final ZkStateReader reader;
+    private final SolrZkClient zkClient;
+    private final String myId;
+    
+    public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
+      this.fifo = fifo;
+      this.myId = myId;
+      this.reader = reader;
+      this.zkClient = zkClient;
+    }
+      @Override
+      public void run() {
+        while (amILeader()) {
+          
+          
+          LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
+          while (!fifo.isEmpty()) {
+            // collect all queued requests
+            CloudStateUpdateRequest req;
+            req = fifo.poll();
+            if (req == null) {
+              break;
+            }
+            requests.add(req);
+          }
+
+          if (requests.size() > 0) {
+            // process updates
+            synchronized (reader.getUpdateLock()) {
+              try {
+                reader.updateCloudState(true);
+                CloudState cloudState = reader.getCloudState();
+                for (CloudStateUpdateRequest request : requests) {
+
+                  switch (request.operation) {
+                  case LeaderChange:
+                    cloudState = setShardLeader(cloudState,
+                        (String) request.args[0], (String) request.args[1],
+                        (String) request.args[2]);
+
+                    break;
+                  case StateChange:
+                    cloudState = updateState(cloudState,
+                        (String) request.args[0], (CoreState) request.args[1]);
+                    break;
+
+                  }
+                }
+
+                log.info("Announcing new cluster state");
+                zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                    ZkStateReader.toJSON(cloudState), true);
+
+              } catch (KeeperException e) {
+                // XXX stop processing, exit
+                return;
+              } catch (InterruptedException e) {
+                // XXX stop processing, exit
+                return;
+              }
+            }
+          }
+
+          try {
+            Thread.sleep(STATE_UPDATE_DELAY);
+          } catch (InterruptedException e) {
+            //
+          }
+        }
+      }
+      
+      private boolean amILeader() {
+        try {
+          ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false));
+          if(myId.equals(props.get("id"))) {
+            return true;
+          }
+        } catch (KeeperException e) {
+          // assume we're dead
+        } catch (InterruptedException e) {
+          // assume we're dead
+        }
+        log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+        return false;
+      }
+      /**
+       * Try to assign core to the cluster
+       * @throws KeeperException 
+       * @throws InterruptedException 
+       */
+      private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+        String collection = coreState.getCollectionName();
+        String zkCoreNodeName = coreState.getCoreNodeName();
+        
+          String shardId;
+          if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+            shardId = AssignShard.assignShard(collection, state);
+          } else {
+            shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+          }
+          
+          Map<String,String> props = new HashMap<String,String>();
+          for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+            props.put(entry.getKey(), entry.getValue());
+          }
+          ZkNodeProps zkProps = new ZkNodeProps(props);
+          Slice slice = state.getSlice(collection, shardId);
+          Map<String,ZkNodeProps> shardProps;
+          if (slice == null) {
+            shardProps = new HashMap<String,ZkNodeProps>();
+          } else {
+            shardProps = state.getSlice(collection, shardId).getShardsCopy();
+          }
+          shardProps.put(zkCoreNodeName, zkProps);
+
+          slice = new Slice(shardId, shardProps);
+          CloudState newCloudState = updateSlice(state, collection, slice);
+          return newCloudState;
+      }
+      
+      private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+        
+        final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+        newStates.putAll(state.getCollectionStates());
+        
+        if (!newStates.containsKey(collection)) {
+          newStates.put(collection, new LinkedHashMap<String,Slice>());
+        }
+        
+        final Map<String, Slice> slices = newStates.get(collection);
+        if (!slices.containsKey(slice.getName())) {
+          slices.put(slice.getName(), slice);
+        } else {
+          final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+          final Slice existingSlice = slices.get(slice.getName());
+          shards.putAll(existingSlice.getShards());
+          //XXX preserve existing leader
+          for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+            if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+              HashMap<String, String> newProps = new HashMap<String,String>();
+              newProps.putAll(edit.getValue().getProperties());
+              newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+              shards.put(edit.getKey(), new ZkNodeProps(newProps));
+            } else {
+              shards.put(edit.getKey(), edit.getValue());
+            }
+          }
+          final Slice updatedSlice = new Slice(slice.getName(), shards);
+          slices.put(slice.getName(), updatedSlice);
+        }
+        return new CloudState(state.getLiveNodes(), newStates);
+      }
+      
+      private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+        
+        boolean updated = false;
+        final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+        newStates.putAll(state.getCollectionStates());
+        
+        final Map<String, Slice> slices = newStates.get(collection);
+
+        if(slices==null) {
+          log.error("Could not mark shard leader for non existing collection.");
+          return state;
+        }
+        
+        if (!slices.containsKey(sliceName)) {
+          log.error("Could not mark leader for non existing slice.");
+          return state;
+        } else {
+          final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+          for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+            Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+            newShardProps.putAll(shard.getValue().getProperties());
+            
+            String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP);  //clean any previously existed flag
+
+            ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+            if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+              newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+              if (wasLeader == null) {
+                updated = true;
+              }
+            } else {
+              if (wasLeader != null) {
+                updated = true;
+              }
+            }
+            newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+          }
+          Slice slice = new Slice(sliceName, newShards);
+          slices.put(sliceName, slice);
+        }
+        if (updated) {
+          return new CloudState(state.getLiveNodes(), newStates);
+        } else {
+          return state;
+        }
+      }
+
+    }
+  
+  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
+    log.info("Constructing new Overseer id=" + id);
     this.zkClient = zkClient;
     this.zkCmdExecutor = new ZkCmdExecutor();
-    this.reader = reader;
     createWatches();
+    
+    //launch cluster state updater thread
+    ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
+    Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+    updaterThread.setDaemon(true);
+    updaterThread.start();
   }
   
   public synchronized void createWatches()
@@ -267,41 +498,6 @@ public class Overseer implements NodeSta
     }
   }
   
-  /**
-   * Try to assign core to the cluster
-   * @throws KeeperException 
-   * @throws InterruptedException 
-   */
-  private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
-    String collection = coreState.getCollectionName();
-    String zkCoreNodeName = coreState.getCoreNodeName();
-    
-      String shardId;
-      if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
-        shardId = AssignShard.assignShard(collection, state);
-      } else {
-        shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
-      }
-      
-      Map<String,String> props = new HashMap<String,String>();
-      for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
-        props.put(entry.getKey(), entry.getValue());
-      }
-      ZkNodeProps zkProps = new ZkNodeProps(props);
-      Slice slice = state.getSlice(collection, shardId);
-      Map<String,ZkNodeProps> shardProps;
-      if (slice == null) {
-        shardProps = new HashMap<String,ZkNodeProps>();
-      } else {
-        shardProps = state.getSlice(collection, shardId).getShardsCopy();
-      }
-      shardProps.put(zkCoreNodeName, zkProps);
-
-      slice = new Slice(shardId, shardProps);
-      CloudState newCloudState = updateSlice(state, collection, slice);
-      return newCloudState;
-  }
-  
   private Set<String> complement(Collection<String> next,
       Collection<String> prev) {
     Set<String> downCollections = new HashSet<String>();
@@ -311,23 +507,11 @@ public class Overseer implements NodeSta
   }
 
   @Override
-  public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException  {
-    log.debug("Cores changed: " + nodeName + " states:" + states);
-    synchronized(reader.getUpdateLock()) {
-      reader.updateCloudState(true);
-      CloudState cloudState = reader.getCloudState();
-      for (CoreState state : states) {
-        cloudState = updateState(cloudState, nodeName, state);
-      }
-
-      try {
-        zkClient.setData(ZkStateReader.CLUSTER_STATE,
-            ZkStateReader.toJSON(cloudState), true);  
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Interrupted while publishing new state", e);
-      }
+  public void coreChanged(final String nodeName, final Set<CoreState> states)
+      throws KeeperException, InterruptedException {
+    log.info("Core change pooled: " + nodeName + " states:" + states);
+    for (CoreState state : states) {
+      fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
     }
   }
   
@@ -340,111 +524,11 @@ public class Overseer implements NodeSta
     ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
     zkCmdExecutor.ensureExists(node, zkClient);
   }
-  
-  private CloudState updateSlice(CloudState state, String collection, Slice slice) {
-    
-    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-    newStates.putAll(state.getCollectionStates());
-    
-    if (!newStates.containsKey(collection)) {
-      newStates.put(collection, new LinkedHashMap<String,Slice>());
-    }
-    
-    final Map<String, Slice> slices = newStates.get(collection);
-    if (!slices.containsKey(slice.getName())) {
-      slices.put(slice.getName(), slice);
-    } else {
-      final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
-      final Slice existingSlice = slices.get(slice.getName());
-      shards.putAll(existingSlice.getShards());
-      //XXX preserve existing leader
-      for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
-        if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
-          HashMap<String, String> newProps = new HashMap<String,String>();
-          newProps.putAll(edit.getValue().getProperties());
-          newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
-          shards.put(edit.getKey(), new ZkNodeProps(newProps));
-        } else {
-          shards.put(edit.getKey(), edit.getValue());
-        }
-      }
-      final Slice updatedSlice = new Slice(slice.getName(), shards);
-      slices.put(slice.getName(), updatedSlice);
-    }
-    return new CloudState(state.getLiveNodes(), newStates);
-  }
-
-  private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
-    
-    boolean updated = false;
-    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
-    newStates.putAll(state.getCollectionStates());
-    
-    final Map<String, Slice> slices = newStates.get(collection);
-
-    if(slices==null) {
-      log.error("Could not mark shard leader for non existing collection.");
-      return state;
-    }
-    
-    if (!slices.containsKey(sliceName)) {
-      log.error("Could not mark leader for non existing slice.");
-      return state;
-    } else {
-      final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
-      for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
-        Map<String, String> newShardProps = new LinkedHashMap<String,String>();
-        newShardProps.putAll(shard.getValue().getProperties());
-        
-        String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP);  //clean any previously existed flag
-
-        ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
-        if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
-          newShardProps.put(ZkStateReader.LEADER_PROP,"true");
-          if (wasLeader == null) {
-            updated = true;
-          }
-        } else {
-          if (wasLeader != null) {
-            updated = true;
-          }
-        }
-        newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
-      }
-      Slice slice = new Slice(sliceName, newShards);
-      slices.put(sliceName, slice);
-    }
-    if (updated) {
-      return new CloudState(state.getLiveNodes(), newStates);
-    } else {
-      return state;
-    }
-  }
 
   @Override
   public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
-    synchronized (reader.getUpdateLock()) {
-      try {
-        reader.updateCloudState(true); // get fresh copy of the state
-      final CloudState state = reader.getCloudState();
-      final CloudState newState = setShardLeader(state, collection, shardId,
-          props.getCoreUrl());
-        if (state != newState) { // if same instance was returned no need to
-                                 // update state
-          log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
-          zkClient.setData(ZkStateReader.CLUSTER_STATE,
-              ZkStateReader.toJSON(newState), true);
-          
-        } else {
-          log.debug("State was not changed.");
-        }
-      } catch (KeeperException e) {
-        log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      }
-    }
+    log.info("Leader change pooled.");
+    fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl()));
   }
   
 }
\ No newline at end of file

Modified: lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Sun Jan 29 11:21:24 2012
@@ -21,11 +21,15 @@ import java.io.IOException;
 
 import org.apache.lucene.index.IndexWriter;
 import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class DefaultSolrCoreState extends SolrCoreState {
- 
+  public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
+  
   private final Object recoveryLock = new Object();
   private int refCnt = 1;
   private SolrIndexWriter indexWriter = null;
@@ -62,10 +66,14 @@ public final class DefaultSolrCoreState 
     synchronized (this) {
       refCnt--;
       if (refCnt == 0) {
-        if (closer != null) {
-          closer.closeWriter(indexWriter);
-        } else if (indexWriter != null) {
-          indexWriter.close();
+        try {
+          if (closer != null) {
+            closer.closeWriter(indexWriter);
+          } else if (indexWriter != null) {
+            indexWriter.close();
+          }
+        } catch (Throwable t) {
+          SolrException.log(log, t);
         }
         directoryFactory.close();
         closed = true;

Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Jan 29 11:21:24 2012
@@ -102,7 +102,7 @@ public abstract class AbstractZkTestCase
   private static void putConfig(SolrZkClient zkClient, final String name)
       throws Exception {
     zkClient.makePath("/configs/conf1/" + name, getFile("solr"
-        + File.separator + "conf" + File.separator + name), false, false);  
+        + File.separator + "conf" + File.separator + name), false, true);  
   }
 
   @Override

Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Sun Jan 29 11:21:24 2012
@@ -59,7 +59,6 @@ import org.junit.Ignore;
  * what we test now - the default update chain
  * 
  */
-@Ignore
 public class FullSolrCloudTest extends AbstractDistributedZkTestCase {
   
   private static final String SHARD2 = "shard2";

Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Sun Jan 29 11:21:24 2012
@@ -100,7 +100,7 @@ public class LeaderElectionIntegrationTe
         AbstractZkTestCase.TIMEOUT);
     
     reader = new ZkStateReader(zkClient); 
-
+    reader.createClusterStateWatchersAndUpdate();
     log.info("####SETUP_END " + getName());
     
   }
@@ -217,8 +217,7 @@ public class LeaderElectionIntegrationTe
   
   private String getLeader() throws InterruptedException, KeeperException {
     
-    reader.updateCloudState(true);
-    ZkNodeProps props = reader.getLeaderProps("collection1", "shard1", 15000);
+    ZkNodeProps props = reader.getLeaderProps("collection1", "shard1", 30000);
     String leader = props.get(ZkStateReader.NODE_NAME_PROP);
     
     return leader;

Modified: lucene/dev/branches/lucene3661/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java?rev=1237242&r1=1237241&r2=1237242&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java (original)
+++ lucene/dev/branches/lucene3661/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java Sun Jan 29 11:21:24 2012
@@ -148,8 +148,9 @@ public abstract class SolrTestCaseJ4 ext
      if (endNumOpens-numOpens != endNumCloses-numCloses) {
        String msg = "ERROR: SolrIndexSearcher opens=" + (endNumOpens-numOpens) + " closes=" + (endNumCloses-numCloses);
        log.error(msg);
-       testsFailed = true;
-       fail(msg);
+       // TODO: re-enable this when we've nailed down why this happens on jenkins so often (and not other systems) - see SOLR-3066
+       // testsFailed = true;
+       // fail(msg);
      }
   }