You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/13 21:36:54 UTC

svn commit: r1695763 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/cloud/overseer/ solr/core/src/test/org/apache/solr/cloud/ solr/core/src/test/org/apache/solr/cloud/overseer/

Author: shalin
Date: Thu Aug 13 19:36:54 2015
New Revision: 1695763

URL: http://svn.apache.org/r1695763
Log:
SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases, can go into an infinite loop if cluster state in ZooKeeper is modified externally

Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Thu Aug 13 19:36:54 2015
@@ -49,6 +49,10 @@ Bug Fixes
 * SOLR-7836: Possible deadlock when closing refcounted index writers.
   (Jessica Cheng Mallet, Erick Erickson)
 
+* SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases,
+  can go into an infinite loop if cluster state in ZooKeeper is modified externally.
+  (Scott Blum, shalin)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 13 19:36:54 2015
@@ -136,7 +136,7 @@ public class Overseer implements Closeab
 
       log.info("Starting to work on the main queue");
       try {
-        ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+        ZkStateWriter zkStateWriter = null;
         ClusterState clusterState = null;
         boolean refreshClusterState = true; // let's refresh in the first iteration
         while (!this.isClosed) {
@@ -153,6 +153,7 @@ public class Overseer implements Closeab
             try {
               reader.updateClusterState();
               clusterState = reader.getClusterState();
+              zkStateWriter = new ZkStateWriter(reader, stats);
               refreshClusterState = false;
 
               // if there were any errors while processing
@@ -237,13 +238,16 @@ public class Overseer implements Closeab
             // clean work queue
             while (workQueue.poll() != null);
 
+          } catch (KeeperException.BadVersionException bve) {
+            log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve);
+            refreshClusterState = true;
           } catch (KeeperException e) {
             if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
               log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
               return;
             }
             log.error("Exception in Overseer main queue loop", e);
-            refreshClusterState = true; // it might have been a bad version error
+            refreshClusterState = true; // force refresh state in case of all errors
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Thu Aug 13 19:36:54 2015
@@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.singletonMap;
 
+/**
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
+ * both stateFormat=1 collections (stored in the shared /clusterstate.json in ZK) and stateFormat=2 collections,
+ * each of which gets its own individual state.json in ZK.
+ *
+ * Updates to the cluster state are specified using the
+ * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
+ * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
+ * automatically if necessary. The {@link #writePendingUpdates()} method can be used to force a flush of any pending updates.
+ *
+ * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
+ * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
+ * class is suspect and the current instance of the class should be discarded and a new instance should be created
+ * and used for any future updates.
+ */
 public class ZkStateWriter {
+  private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
   private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
+
+  /**
+   * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
+   */
   public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
 
   protected final ZkStateReader reader;
@@ -52,6 +72,12 @@ public class ZkStateWriter {
   protected int lastStateFormat = -1; // sentinel value
   protected String lastCollectionName = null;
 
+  /**
+   * Set to true if we ever get a BadVersionException so that we can disallow future operations
+   * with this instance
+   */
+  protected boolean invalidState = false;
+
   public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
     assert zkStateReader != null;
 
@@ -59,7 +85,32 @@ public class ZkStateWriter {
     this.stats = stats;
   }
 
-  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
+  /**
+   * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
+   * {@link ClusterState} is returned and it is expected that the caller will use the returned
+   * cluster state for the subsequent invocation of this method.
+   * <p>
+   * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
+   * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
+   * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
+   * be used to force an immediate flush of pending cluster state changes.
+   *
+   * @param prevState the cluster state information on which the given <code>cmd</code> is applied
+   * @param cmd       the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
+   * @param callback  a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
+   *                  for any callbacks
+   * @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
+   * <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
+   * @throws IllegalStateException if the current instance is no longer usable. The current instance must be
+   *                               discarded.
+   * @throws Exception             on an error in ZK operations or callback. If a flush to ZooKeeper results
+   *                               in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
+   *                               must be discarded
+   */
+  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
+    if (invalidState) {
+      throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+    }
     if (cmd == NO_OP) return prevState;
 
     if (maybeFlushBefore(cmd)) {
@@ -129,14 +180,25 @@ public class ZkStateWriter {
       return false;
     lastCollectionName = cmd.name;
     lastStateFormat = cmd.collection.getStateFormat();
-    return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+    return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL;
   }
 
   public boolean hasPendingUpdates() {
     return !updates.isEmpty() || isClusterStateModified;
   }
 
-  public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+  /**
+   * Writes all pending updates to ZooKeeper and returns the modified cluster state
+   *
+   * @return the modified cluster state
+   * @throws IllegalStateException if the current instance is no longer usable and must be discarded
+   * @throws KeeperException       if any ZooKeeper operation results in an error
+   * @throws InterruptedException  if the current thread is interrupted
+   */
+  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+    if (invalidState) {
+      throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+    }
     if (!hasPendingUpdates()) return clusterState;
     TimerContext timerContext = stats.time("update_state");
     boolean success = false;
@@ -188,6 +250,10 @@ public class ZkStateWriter {
         isClusterStateModified = false;
       }
       success = true;
+    } catch (KeeperException.BadVersionException bve) {
+      // this is a tragic error, we must disallow usage of this instance
+      invalidState = true;
+      throw bve;
     } finally {
       timerContext.stop();
       if (success) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Thu Aug 13 19:36:54 2015
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import javax.xml.parsers.ParserConfigurationException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -31,8 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.xml.parsers.ParserConfigurationException;
-
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -58,6 +57,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -573,7 +573,7 @@ public class OverseerTest extends SolrTe
 
       q.offer(Utils.toJSON(m));
 
-      verifyStatus(reader, Replica.State.ACTIVE);
+      verifyStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
 
     } finally {
 
@@ -585,20 +585,20 @@ public class OverseerTest extends SolrTe
     }
   }
 
-  private void verifyStatus(ZkStateReader reader, Replica.State expectedState) throws InterruptedException {
+  private void verifyStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
     int maxIterations = 100;
     Replica.State coreState = null;
     while(maxIterations-->0) {
-      Slice slice = reader.getClusterState().getSlice("collection1", "shard1");
+      Slice slice = reader.getClusterState().getSlice(collection, shard);
       if(slice!=null) {
-        coreState = slice.getReplicasMap().get("core_node1").getState();
+        coreState = slice.getReplicasMap().get(coreNodeName).getState();
         if(coreState == expectedState) {
           return;
         }
       }
       Thread.sleep(50);
     }
-    fail("Illegal state, was:" + coreState + " expected:" + expectedState + "clusterState:" + reader.getClusterState());
+    fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
   }
   
   private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
@@ -649,7 +649,7 @@ public class OverseerTest extends SolrTe
       mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
       
       waitForCollections(reader, collection);
-      verifyStatus(reader, Replica.State.RECOVERING);
+      verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
       
       int version = getClusterStateVersion(zkClient);
       
@@ -657,7 +657,7 @@ public class OverseerTest extends SolrTe
       
       while (version == getClusterStateVersion(zkClient));
       
-      verifyStatus(reader, Replica.State.ACTIVE);
+      verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE);
       version = getClusterStateVersion(zkClient);
       overseerClient.close();
       Thread.sleep(1000); // wait for overseer to get killed
@@ -669,7 +669,7 @@ public class OverseerTest extends SolrTe
       
       while (version == getClusterStateVersion(zkClient));
       
-      verifyStatus(reader, Replica.State.RECOVERING);
+      verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
       
       assertEquals("Live nodes count does not match", 1, reader
           .getClusterState().getLiveNodes().size());
@@ -884,7 +884,7 @@ public class OverseerTest extends SolrTe
 
       waitForCollections(reader, "collection1");
       
-      verifyStatus(reader, Replica.State.RECOVERING);
+      verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
 
       mockController.close();
 
@@ -1161,6 +1161,111 @@ public class OverseerTest extends SolrTe
       close(reader);
       server.shutdown();
     }
