You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/13 21:36:54 UTC
svn commit: r1695763 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/cloud/overseer/
solr/core/src/test/org/apache/solr/cloud/
solr/core/src/test/org/apache/solr/clou...
Author: shalin
Date: Thu Aug 13 19:36:54 2015
New Revision: 1695763
URL: http://svn.apache.org/r1695763
Log:
SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases, can go into an infinite loop if cluster state in ZooKeeper is modified externally
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Thu Aug 13 19:36:54 2015
@@ -49,6 +49,10 @@ Bug Fixes
* SOLR-7836: Possible deadlock when closing refcounted index writers.
(Jessica Cheng Mallet, Erick Erickson)
+* SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases,
+ can go into an infinite loop if cluster state in ZooKeeper is modified externally.
+ (Scott Blum, shalin)
+
Optimizations
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 13 19:36:54 2015
@@ -136,7 +136,7 @@ public class Overseer implements Closeab
log.info("Starting to work on the main queue");
try {
- ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+ ZkStateWriter zkStateWriter = null;
ClusterState clusterState = null;
boolean refreshClusterState = true; // let's refresh in the first iteration
while (!this.isClosed) {
@@ -153,6 +153,7 @@ public class Overseer implements Closeab
try {
reader.updateClusterState();
clusterState = reader.getClusterState();
+ zkStateWriter = new ZkStateWriter(reader, stats);
refreshClusterState = false;
// if there were any errors while processing
@@ -237,13 +238,16 @@ public class Overseer implements Closeab
// clean work queue
while (workQueue.poll() != null);
+ } catch (KeeperException.BadVersionException bve) {
+ log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve);
+ refreshClusterState = true;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
- refreshClusterState = true; // it might have been a bad version error
+ refreshClusterState = true; // force refresh state in case of all errors
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Thu Aug 13 19:36:54 2015
@@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
+/**
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
+ * both stateFormat=1 collection (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections
+ * each of which get their own individual state.json in ZK.
+ *
+ * Updates to the cluster state are specified using the
+ * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
+ * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
+ * automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates.
+ *
+ * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
+ * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
+ * class is suspect and the current instance of the class should be discarded and a new instance should be created
+ * and used for any future updates.
+ */
public class ZkStateWriter {
+ private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
+
+ /**
+ * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
+ */
public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
protected final ZkStateReader reader;
@@ -52,6 +72,12 @@ public class ZkStateWriter {
protected int lastStateFormat = -1; // sentinel value
protected String lastCollectionName = null;
+ /**
+ * Set to true if we ever get a BadVersionException so that we can disallow future operations
+ * with this instance
+ */
+ protected boolean invalidState = false;
+
public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
assert zkStateReader != null;
@@ -59,7 +85,32 @@ public class ZkStateWriter {
this.stats = stats;
}
- public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
+ /**
+ * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
+ * {@link ClusterState} is returned and it is expected that the caller will use the returned
+ * cluster state for the subsequent invocation of this method.
+ * <p>
+ * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
+ * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
+ * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
+ * be used to force an immediate flush of pending cluster state changes.
+ *
+ * @param prevState the cluster state information on which the given <code>cmd</code> is applied
+ * @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
+ * @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
+ * for any callbacks
+ * @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
+ * <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
+ * @throws IllegalStateException if the current instance is no longer usable. The current instance must be
+ * discarded.
+ * @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results
+ * in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
+ * must be discarded
+ */
+ public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
+ if (invalidState) {
+ throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+ }
if (cmd == NO_OP) return prevState;
if (maybeFlushBefore(cmd)) {
@@ -129,14 +180,25 @@ public class ZkStateWriter {
return false;
lastCollectionName = cmd.name;
lastStateFormat = cmd.collection.getStateFormat();
- return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+ return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL;
}
public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified;
}
- public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+ /**
+ * Writes all pending updates to ZooKeeper and returns the modified cluster state
+ *
+ * @return the modified cluster state
+ * @throws IllegalStateException if the current instance is no longer usable and must be discarded
+ * @throws KeeperException if any ZooKeeper operation results in an error
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+ if (invalidState) {
+ throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+ }
if (!hasPendingUpdates()) return clusterState;
TimerContext timerContext = stats.time("update_state");
boolean success = false;
@@ -188,6 +250,10 @@ public class ZkStateWriter {
isClusterStateModified = false;
}
success = true;
+ } catch (KeeperException.BadVersionException bve) {
+ // this is a tragic error, we must disallow usage of this instance
+ invalidState = true;
+ throw bve;
} finally {
timerContext.stop();
if (success) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Thu Aug 13 19:36:54 2015
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -31,8 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.xml.parsers.ParserConfigurationException;
-
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -58,6 +57,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -573,7 +573,7 @@ public class OverseerTest extends SolrTe
q.offer(Utils.toJSON(m));
- verifyStatus(reader, Replica.State.ACTIVE);
+ verifyStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
} finally {
@@ -585,20 +585,20 @@ public class OverseerTest extends SolrTe
}
}
- private void verifyStatus(ZkStateReader reader, Replica.State expectedState) throws InterruptedException {
+ private void verifyStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
int maxIterations = 100;
Replica.State coreState = null;
while(maxIterations-->0) {
- Slice slice = reader.getClusterState().getSlice("collection1", "shard1");
+ Slice slice = reader.getClusterState().getSlice(collection, shard);
if(slice!=null) {
- coreState = slice.getReplicasMap().get("core_node1").getState();
+ coreState = slice.getReplicasMap().get(coreNodeName).getState();
if(coreState == expectedState) {
return;
}
}
Thread.sleep(50);
}
- fail("Illegal state, was:" + coreState + " expected:" + expectedState + "clusterState:" + reader.getClusterState());
+ fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
}
private void verifyShardLeader(ZkStateReader reader, String collection, String shard, String expectedCore) throws InterruptedException, KeeperException {
@@ -649,7 +649,7 @@ public class OverseerTest extends SolrTe
mockController.publishState(collection, "core1", "core_node1", Replica.State.RECOVERING, 1);
waitForCollections(reader, collection);
- verifyStatus(reader, Replica.State.RECOVERING);
+ verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
int version = getClusterStateVersion(zkClient);
@@ -657,7 +657,7 @@ public class OverseerTest extends SolrTe
while (version == getClusterStateVersion(zkClient));
- verifyStatus(reader, Replica.State.ACTIVE);
+ verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.ACTIVE);
version = getClusterStateVersion(zkClient);
overseerClient.close();
Thread.sleep(1000); // wait for overseer to get killed
@@ -669,7 +669,7 @@ public class OverseerTest extends SolrTe
while (version == getClusterStateVersion(zkClient));
- verifyStatus(reader, Replica.State.RECOVERING);
+ verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
assertEquals("Live nodes count does not match", 1, reader
.getClusterState().getLiveNodes().size());
@@ -884,7 +884,7 @@ public class OverseerTest extends SolrTe
waitForCollections(reader, "collection1");
- verifyStatus(reader, Replica.State.RECOVERING);
+ verifyStatus(reader, collection, "shard1", "core_node1", Replica.State.RECOVERING);
mockController.close();
@@ -1161,6 +1161,111 @@ public class OverseerTest extends SolrTe
close(reader);
server.shutdown();
}
+ }
+
+ @Test
+ public void testExternalClusterStateChangeBehavior() throws Exception {
+ String zkDir = createTempDir("testExternalClusterStateChangeBehavior").toFile().getAbsolutePath();
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+ ZkStateReader reader = null;
+ SolrZkClient overseerClient = null;
+
+ try {
+ server.run();
+ zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+ ZkController.createClusterZkNodes(zkClient);
+
+ zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);
+
+ reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+
+ overseerClient = electNewOverseer(server.getZkAddress());
+
+ DistributedQueue q = Overseer.getInQueue(zkClient);
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.COLLECTION_PROP, "c1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+
+ q.offer(Utils.toJSON(m));
+
+ waitForCollections(reader, "c1");
+ verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
+
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.COLLECTION_PROP, "c1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
+
+ q.offer(Utils.toJSON(m));
+
+
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.COLLECTION_PROP, "c1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+
+ q.offer(Utils.toJSON(m));
+
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
+ // Simulate an external modification
+ zkClient.setData("/clusterstate.json", data, true);
+
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+ "name", "test",
+ ZkStateReader.NUM_SHARDS_PROP, "1",
+ ZkStateReader.REPLICATION_FACTOR, "1",
+ DocCollection.STATE_FORMAT, "2"
+ );
+ q.offer(Utils.toJSON(m));
+
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
+ "collection", "test",
+ ZkStateReader.SHARD_ID_PROP, "x",
+ ZkStateReader.REPLICATION_FACTOR, "1"
+ );
+ q.offer(Utils.toJSON(m));
+
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
+ "collection", "test",
+ ZkStateReader.SHARD_ID_PROP, "x",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString()
+ );
+ q.offer(Utils.toJSON(m));
+
+ waitForCollections(reader, "test");
+ verifyStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
+
+ waitForCollections(reader, "c1");
+ verifyStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
+
+ } finally {
+ close(zkClient);
+ close(overseerClient);
+ close(reader);
+ server.shutdown();
+ }
}
private void close(ZkStateReader reader) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java?rev=1695763&r1=1695762&r2=1695763&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java Thu Aug 13 19:36:54 2015
@@ -31,6 +31,7 @@ import org.apache.solr.common.cloud.Slic
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
import java.util.HashMap;
import java.util.Map;
@@ -204,4 +205,160 @@ public class ZkStateWriterTest extends S
}
+ public void testExternalModificationToSharedClusterState() throws Exception {
+ String zkDir = createTempDir("testExternalModification").toFile().getAbsolutePath();
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ try {
+ server.run();
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ ZkStateReader reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+
+ ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+ // create collection 1 with stateFormat = 1
+ ZkWriteCommand c1 = new ZkWriteCommand("c1",
+ new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+ writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.writePendingUpdates();
+
+ reader.updateClusterState();
+ ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
+ assertTrue(clusterState.hasCollection("c1"));
+ assertFalse(clusterState.hasCollection("c2"));
+
+ // Simulate an external modification to /clusterstate.json
+ byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
+ zkClient.setData("/clusterstate.json", data, true);
+
+ // enqueue another c1 so that ZkStateWriter has pending updates
+ writer.enqueueUpdate(clusterState, c1, null);
+ assertTrue(writer.hasPendingUpdates());
+
+ // create collection 2 with stateFormat = 1
+ ZkWriteCommand c2 = new ZkWriteCommand("c2",
+ new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+
+ try {
+ writer.enqueueUpdate(clusterState, c2, null); // we are sending in the old cluster state object
+ fail("Enqueue should not have succeeded");
+ } catch (KeeperException.BadVersionException bve) {
+ // expected
+ }
+
+ reader.updateClusterState();
+ try {
+ writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ fail("enqueueUpdate after BadVersionException should not have suceeded");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ try {
+ writer.writePendingUpdates();
+ fail("writePendingUpdates after BadVersionException should not have suceeded");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+ }
+ }
+
+ public void testExternalModificationToStateFormat2() throws Exception {
+ String zkDir = createTempDir("testExternalModificationToStateFormat2").toFile().getAbsolutePath();
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ try {
+ server.run();
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ ZkStateReader reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+
+ ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+ ClusterState state = reader.getClusterState();
+
+ // create collection 2 with stateFormat = 2
+ ZkWriteCommand c2 = new ZkWriteCommand("c2",
+ new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+ state = writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
+
+ int sharedClusterStateVersion = state.getZkClusterStateVersion();
+ int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
+
+ // Simulate an external modification to /collections/c2/state.json
+ byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
+ zkClient.setData(ZkStateReader.getCollectionPath("c2"), data, true);
+
+ // get the most up-to-date state
+ reader.updateClusterState();
+ state = reader.getClusterState();
+ assertTrue(state.hasCollection("c2"));
+ assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
+ assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
+
+ // enqueue an update to stateFormat2 collection such that update is pending
+ state = writer.enqueueUpdate(state, c2, null);
+ assertTrue(writer.hasPendingUpdates());
+
+ // get the most up-to-date state
+ reader.updateClusterState();
+ state = reader.getClusterState();
+
+ // enqueue a stateFormat=1 collection which should cause a flush
+ ZkWriteCommand c1 = new ZkWriteCommand("c1",
+ new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+
+ try {
+ state = writer.enqueueUpdate(state, c1, null);
+ fail("Enqueue should not have succeeded");
+ } catch (KeeperException.BadVersionException bve) {
+ // expected
+ }
+
+ try {
+ writer.enqueueUpdate(reader.getClusterState(), c2, null);
+ fail("enqueueUpdate after BadVersionException should not have suceeded");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ try {
+ writer.writePendingUpdates();
+ fail("writePendingUpdates after BadVersionException should not have suceeded");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+ }
+ }
}