You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/11 18:28:32 UTC

[lucene-solr] 01/02: @1455 Write the collection we are interested in.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ec7a1da7468307de43c51e0412575e4922c6d9cb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:02:39 2021 -0600

    @1455 Write the collection we are interested in.
    
    Took 22 minutes
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  21 +-
 .../solr/cloud/OverseerTaskExecutorTask.java       |   6 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |   4 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   5 +-
 .../OverseerCollectionMessageHandler.java          |   4 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   6 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 233 ++++++++++-----------
 11 files changed, 133 insertions(+), 154 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index dd665e2..f200fe7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -72,7 +72,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -712,8 +711,8 @@ public class Overseer implements SolrCloseable {
     return future;
   }
 
-  public Future writePendingUpdates() {
-    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), zkStateWriter));
+  public Future writePendingUpdates(String collection) {
+    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
   }
 
   private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -766,7 +765,7 @@ public class Overseer implements SolrCloseable {
         return;
       }
 
-      //ourLock.lock();
+      ourLock.lock();
       try {
         try {
           List<String> items = getItems();
@@ -779,7 +778,7 @@ public class Overseer implements SolrCloseable {
           log.error("Exception during overseer queue queue processing", e);
         }
       } finally {
-     //   ourLock.unlock();
+        ourLock.unlock();
       }
 
     }
@@ -820,7 +819,6 @@ public class Overseer implements SolrCloseable {
     @Override
     protected void processQueueItems(List<String> items, boolean onStart) {
       List<String> fullPaths = new ArrayList<>(items.size());
-      ourLock.lock();
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -856,9 +854,13 @@ public class Overseer implements SolrCloseable {
             log.error("failed waiting for enqueued updates", e);
           }
         }
-        overseer.writePendingUpdates();
 
+        Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
+        for (String collection : collections) {
+          overseer.writePendingUpdates(collection);
+        }
       } finally {
+
         if (overseer.zkStateWriter != null) {
           if (zkController.getZkClient().isAlive()) {
             try {
@@ -867,8 +869,8 @@ public class Overseer implements SolrCloseable {
               log.warn("Failed deleting processed items", e);
             }
           }
+
         }
-        ourLock.unlock();
       }
     }
 