+  }
+
+  @Test
+  public void testExternalClusterStateChangeBehavior() throws Exception {
+    String zkDir = createTempDir("testExternalClusterStateChangeBehavior").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+    ZkStateReader reader = null;
+    SolrZkClient overseerClient = null;
+
+    try {
+      server.run();
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+      ZkController.createClusterZkNodes(zkClient);
+
+      zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);
+
+      reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      overseerClient = electNewOverseer(server.getZkAddress());
+
+      DistributedQueue q = Overseer.getInQueue(zkClient);
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.COLLECTION_PROP, "c1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+
+      q.offer(Utils.toJSON(m));
+
+      waitForCollections(reader, "c1");
+      verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
+
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.COLLECTION_PROP, "c1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
+
+      q.offer(Utils.toJSON(m));
+
+
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.COLLECTION_PROP, "c1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+
+      q.offer(Utils.toJSON(m));
+
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
+      // Simulate an external modification
+      zkClient.setData("/clusterstate.json", data, true);
+
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+          "name", "test",
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          DocCollection.STATE_FORMAT, "2"
+      );
+      q.offer(Utils.toJSON(m));
+
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
+          "collection", "test",
+          ZkStateReader.SHARD_ID_PROP, "x",
+          ZkStateReader.REPLICATION_FACTOR, "1"
+      );
+      q.offer(Utils.toJSON(m));
+
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
+          "collection", "test",
+          ZkStateReader.SHARD_ID_PROP, "x",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()
+      );
+      q.offer(Utils.toJSON(m));
+
+      waitForCollections(reader, "test");
+      verifyStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
+
+      waitForCollections(reader, "c1");
+      verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
+
+    } finally {
+      close(zkClient);
+      close(overseerClient);
+      close(reader);
+      server.shutdown();
+    }
   }
 
   private void close(ZkStateReader reader) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java Thu Aug 13 19:36:54 2015
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Slic
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -204,4 +205,160 @@ public class ZkStateWriterTest extends S
 
   }
 
