You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ro...@apache.org on 2016/07/27 17:19:11 UTC

[2/2] lucene-solr:branch_6x: SOLR-9346: Always close ZkStateReader

SOLR-9346: Always close ZkStateReader


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/90e9c768
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/90e9c768
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/90e9c768

Branch: refs/heads/branch_6x
Commit: 90e9c76851afa43a05beeeb19d7ce138e4e39b10
Parents: 22d2496
Author: Alan Woodward <ro...@apache.org>
Authored: Wed Jul 27 16:45:59 2016 +0100
Committer: Alan Woodward <ro...@apache.org>
Committed: Wed Jul 27 18:03:48 2016 +0100

----------------------------------------------------------------------
 .../apache/solr/hadoop/ZooKeeperInspector.java  |  21 +-
 .../solr/cloud/ChaosMonkeyShardSplitTest.java   |  13 +-
 .../apache/solr/cloud/LeaderElectionTest.java   |   1 +
 .../org/apache/solr/cloud/OverseerTest.java     |   1 +
 .../solr/cloud/overseer/ZkStateWriterTest.java  | 393 ++++++++++---------
 5 files changed, 218 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/90e9c768/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
index 9b86dcd..76928aa 100644
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
+++ b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/ZooKeeperInspector.java
@@ -16,6 +16,15 @@
  */
 package org.apache.solr.hadoop;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.cloud.ZkController;
@@ -35,15 +44,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
 /**
  * Extracts SolrCloud information from ZooKeeper.
  */
@@ -78,8 +78,7 @@ final class ZooKeeperInspector {
     }
     SolrZkClient zkClient = getZkClient(zkHost);
     
-    try {
-      ZkStateReader zkStateReader = new ZkStateReader(zkClient);
+    try (ZkStateReader zkStateReader = new ZkStateReader(zkClient)) {
       try {
         // first check for alias
         collection = checkForAlias(zkClient, collection);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/90e9c768/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 190db57..7e840da 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.common.SolrInputDocument;
@@ -36,12 +42,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Test split phase that occurs when a Collection API split call is made.
  */
@@ -254,6 +254,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
         address.replaceAll("/", "_"));
     overseerElector.setup(ec);
     overseerElector.joinElection(ec, false);
+    reader.close();
     return zkClient;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/90e9c768/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index 95dccab..8e1be10 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -118,6 +118,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
       if (!zkClient.isClosed()) {
         zkClient.close();
       }
+      zkStateReader.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/90e9c768/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 8c9daad..9166a43 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -130,6 +130,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         }
       }
       deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
+      zkStateReader.close();
       zkClient.close();
     }
     

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/90e9c768/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 86aac4d..85dbf4a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -57,68 +57,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
 