@@ -909,7 +911,6 @@ public class Overseer implements SolrCloseable {
       @Override
       protected void processQueueItems(List<String> items, boolean onStart) {
         List<String> fullPaths = new ArrayList<>(items.size());
-        ourLock.lock();
         try {
           log.info("Found collection queue items {} onStart={}", items, onStart);
           for (String item : items) {
@@ -940,9 +941,7 @@ public class Overseer implements SolrCloseable {
           } catch (Exception e) {
             log.warn("Delete items failed {}", e.getMessage());
           }
-          ourLock.unlock();
         }
-
       }
 
       private void runAsync(Map.Entry<String,byte[]> entry, boolean onStart) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index ac6a4cb..c118ac5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -82,16 +82,18 @@ public class OverseerTaskExecutorTask {
   }
 
   public static class WriteTask implements Runnable {
+    private final String collection;
     CoreContainer coreContainer;
 
-    public WriteTask(CoreContainer coreContainer, ZkStateWriter zkStateWriter) {
+    public WriteTask(CoreContainer coreContainer, String collection) {
+      this.collection = collection;
       this.coreContainer = coreContainer;
     }
 
     @Override
     public void run() {
       try {
-        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates();
+        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
       } catch (NullPointerException e) {
         if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
       } catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4783d7b..0ba35e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
       future.get();
-      writeFuture = ocmh.overseer.writePendingUpdates();
+      writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
 
       if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
       for (Map.Entry<String,ShardRequest> e : coresToCreate.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index c9a2708..91851f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -188,7 +188,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
         log.error("failure", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-      response.writeFuture = overseer.writePendingUpdates();
+      response.writeFuture = overseer.writePendingUpdates(collection);
       if (resp.asyncFinalRunner != null) {
         try {
           resp.asyncFinalRunner.call();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 4b8a170..29ea39c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -198,7 +198,7 @@ public class DeleteReplicaCmd implements Cmd {
           if (waitForFinalState) {
             try {
               ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
-              ocmh.overseer.writePendingUpdates();
+              ocmh.overseer.writePendingUpdates(finalCollectionName1);
               waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
             } catch (Exception e) {
               log.error("Failed waiting for replica to be removed", e);
@@ -280,7 +280,7 @@ public class DeleteReplicaCmd implements Cmd {
         for (Future future : futures) {
           future.get();
         }
-        ocmh.overseer.writePendingUpdates();
+        ocmh.overseer.writePendingUpdates(collectionName);
       } catch (Exception e) {
         log.error("failed writing update to zkstatewriter", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 3a3be00..1ab2ebe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -178,7 +178,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       if (waitForFinalState) {
         ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
-        ocmh.overseer.writePendingUpdates().get();
+        ocmh.overseer.writePendingUpdates(collectionName).get();
         ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
           if (coll == null) {
             return true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index 8ae6612..e9f6aaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -308,8 +308,9 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         @SuppressWarnings({"rawtypes"}) NamedList deleteResult = new NamedList();
         try {
           response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
-          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(response1.clusterState.getCollectionsMap().keySet().iterator().next()), null,false).get();
-          asyncResp.writeFuture = ocmh.overseer.writePendingUpdates();
+          String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
+          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+          asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
         } catch (SolrException e) {
           deleteResult.add("failure", e.toString());
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 3126c0e..faa10df 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -333,7 +333,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
             }
             if (asyncResp == null || asyncResp.writeFuture == null) {
               future.get();
-              writeFuture2 = overseer.writePendingUpdates();
+              writeFuture2 = overseer.writePendingUpdates(collection);
             }
 
           } else {
@@ -356,7 +356,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
               if (future != null) {
                 future.get();
               }
-              writeFuture = overseer.writePendingUpdates();
+              writeFuture = overseer.writePendingUpdates(collection);
             }
           }
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index a694708..2b0fdec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -145,9 +145,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       };
       runners.add(runner);
     }
-
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(clusterState.getCollectionStates().keySet().iterator().next()), null, false).get();
-    ocmh.overseer.writePendingUpdates();
+    String collection = clusterState.getCollectionStates().keySet().iterator().next();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+    ocmh.overseer.writePendingUpdates(collection);
 
     CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
     response.results = results;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ca636a8..793ac18 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -350,7 +350,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
 
       ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
-      ocmh.overseer.writePendingUpdates();
+      ocmh.overseer.writePendingUpdates(collectionName);
 
       log.info("Clusterstate after adding new shard for split {}", clusterState);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 92123bf..c039414 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -67,7 +67,6 @@ public class ZkStateWriter {
 
   protected volatile Stats stats;
 
-  AtomicReference<Exception> lastFailedException = new AtomicReference<>();
   private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>();
 
   private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
@@ -510,11 +509,11 @@ public class ZkStateWriter {
    */
 
   // if additional updates too large, publish structure change
-  public void writePendingUpdates() {
+  public void writePendingUpdates(String collection) {
 
     do {
       try {
-        write();
+        write(collection);
         break;
       } catch (KeeperException.BadVersionException e) {
 
@@ -527,156 +526,129 @@ public class ZkStateWriter {
 
   }
 
-  private void write() throws KeeperException.BadVersionException {
-    // writeLock.lock();
-    // try {
-    //   log.info("Get our write lock");
-
-    //ourLock.lock();
-    try {
-      //     log.info("Got our write lock");
-      if (log.isDebugEnabled()) {
-        log.debug("writePendingUpdates {}", cs);
-      }
+  private void write(String coll) throws KeeperException.BadVersionException {
 
+    if (log.isDebugEnabled()) {
+      log.debug("writePendingUpdates {}", cs);
+    }
 
+    AtomicInteger lastVersion = new AtomicInteger();
+    AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
 
-      //      if (failedUpdates.size() > 0) {
-      //        Exception lfe = lastFailedException.get();
-      //        log.warn("Some collection updates failed {} logging last exception", failedUpdates, lfe); // MRM TODO: expand
-      //        failedUpdates.clear();
-      //        lfe = null;
-      //        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lfe);
-      //      }
-      //      } finally {
-      //        ourLock.unlock();
-      //      }
-
-      // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
-      // waitForStateWePublishedToComeBack();
-
-      //   ourLock.lock();
-      AtomicInteger lastVersion = new AtomicInteger();
-      AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
-      List<String> removeCollections = new ArrayList<>();
-      //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
-      //try {
-      cs.values().forEach(collection -> {
-
-
-        if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
-        Integer version = null;
-        if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
-          log.info("process collection {}", collection);
-          ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
-            if (reentrantLock == null) {
-              ColState colState = new ColState();
-              return colState;
-            }
-            return reentrantLock;
-          });
+    DocCollection collection = cs.get(coll);
 
-          collState.collLock.lock();
-          try {
-            collState.throttle.minimumWaitBetweenActions();
-            collState.throttle.markAttemptingAction();
-            String name = collection.getName();
-            String path = ZkStateReader.getCollectionPath(collection.getName());
-            String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
-            // log.info("process collection {} path {}", collection.getName(), path);
-            Stat existsStat = null;
-            if (log.isTraceEnabled()) log.trace("process {}", collection);
-            try {
-              // log.info("get data for {}", name);
-              byte[] data = Utils.toJSON(singletonMap(name, collection));
-              //  log.info("got data for {} {}", name, data.length);
+    if (collection == null) {
+      return;
+    }
 
-              try {
+    if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
+    Integer version = null;
+    if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
+      log.info("process collection {}", collection);
+      ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
+        if (reentrantLock == null) {
+          ColState colState = new ColState();
+          return colState;
+        }
+        return reentrantLock;
+      });
 
-                if (dirtyStructure.contains(collection.getName())) {
-                  if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+      collState.collLock.lock();
+      try {
+        collState.throttle.minimumWaitBetweenActions();
+        collState.throttle.markAttemptingAction();
+        String name = collection.getName();
+        String path = ZkStateReader.getCollectionPath(collection.getName());
+        String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
+        // log.info("process collection {} path {}", collection.getName(), path);
+        Stat existsStat = null;
+        if (log.isTraceEnabled()) log.trace("process {}", collection);
+        try {
+          // log.info("get data for {}", name);
+          byte[] data = Utils.toJSON(singletonMap(name, collection));
+          //  log.info("got data for {} {}", name, data.length);
 
-                  Integer v = trackVersions.get(collection.getName());
+          try {
 
-                  if (v != null) {
-                    //log.info("got version from cache {}", v);
-                    version = v;
-                  } else {
-                    version = 0;
-                  }
-                  lastVersion.set(version);
-                  if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+            if (dirtyStructure.contains(collection.getName())) {
+              if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
 
-                  reader.getZkClient().setData(path, data, version, true, false);
-                  if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
-                  trackVersions.put(collection.getName(), version + 1);
+              Integer v = trackVersions.get(collection.getName());
 
-                  reader.getZkClient().setData(pathSCN, null, -1, true, false);
-                  dirtyStructure.remove(collection.getName());
+              if (v != null) {
+                //log.info("got version from cache {}", v);
+                version = v;
+              } else {
+                version = 0;
+              }
+              lastVersion.set(version);
+              if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
 
-                  ZkNodeProps updates = stateUpdates.get(collection.getName());
-                  if (updates != null) {
-                    updates.getProperties().clear();
-                  }
-                }
+              reader.getZkClient().setData(path, data, version, true, false);
+              if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+              trackVersions.put(collection.getName(), version + 1);
 
-              } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
-                lastVersion.set(-1);
-                trackVersions.remove(collection.getName());
-                stateUpdates.remove(collection.getName());
-                cs.remove(collection);
-                // likely deleted
-
-              } catch (KeeperException.BadVersionException bve) {
-                log.info("Tried to update state.json ({}) with bad version", collection);
-                //lastFailedException.set(bve);
-                //failedUpdates.put(collection.getName(), collection);
-                // Stat estate = reader.getZkClient().exists(path, null);
-                trackVersions.remove(collection.getName());
-                Stat stat = reader.getZkClient().exists(path, null, false, false);
-                log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
-                if (!overseer.isClosed() && stat != null) {
-                  trackVersions.put(collection.getName(), stat.getVersion());
-                }
-                throw bve;
-              }
+              reader.getZkClient().setData(pathSCN, null, -1, true, false);
+              dirtyStructure.remove(collection.getName());
 
-              if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
-                ZkNodeProps updates = stateUpdates.get(collection.getName());
-                if (updates != null) {
-                  writeStateUpdates(collection, updates);
-                }
+              ZkNodeProps updates = stateUpdates.get(collection.getName());
+              if (updates != null) {
+                updates.getProperties().clear();
               }
+            }
 
-            } catch (KeeperException.BadVersionException bve) {
-              badVersionException.set(bve);
-            } catch (InterruptedException | AlreadyClosedException e) {
-              log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
-
-            } catch (Exception e) {
-              log.error("Failed processing update=" + collection, e);
+          } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+
+            lastVersion.set(-1);
+            trackVersions.remove(collection.getName());
+            stateUpdates.remove(collection.getName());
+            cs.remove(collection);
+            // likely deleted
+
+          } catch (KeeperException.BadVersionException bve) {
+            log.info("Tried to update state.json ({}) with bad version", collection);
+            //lastFailedException.set(bve);
+            //failedUpdates.put(collection.getName(), collection);
+            // Stat estate = reader.getZkClient().exists(path, null);
+            trackVersions.remove(collection.getName());
+            Stat stat = reader.getZkClient().exists(path, null, false, false);
+            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
+
+            if (!overseer.isClosed() && stat != null) {
+              trackVersions.put(collection.getName(), stat.getVersion());
             }
-          } finally {
-            collState.collLock.unlock();
+            throw bve;
           }
-        }
 
-      });
+          if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
+            ZkNodeProps updates = stateUpdates.get(collection.getName());
+            if (updates != null) {
+              writeStateUpdates(collection, updates);
+            }
+          }
 
-      //removeCollections.forEach(c ->  removeCollection(c));
+        } catch (KeeperException.BadVersionException bve) {
+          badVersionException.set(bve);
+        } catch (InterruptedException | AlreadyClosedException e) {
+          log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
 
-      if (badVersionException.get() != null) {
-        throw badVersionException.get();
+        } catch (Exception e) {
+          log.error("Failed processing update=" + collection, e);
+        }
+      } finally {
+        collState.collLock.unlock();
       }
+    }
 
-      //log.info("Done with successful cluster write out");
-    } finally {
+    //removeCollections.forEach(c ->  removeCollection(c));
 
+    if (badVersionException.get() != null) {
+      throw badVersionException.get();
     }
 
+    //log.info("Done with successful cluster write out");
+
     //    } finally {
     //      writeLock.unlock();
     //    }
@@ -703,6 +675,11 @@ public class ZkStateWriter {
     return ClusterState.getRefCS(cs, -2);
   }
 
+  public Set<String> getDirtyStateCollections() {
+    return dirtyState;
+  }
+
+
   public void removeCollection(String collection) {
     log.info("Removing collection from zk state {}", collection);
     ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {