You are viewing a plain-text version of this content. The link to the canonical version was lost in this plain-text rendering.
Posted to commits@lucene.apache.org by ro...@apache.org on 2016/06/06 12:19:48 UTC
lucene-solr:master: Revert "SOLR-9140: Replace some zk state polling with CollectionStateWatchers"
Repository: lucene-solr
Updated Branches:
refs/heads/master b1fb142af -> b64c558e3
Revert "SOLR-9140: Replace some zk state polling with CollectionStateWatchers"
There are still some places where notifications can be missed, so I'm reverting
this until those are fixed.
This reverts commit d550b1ca43c7c523b71b4540edef217036421f9e.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b64c558e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b64c558e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b64c558e
Branch: refs/heads/master
Commit: b64c558e3e2e748b0b7a6795d0aeb026f6e59350
Parents: b1fb142
Author: Alan Woodward <ro...@apache.org>
Authored: Mon Jun 6 13:13:54 2016 +0100
Committer: Alan Woodward <ro...@apache.org>
Committed: Mon Jun 6 13:13:54 2016 +0100
----------------------------------------------------------------------
solr/CHANGES.txt | 3 -
.../org/apache/solr/cloud/ZkController.java | 124 +++++++++++--------
.../org/apache/solr/core/CoreContainer.java | 41 +++---
.../solrj/request/CollectionAdminRequest.java | 2 +-
.../apache/solr/common/cloud/DocCollection.java | 23 ----
.../cloud/TestCollectionStateWatchers.java | 5 +-
6 files changed, 91 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c9f4663..fa84d56 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -277,9 +277,6 @@ Optimizations
* SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
-* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers
- (Alan Woodward)
-
Other Changes
----------------------
* SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 64fa54b..b36e766 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -683,23 +682,35 @@ public final class ZkController {
InterruptedException {
publishNodeAsDown(getNodeName());
-
- Set<String> collections = cc.getLocalCollections();
- CountDownLatch latch = new CountDownLatch(collections.size());
-
- for (String collection : collections) {
- zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
- for (Replica replica : state.getReplicasOnNode(getNodeName())) {
- if (replica.getState() != Replica.State.DOWN)
- return false;
+
+ // now wait till the updates are in our state
+ long now = System.nanoTime();
+ long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ boolean foundStates = true;
+
+ while (System.nanoTime() < timeout) {
+ ClusterState clusterState = zkStateReader.getClusterState();
+ Map<String, DocCollection> collections = clusterState.getCollectionsMap();
+ for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+ DocCollection collection = entry.getValue();
+ Collection<Slice> slices = collection.getSlices();
+ for (Slice slice : slices) {
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
+ foundStates = false;
+ }
+ }
}
- latch.countDown();
- return true;
- });
- }
+ }
- if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) {
- // TODO should we abort here?
+ if (foundStates) {
+ Thread.sleep(1000);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ if (!foundStates) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
}
@@ -1355,7 +1366,7 @@ public final class ZkController {
return zkStateReader;
}
- private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException {
+ private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
if (coreNodeName != null) {
@@ -1367,45 +1378,58 @@ public final class ZkController {
}
}
- private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException {
- log.info("Waiting for coreNodeName for core {} in collection {} to be assigned",
- descriptor.getName(), descriptor.getCollectionName());
- final String thisNode = getNodeName();
- try {
- zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
- if (c == null)
- return false;
- for (Replica replica : c.getReplicasOnNode(thisNode)) {
- if (descriptor.getName().equals(replica.getCoreName())) {
- descriptor.getCloudDescriptor().setCoreNodeName(replica.getName());
- return true;
+ private void waitForCoreNodeName(CoreDescriptor descriptor) {
+ int retryCount = 320;
+ log.info("look for our core node name");
+ while (retryCount-- > 0) {
+ Map<String, Slice> slicesMap = zkStateReader.getClusterState()
+ .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
+ if (slicesMap != null) {
+
+ for (Slice slice : slicesMap.values()) {
+ for (Replica replica : slice.getReplicas()) {
+ // TODO: for really large clusters, we could 'index' on this
+
+ String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+ String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+ String msgNodeName = getNodeName();
+ String msgCore = descriptor.getName();
+
+ if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
+ descriptor.getCloudDescriptor()
+ .setCoreNodeName(replica.getName());
+ return;
+ }
}
}
- return false;
- });
- } catch (TimeoutException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName());
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
- private void waitForShardId(CoreDescriptor cd) throws InterruptedException {
+ private void waitForShardId(CoreDescriptor cd) {
log.info("waiting to find shard id in clusterstate for " + cd.getName());
- final String thisNode = getNodeName();
- try {
- zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
- if (c == null)
- return false;
- String shardId = c.getShardId(thisNode, cd.getName());
- if (shardId != null) {
- cd.getCloudDescriptor().setShardId(shardId);
- return true;
- }
- return false;
- });
- }
- catch (TimeoutException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName());
+ int retryCount = 320;
+ while (retryCount-- > 0) {
+ final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
+ if (shardId != null) {
+ cd.getCloudDescriptor().setShardId(shardId);
+ return;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
+
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not get shard id for core: " + cd.getName());
}
@@ -1419,7 +1443,7 @@ public final class ZkController {
return coreNodeName;
}
- public void preRegister(CoreDescriptor cd) throws InterruptedException {
+ public void preRegister(CoreDescriptor cd) {
String coreNodeName = getCoreNodeName(cd);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index eb9a256..ff776b5 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,6 +16,17 @@
*/
package org.apache.solr.core;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Collections.EMPTY_MAP;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
@@ -26,15 +37,12 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.config.Lookup;
@@ -48,7 +56,6 @@ import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
@@ -76,16 +83,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Collections.EMPTY_MAP;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/**
@@ -845,7 +844,6 @@ public class CoreContainer {
return core;
} catch (Exception e) {
- SolrZkClient.checkInterrupted(e);
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
@@ -896,17 +894,6 @@ public class CoreContainer {
}
/**
- * @return a Set containing the names of all collections with a core hosted in this container
- */
- public Set<String> getLocalCollections() {
- Set<String> collections = getCoreDescriptors().stream()
- .filter(cd -> cd.getCollectionName() != null)
- .map(CoreDescriptor::getCollectionName)
- .collect(Collectors.toSet());
- return collections;
- }
-
- /**
* Returns an immutable Map of Exceptions that occured when initializing
* SolrCores (either at startup, or do to runtime requests to create cores)
* keyed off of the name (String) of the SolrCore that had the Exception
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 2307463..452c7a1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
deleteAsyncId(requestId).process(client);
return state;
}
- TimeUnit.MILLISECONDS.sleep(100);
+ TimeUnit.SECONDS.sleep(1);
}
return state;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 9848e65..5504a8b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -260,26 +259,4 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
return replicas;
}
-
- /**
- * Get all the replicas on a particular node
- */
- public List<Replica> getReplicasOnNode(String nodeName) {
- return getReplicas().stream()
- .filter(replica -> replica.getNodeName().equals(nodeName))
- .collect(Collectors.toList());
- }
-
- /**
- * Get the shardId of a core on a specific node
- */
- public String getShardId(String nodeName, String coreName) {
- for (Slice slice : this) {
- for (Replica replica : slice) {
- if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
- return slice.getName();
- }
- }
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b64c558e/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 8716dbe..2b2e181 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -51,9 +51,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
@BeforeClass
public static void startCluster() throws Exception {
configureCluster(CLUSTER_SIZE)
- .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
+ .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
.configure();
- cluster.getSolrClient().connect();
}
@AfterClass
@@ -260,7 +259,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
final CloudSolrClient client = cluster.getSolrClient();
- Future<Boolean> future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS,
+ Future<Boolean> future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)