-      ZkStateReader reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
-
-      assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
-      assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
-
-      // create new collection with stateFormat = 2
-      ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
-      assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
-
-      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
-
-      ZkWriteCommand c2 = new ZkWriteCommand("c2",
-          new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
-      assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
-
-      // simulate three state changes on same collection, all should be batched together before
-      assertFalse(writer.maybeFlushBefore(c1));
-      assertFalse(writer.maybeFlushBefore(c1));
-      assertFalse(writer.maybeFlushBefore(c1));
-      // and after too
-      assertFalse(writer.maybeFlushAfter(c1));
-      assertFalse(writer.maybeFlushAfter(c1));
-      assertFalse(writer.maybeFlushAfter(c1));
-
-      // simulate three state changes on two different collections with stateFormat=2, none should be batched
-      assertFalse(writer.maybeFlushBefore(c1));
-      // flushAfter has to be called as it updates the internal batching related info
-      assertFalse(writer.maybeFlushAfter(c1));
-      assertTrue(writer.maybeFlushBefore(c2));
-      assertFalse(writer.maybeFlushAfter(c2));
-      assertTrue(writer.maybeFlushBefore(c1));
-      assertFalse(writer.maybeFlushAfter(c1));
-
-      // create a collection in stateFormat = 1 i.e. inside the main cluster state
-      ZkWriteCommand c3 = new ZkWriteCommand("c3",
-          new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-      clusterState = writer.enqueueUpdate(clusterState, c3, null);
-
-      // simulate three state changes in c3, all should be batched
-      for (int i=0; i<3; i++) {
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
+
+        ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+        assertFalse("Deletes can always be batched", writer.maybeFlushBefore(new ZkWriteCommand("xyz", null)));
+        assertFalse("Deletes can always be batched", writer.maybeFlushAfter(new ZkWriteCommand("xyz", null)));
+
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+        // create new collection with stateFormat = 2
+        ZkWriteCommand c1 = new ZkWriteCommand("c1",
+            new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
+        assertFalse("First requests can always be batched", writer.maybeFlushBefore(c1));
+
+        ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+
+        ZkWriteCommand c2 = new ZkWriteCommand("c2",
+            new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
+        assertTrue("Different (new) collection create cannot be batched together with another create", writer.maybeFlushBefore(c2));
+
+        // simulate three state changes on same collection, all should be batched together before
+        assertFalse(writer.maybeFlushBefore(c1));
+        assertFalse(writer.maybeFlushBefore(c1));
+        assertFalse(writer.maybeFlushBefore(c1));
+        // and after too
+        assertFalse(writer.maybeFlushAfter(c1));
+        assertFalse(writer.maybeFlushAfter(c1));
+        assertFalse(writer.maybeFlushAfter(c1));
+
+        // simulate three state changes on two different collections with stateFormat=2, none should be batched
+        assertFalse(writer.maybeFlushBefore(c1));
+        // flushAfter has to be called as it updates the internal batching related info
+        assertFalse(writer.maybeFlushAfter(c1));
+        assertTrue(writer.maybeFlushBefore(c2));
+        assertFalse(writer.maybeFlushAfter(c2));
+        assertTrue(writer.maybeFlushBefore(c1));
+        assertFalse(writer.maybeFlushAfter(c1));
+
+        // create a collection in stateFormat = 1 i.e. inside the main cluster state
+        ZkWriteCommand c3 = new ZkWriteCommand("c3",
+            new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+        clusterState = writer.enqueueUpdate(clusterState, c3, null);
+
+        // simulate three state changes in c3, all should be batched
+        for (int i = 0; i < 3; i++) {
+          assertFalse(writer.maybeFlushBefore(c3));
+          assertFalse(writer.maybeFlushAfter(c3));
+        }
+
+        // simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
+        // none should be batched together
         assertFalse(writer.maybeFlushBefore(c3));
         assertFalse(writer.maybeFlushAfter(c3));
+        assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
+        assertFalse(writer.maybeFlushAfter(c1));
+        assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
+        assertFalse(writer.maybeFlushAfter(c3));
+        assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
+        assertFalse(writer.maybeFlushAfter(c2));
       }
 
-      // simulate state change in c3 (stateFormat=1) interleaved with state changes from c1,c2 (stateFormat=2)
-      // none should be batched together
-      assertFalse(writer.maybeFlushBefore(c3));
-      assertFalse(writer.maybeFlushAfter(c3));
-      assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c1));
-      assertFalse(writer.maybeFlushAfter(c1));
-      assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c3));
-      assertFalse(writer.maybeFlushAfter(c3));
-      assertTrue("different stateFormat, should be flushed", writer.maybeFlushBefore(c2));
-      assertFalse(writer.maybeFlushAfter(c2));
-
     } finally {
       IOUtils.close(zkClient);
       server.shutdown();
@@ -140,24 +141,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
 
-      ZkStateReader reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
 
-      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+        ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
 
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
-      // create new collection with stateFormat = 1
-      ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+        // create new collection with stateFormat = 1
+        ZkWriteCommand c1 = new ZkWriteCommand("c1",
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
 
-      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
-      writer.writePendingUpdates();
+        ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+        writer.writePendingUpdates();
 
-      Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-      assertNotNull(map.get("c1"));
-      boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
-      assertFalse(exists);
+        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+        assertNotNull(map.get("c1"));
+        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
+        assertFalse(exists);
+      }
 
     } finally {
       IOUtils.close(zkClient);
@@ -181,24 +183,25 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
 
-      ZkStateReader reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
 
-      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+        ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
 
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
-      // create new collection with stateFormat = 2
-      ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+        // create new collection with stateFormat = 2
+        ZkWriteCommand c1 = new ZkWriteCommand("c1",
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
 
-      ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
-      writer.writePendingUpdates();
+        ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+        writer.writePendingUpdates();
 
-      Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-      assertNull(map.get("c1"));
-      map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
-      assertNotNull(map.get("c1"));
+        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+        assertNull(map.get("c1"));
+        map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
+        assertNotNull(map.get("c1"));
+      }
 
     } finally {
       IOUtils.close(zkClient);
@@ -224,63 +227,64 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
 
-      ZkStateReader reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
-
-      // create collection 1 with stateFormat = 1
-      ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-      writer.enqueueUpdate(reader.getClusterState(), c1, null);
-      writer.writePendingUpdates();
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
 
-      reader.forceUpdateCollection("c1");
-      reader.forceUpdateCollection("c2");
-      ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
-      assertTrue(clusterState.hasCollection("c1"));
-      assertFalse(clusterState.hasCollection("c2"));
+        ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
 
-      // Simulate an external modification to /clusterstate.json
-      byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
-      zkClient.setData("/clusterstate.json", data, true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
 
-      // enqueue another c1 so that ZkStateWriter has pending updates
-      writer.enqueueUpdate(clusterState, c1, null);
-      assertTrue(writer.hasPendingUpdates());
-
-      // create collection 2 with stateFormat = 1
-      ZkWriteCommand c2 = new ZkWriteCommand("c2",
-          new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
-
-      try {
-        writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
-        fail("Enqueue should not have succeeded");
-      } catch (KeeperException.BadVersionException bve) {
-        // expected
-      }
-
-      try {
-        writer.enqueueUpdate(reader.getClusterState(), c2, null);
-        fail("enqueueUpdate after BadVersionException should not have succeeded");
-      } catch (IllegalStateException e) {
-        // expected
-      }
-
-      try {
+        // create collection 1 with stateFormat = 1
+        ZkWriteCommand c1 = new ZkWriteCommand("c1",
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+        writer.enqueueUpdate(reader.getClusterState(), c1, null);
         writer.writePendingUpdates();
-        fail("writePendingUpdates after BadVersionException should not have succeeded");
-      } catch (IllegalStateException e) {
-        // expected
-      }
 
+        reader.forceUpdateCollection("c1");
+        reader.forceUpdateCollection("c2");
+        ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
+        assertTrue(clusterState.hasCollection("c1"));
+        assertFalse(clusterState.hasCollection("c2"));
+
+        // Simulate an external modification to /clusterstate.json
+        byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
+        zkClient.setData("/clusterstate.json", data, true);
+
+        // enqueue another c1 so that ZkStateWriter has pending updates
+        writer.enqueueUpdate(clusterState, c1, null);
+        assertTrue(writer.hasPendingUpdates());
+
+        // create collection 2 with stateFormat = 1
+        ZkWriteCommand c2 = new ZkWriteCommand("c2",
+            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+
+        try {
+          writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
+          fail("Enqueue should not have succeeded");
+        } catch (KeeperException.BadVersionException bve) {
+          // expected
+        }
+
+        try {
+          writer.enqueueUpdate(reader.getClusterState(), c2, null);
+          fail("enqueueUpdate after BadVersionException should not have succeeded");
+        } catch (IllegalStateException e) {
+          // expected
+        }
+
+        try {
+          writer.writePendingUpdates();
+          fail("writePendingUpdates after BadVersionException should not have succeeded");
+        } catch (IllegalStateException e) {
+          // expected
+        }
+      }
     } finally {
       IOUtils.close(zkClient);
       server.shutdown();
     }
+
   }
 
   public void testExternalModificationToStateFormat2() throws Exception {
@@ -298,68 +302,69 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
       ZkController.createClusterZkNodes(zkClient);
 
-      ZkStateReader reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
-
-      ClusterState state = reader.getClusterState();
-
-      // create collection 2 with stateFormat = 2
-      ZkWriteCommand c2 = new ZkWriteCommand("c2",
-          new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
-      state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
-      assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
-
-      int sharedClusterStateVersion = state.getZkClusterStateVersion();
-      int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
-
-      // Simulate an external modification to /collections/c2/state.json
-      byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
-      zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
-
-      // get the most up-to-date state
-      reader.forceUpdateCollection("c2");
-      state = reader.getClusterState();
-      log.info("Cluster state: {}", state);
-      assertTrue(state.hasCollection("c2"));
-      assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
-      assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
-
-      // enqueue an update to stateFormat2 collection such that update is pending
-      state = writer.enqueueUpdate(state, c2, null);
-      assertTrue(writer.hasPendingUpdates());
-
-      // get the most up-to-date state
-      reader.forceUpdateCollection("c2");
-      state = reader.getClusterState();
-
-      // enqueue a stateFormat=1 collection which should cause a flush
-      ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-
-      try {
-        writer.enqueueUpdate(state, c1, null);
-        fail("Enqueue should not have succeeded");
-      } catch (KeeperException.BadVersionException bve) {
-        // expected
-      }
-
-      try {
-        writer.enqueueUpdate(reader.getClusterState(), c2, null);
-        fail("enqueueUpdate after BadVersionException should not have succeeded");
-      } catch (IllegalStateException e) {
-        // expected
-      }
-
-      try {
-        writer.writePendingUpdates();
-        fail("writePendingUpdates after BadVersionException should not have succeeded");
-      } catch (IllegalStateException e) {
-        // expected
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
+
+        ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+        ClusterState state = reader.getClusterState();
+
+        // create collection 2 with stateFormat = 2
+        ZkWriteCommand c2 = new ZkWriteCommand("c2",
+            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+        state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
+        assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
+
+        int sharedClusterStateVersion = state.getZkClusterStateVersion();
+        int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
+
+        // Simulate an external modification to /collections/c2/state.json
+        byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
+        zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
+
+        // get the most up-to-date state
+        reader.forceUpdateCollection("c2");
+        state = reader.getClusterState();
+        log.info("Cluster state: {}", state);
+        assertTrue(state.hasCollection("c2"));
+        assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
+        assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
+
+        // enqueue an update to stateFormat2 collection such that update is pending
+        state = writer.enqueueUpdate(state, c2, null);
+        assertTrue(writer.hasPendingUpdates());
+
+        // get the most up-to-date state
+        reader.forceUpdateCollection("c2");
+        state = reader.getClusterState();
+
+        // enqueue a stateFormat=1 collection which should cause a flush
+        ZkWriteCommand c1 = new ZkWriteCommand("c1",
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+
+        try {
+          writer.enqueueUpdate(state, c1, null);
+          fail("Enqueue should not have succeeded");
+        } catch (KeeperException.BadVersionException bve) {
+          // expected
+        }
+
+        try {
+          writer.enqueueUpdate(reader.getClusterState(), c2, null);
+          fail("enqueueUpdate after BadVersionException should not have succeeded");
+        } catch (IllegalStateException e) {
+          // expected
+        }
+
+        try {
+          writer.writePendingUpdates();
+          fail("writePendingUpdates after BadVersionException should not have succeeded");
+        } catch (IllegalStateException e) {
+          // expected
+        }
       }
     } finally {
       IOUtils.close(zkClient);