You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/04/13 15:55:09 UTC
[1/2] lucene-solr:master: tests: raise wait time
Repository: lucene-solr
Updated Branches:
refs/heads/master f2f484680 -> 744b419b4
tests: raise wait time
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/6c9391df
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/6c9391df
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/6c9391df
Branch: refs/heads/master
Commit: 6c9391df01bafe35b6e6f7d351f483b5242cb937
Parents: f2f4846
Author: markrmiller <ma...@apache.org>
Authored: Wed Apr 13 09:54:16 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Wed Apr 13 09:54:16 2016 -0400
----------------------------------------------------------------------
.../test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/6c9391df/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 3c45e23..8581995 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -95,7 +95,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
if (explicitRefresh) {
reader.forceUpdateCollection("c1");
} else {
- for (int i = 0; i < 500; ++i) {
+ for (int i = 0; i < 1000; ++i) {
if (reader.getClusterState().hasCollection("c1")) {
break;
}
@@ -123,7 +123,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
if (explicitRefresh) {
reader.forceUpdateCollection("c1");
} else {
- for (int i = 0; i < 500; ++i) {
+ for (int i = 0; i < 1000; ++i) {
if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) {
break;
}
[2/2] lucene-solr:master: SOLR-8914: ZkStateReader's
refreshLiveNodes(Watcher) is not thread safe.
Posted by ma...@apache.org.
SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/744b419b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/744b419b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/744b419b
Branch: refs/heads/master
Commit: 744b419b42a5797700c0a3a5f859d86ae9d05325
Parents: 6c9391d
Author: markrmiller <ma...@apache.org>
Authored: Wed Apr 13 09:54:40 2016 -0400
Committer: markrmiller <ma...@apache.org>
Committed: Wed Apr 13 09:54:40 2016 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../apache/solr/cloud/TestStressLiveNodes.java | 252 +++++++++++++++++++
.../apache/solr/common/cloud/ZkStateReader.java | 85 ++++---
3 files changed, 311 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0b98fa0..fbe4698 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -108,6 +108,9 @@ Bug Fixes
* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
+* SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
+ sarowe, Erick Erickson, Mark Miller, shalin)
+
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
new file mode 100644
index 0000000..28dcc82
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.core.CloudConfig.CloudConfigBuilder;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stress test LiveNodes watching.
+ *
+ * Does bursts of adds to live_nodes using parallel threads and verifies that after each
+ * burst a ZkStateReader detects the correct set.
+ */
+@Slow
+public class TestStressLiveNodes extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** A basic cloud client, we'll be testing the behavior of its ZkStateReader */
+  private static CloudSolrClient CLOUD_CLIENT;
+
+  /** The addr of the zk server used in this test */
+  private static String ZK_SERVER_ADDR;
+
+  /* how many seconds we're willing to wait for our executor tasks to finish before failing the test */
+  private final static int WAIT_TIME = TEST_NIGHTLY ? 60 : 30;
+
+  @BeforeClass
+  private static void createMiniSolrCloudCluster() throws Exception {
+    // we only need 1 node, and we don't care about any configs or collections
+    // we're going to fake all the live_nodes changes we want to fake.
+    configureCluster(1).configure();
+
+    // give all nodes a chance to come alive
+    TestTolerantUpdateProcessorCloud.assertSpinLoopAllJettyAreRunning(cluster);
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.connect(); // force connection even though we aren't sending any requests
+
+    ZK_SERVER_ADDR = cluster.getZkServer().getZkAddress();
+  }
+
+  private static SolrZkClient newSolrZkClient() {
+    assertNotNull(ZK_SERVER_ADDR);
+    // CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT is private, so the timeout is hard-coded here
+    return new SolrZkClient(ZK_SERVER_ADDR, 15000);
+  }
+
+  /** returns the true set of live nodes (currently in zk) as a sorted list */
+  private static List<String> getTrueLiveNodesFromZk() throws Exception {
+    SolrZkClient client = newSolrZkClient();
+    try {
+      List<String> result = new ArrayList<>(client.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true));
+      Collections.sort(result);
+      return result;
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * returns the cached set of live nodes (according to the ZkStateReader in our CloudSolrClient)
+   * as a sorted list.
+   * This is done in a sleep+retry loop until the result matches the expectedCount, or a few iters have passed
+   * (this way we aren't testing how fast the watchers complete, just that they got the correct result)
+   */
+  private static List<String> getCachedLiveNodesFromLocalState(final int expectedCount) throws Exception {
+    List<String> result = null;
+
+    for (int i = 0; i < 10; i++) {
+      result = new ArrayList<>(CLOUD_CLIENT.getZkStateReader().getClusterState().getLiveNodes());
+      if (expectedCount != result.size()) {
+        log.info("sleeping #{} to give watchers a chance to finish: {} != {}",
+                 i, expectedCount, result.size());
+        Thread.sleep(200);
+      } else {
+        break;
+      }
+    }
+    if (expectedCount != result.size()) {
+      log.error("gave up waiting for live nodes to match expected size: {} != {}",
+                expectedCount, result.size());
+    }
+    Collections.sort(result);
+    return result;
+  }
+
+  /** Repeatedly adds bursts of fake live_nodes entries in parallel and checks the cached view matches ZK. */
+  public void testStress() throws Exception {
+    // do many iters, so we have "bursts" of adding nodes that we then check
+    final int numIters = atLeast(1000);
+    for (int iter = 0; iter < numIters; iter++) {
+      // sanity check that ZK says there is in fact 1 live node
+      List<String> actualLiveNodes = getTrueLiveNodesFromZk();
+      assertEquals("iter"+iter+": " + actualLiveNodes.toString(),
+                   1, actualLiveNodes.size());
+
+      // only here do we forcibly update the cached live nodes so we don't have to wait for it to catch up
+      // with all the ephemeral nodes that vanished after the last iteration
+      CLOUD_CLIENT.getZkStateReader().updateLiveNodes();
+
+      // sanity check that our Cloud Client's local state knows about the 1 (real) live node in our cluster
+      List<String> cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
+      assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
+                   actualLiveNodes, cachedLiveNodes);
+
+      // start spinning up some threads to add some live_node children in parallel
+      // we don't need a lot of threads or nodes (we don't want to swamp the CPUs
+      // just bursts of concurrent adds) but we do want to randomize it a bit so we increase the
+      // odds of concurrent watchers firing regardless of the num CPUs or load on the machine running
+      // the test (but we deliberately don't look at availableProcessors() since we want randomization
+      // consistency across all machines for a given seed)
+      final int numThreads = TestUtil.nextInt(random(), 2, 5);
+
+      // use same num for all thrashers, to increase likelihood of them all competing
+      // (diff random number would mean heavy concurrency only for ~ the first N=lowest num requests)
+      //
+      // this does not need to be a large number -- in fact, the higher it is, the more
+      // likely we are to see a mistake in early watcher triggers get "corrected" by a later one
+      // and overlook a possible bug
+      final int numNodesPerThrasher = TestUtil.nextInt(random(), 1, 5);
+
+      log.info("preparing parallel adds to live nodes: iter={}, numThreads={} numNodesPerThread={}",
+               iter, numThreads, numNodesPerThrasher);
+
+      // NOTE: using ephemeral nodes
+      // so we can't close any of these thrashers until we are done with our assertions
+      final List<LiveNodeTrasher> thrashers = new ArrayList<>(numThreads);
+      for (int i = 0; i < numThreads; i++) {
+        thrashers.add(new LiveNodeTrasher("T"+iter+"_"+i, numNodesPerThrasher));
+      }
+      try {
+        final ExecutorService executorService = ExecutorUtil.newMDCAwareFixedThreadPool
+          (thrashers.size()+1, new DefaultSolrThreadFactory("test_live_nodes_thrasher_iter"+iter));
+
+        // submit() rather than invokeAll(): invokeAll blocks until every task completes, which
+        // would make the WAIT_TIME timeout (and the stop() fallback) below unreachable
+        for (LiveNodeTrasher thrasher : thrashers) {
+          executorService.submit(thrasher);
+        }
+        executorService.shutdown();
+        if (! executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
+          for (LiveNodeTrasher thrasher : thrashers) {
+            thrasher.stop();
+          }
+        }
+        assertTrue("iter"+iter+": thrashers didn't finish even after explicitly stopping",
+                   executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS));
+
+        // sanity check the *real* live_nodes entries from ZK match what the thrashers added
+        int totalAdded = 1; // 1 real live node when we started
+        for (LiveNodeTrasher thrasher : thrashers) {
+          totalAdded += thrasher.getNumAdded();
+        }
+        actualLiveNodes = getTrueLiveNodesFromZk();
+        assertEquals("iter"+iter, totalAdded, actualLiveNodes.size());
+
+        // verify our local client knows the correct set of live nodes
+        cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
+        assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
+                     actualLiveNodes, cachedLiveNodes);
+
+      } finally {
+        for (LiveNodeTrasher thrasher : thrashers) {
+          // shutdown our zk connection, freeing our ephemeral nodes
+          thrasher.close();
+        }
+      }
+    }
+  }
+
+  /** NOTE: numAdded is not thread safe, only call() in one thread at a time; stop() may be called from any thread */
+  public static final class LiveNodeTrasher implements Callable<Integer> {
+    private final String id;
+    private final int numNodesToAdd;
+    private final SolrZkClient client;
+
+    private volatile boolean running = false; // volatile: stop() is invoked from a different thread than call()
+    private int numAdded = 0;
+
+    /** ID should ideally be unique amongst any other instances */
+    public LiveNodeTrasher(String id, int numNodesToAdd) {
+      this.id = id;
+      this.numNodesToAdd = numNodesToAdd;
+      this.client = newSolrZkClient();
+    }
+    /** returns the number of nodes actually added w/o error */
+    @Override
+    public Integer call() {
+      running = true;
+      // NOTE: test includes 'running'
+      for (int i = 0; running && i < numNodesToAdd; i++) {
+        final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/thrasher-" + id + "-" + i;
+        try {
+          client.makePath(nodePath, CreateMode.EPHEMERAL, true);
+          numAdded++;
+        } catch (Exception e) {
+          log.error("failed to create: " + nodePath, e);
+        }
+      }
+      return numAdded;
+    }
+    public int getNumAdded() {
+      return numAdded;
+    }
+    public void close() {
+      client.close();
+    }
+    public void stop() {
+      running = false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/744b419b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 308b3e0..e8d95c3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.common.Callable;
import org.apache.solr.common.SolrException;
@@ -487,6 +488,10 @@ public class ZkStateReader implements Closeable {
final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
synchronized (getUpdateLock()) {
+ if (this.legacyClusterStateVersion >= stat.getVersion()) {
+ // Nothing to do, someone else updated same or newer.
+ return;
+ }
this.legacyCollectionStates = loadedData.getCollectionStates();
this.legacyClusterStateVersion = stat.getVersion();
}
@@ -509,6 +514,9 @@ public class ZkStateReader implements Closeable {
}
}
+ // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+ private final Object refreshCollectionListLock = new Object();
+
/**
* Search for any lazy-loadable state format2 collections.
*
@@ -522,29 +530,32 @@ public class ZkStateReader implements Closeable {
* {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
*/
private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
- List<String> children = null;
- try {
- children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
- } catch (KeeperException.NoNodeException e) {
- LOG.warn("Error fetching collection names: [{}]", e.getMessage());
- // fall through
- }
- if (children == null || children.isEmpty()) {
- lazyCollectionStates.clear();
- return;
- }
-
- // Don't mess with watchedCollections, they should self-manage.
+ synchronized (refreshCollectionListLock) {
+ List<String> children = null;
+ try {
+ children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("Error fetching collection names: [{}]", e.getMessage());
+ // fall through
+ }
+ if (children == null || children.isEmpty()) {
+ lazyCollectionStates.clear();
+ return;
+ }
- // First, drop any children that disappeared.
- this.lazyCollectionStates.keySet().retainAll(children);
- for (String coll : children) {
- // We will create an eager collection for any interesting collections, so don't add to lazy.
- if (!interestingCollections.contains(coll)) {
- // Double check contains just to avoid allocating an object.
- LazyCollectionRef existing = lazyCollectionStates.get(coll);
- if (existing == null) {
- lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+ // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
+ // Don't mess with watchedCollections, they should self-manage.
+
+ // First, drop any children that disappeared.
+ this.lazyCollectionStates.keySet().retainAll(children);
+ for (String coll : children) {
+ // We will create an eager collection for any interesting collections, so don't add to lazy.
+ if (!interestingCollections.contains(coll)) {
+ // Double check contains just to avoid allocating an object.
+ LazyCollectionRef existing = lazyCollectionStates.get(coll);
+ if (existing == null) {
+ lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+ }
}
}
}
@@ -576,19 +587,35 @@ public class ZkStateReader implements Closeable {
}
}
+ // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+ private final Object refreshLiveNodesLock = new Object();
+ // Ensures that only the latest getChildren fetch gets applied.
+ private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
+
/**
* Refresh live_nodes.
*/
private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
- Set<String> newLiveNodes;
- try {
- List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
- newLiveNodes = new HashSet<>(nodeList);
- } catch (KeeperException.NoNodeException e) {
- newLiveNodes = emptySet();
+ synchronized (refreshLiveNodesLock) {
+ Set<String> newLiveNodes;
+ try {
+ List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
+ newLiveNodes = new HashSet<>(nodeList);
+ } catch (KeeperException.NoNodeException e) {
+ newLiveNodes = emptySet();
+ }
+ lastFetchedLiveNodes.set(newLiveNodes);
}
- Set<String> oldLiveNodes;
+
+ // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
+ Set<String> oldLiveNodes, newLiveNodes;
synchronized (getUpdateLock()) {
+ newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
+ if (newLiveNodes == null) {
+ // Someone else won the race to apply the last update, just exit.
+ return;
+ }
+
oldLiveNodes = this.liveNodes;
this.liveNodes = newLiveNodes;
if (clusterState != null) {