+  public void testExternalModificationToSharedClusterState() throws Exception {
+    String zkDir = createTempDir("testExternalModification").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+      // create collection 1 with stateFormat = 1
+      ZkWriteCommand c1 = new ZkWriteCommand("c1",
+          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+      writer.enqueueUpdate(reader.getClusterState(), c1, null);
+      writer.writePendingUpdates();
+
+      reader.updateClusterState();
+      ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
+      assertTrue(clusterState.hasCollection("c1"));
+      assertFalse(clusterState.hasCollection("c2"));
+
+      // Simulate an external modification to /clusterstate.json
+      byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
+      zkClient.setData("/clusterstate.json", data, true);
+
+      // enqueue another c1 so that ZkStateWriter has pending updates
+      writer.enqueueUpdate(clusterState, c1, null);
+      assertTrue(writer.hasPendingUpdates());
+
+      // create collection 2 with stateFormat = 2
+      ZkWriteCommand c2 = new ZkWriteCommand("c2",
+          new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+
+      try {
+        writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
+        fail("Enqueue should not have succeeded");
+      } catch (KeeperException.BadVersionException bve) {
+        // expected
+      }
+
+      reader.updateClusterState();
+      try {
+        writer.enqueueUpdate(reader.getClusterState(), c2, null);
+        fail("enqueueUpdate after BadVersionException should not have suceeded");
+      } catch (IllegalStateException e) {
+        // expected
+      }
+
+      try {
+        writer.writePendingUpdates();
+        fail("writePendingUpdates after BadVersionException should not have suceeded");
+      } catch (IllegalStateException e) {
+        // expected
+      }
+
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+    }
+  }
+
+  public void testExternalModificationToStateFormat2() throws Exception {
+    String zkDir = createTempDir("testExternalModificationToStateFormat2").toFile().getAbsolutePath();
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+      ClusterState state = reader.getClusterState();
+
+      // create collection 2 with stateFormat = 2
+      ZkWriteCommand c2 = new ZkWriteCommand("c2",
+          new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+      state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
+      assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
+
+      int sharedClusterStateVersion = state.getZkClusterStateVersion();
+      int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
+
+      // Simulate an external modification to /collections/c2/state.json
+      byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
+      zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
+
+      // get the most up-to-date state
+      reader.updateClusterState();
+      state = reader.getClusterState();
+      assertTrue(state.hasCollection("c2"));
+      assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
+      assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
+
+      // enqueue an update to stateFormat2 collection such that update is pending
+      state = writer.enqueueUpdate(state, c2, null);
+      assertTrue(writer.hasPendingUpdates());
+
+      // get the most up-to-date state
+      reader.updateClusterState();
+      state = reader.getClusterState();
+
+      // enqueue a stateFormat=1 collection which should cause a flush
+      ZkWriteCommand c1 = new ZkWriteCommand("c1",
+          new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+
+      try {
+        state = writer.enqueueUpdate(state, c1, null);
+        fail("Enqueue should not have succeeded");
+      } catch (KeeperException.BadVersionException bve) {
+        // expected
+      }
+
+      try {
+        writer.enqueueUpdate(reader.getClusterState(), c2, null);
+        fail("enqueueUpdate after BadVersionException should not have suceeded");
+      } catch (IllegalStateException e) {
+        // expected
+      }
+
+      try {
+        writer.writePendingUpdates();
+        fail("writePendingUpdates after BadVersionException should not have suceeded");
+      } catch (IllegalStateException e) {
+        // expected
+      }
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+    }
+  }
 }