You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/13 03:56:35 UTC
[lucene-solr] 02/02: @1467 Cleanup after stress work.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 2141e43b9b9acc0d6480fd236a9f903841420b67
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Mar 12 21:56:02 2021 -0600
@1467 Cleanup after stress work.
Took 13 minutes
---
.../java/org/apache/solr/cloud/LeaderElector.java | 4 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 233 +++-----
.../solr/cloud/OverseerTaskExecutorTask.java | 102 ----
.../org/apache/solr/cloud/RecoveryStrategy.java | 22 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 2 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 46 +-
.../java/org/apache/solr/cloud/ZkController.java | 24 +-
.../apache/solr/cloud/api/collections/Assign.java | 2 +-
.../api/collections/CollectionCmdResponse.java | 7 +-
.../cloud/api/collections/CreateCollectionCmd.java | 6 +-
.../solr/cloud/api/collections/CreateShardCmd.java | 2 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 27 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 2 +-
.../solr/cloud/api/collections/MoveReplicaCmd.java | 4 +-
.../OverseerCollectionMessageHandler.java | 12 +-
.../solr/cloud/api/collections/ReplaceNodeCmd.java | 8 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 621 ++++++++++-----------
.../java/org/apache/solr/core/CoreContainer.java | 23 +-
.../src/java/org/apache/solr/core/SolrCore.java | 7 +-
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../apache/solr/rest/ManagedResourceStorage.java | 2 +-
.../java/org/apache/solr/update/CommitTracker.java | 2 +-
.../java/org/apache/solr/update/UpdateHandler.java | 5 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 4 +-
.../java/org/apache/solr/update/VersionInfo.java | 2 +-
.../CreateCollectionsIndexAndRestartTest.java | 2 +-
.../cloud/api/collections/TestCollectionAPI.java | 4 +-
.../solrj/impl/SolrClientNodeStateProvider.java | 2 +-
.../org/apache/solr/common/cloud/ClusterState.java | 14 +
.../solr/common/cloud/ConnectionManager.java | 2 +-
.../apache/solr/common/cloud/DocCollection.java | 6 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 65 +--
.../apache/solr/common/cloud/ZkStateReader.java | 182 +++---
34 files changed, 605 insertions(+), 845 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 8ac4d79..71873ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -206,7 +206,7 @@ public class LeaderElector implements Closeable {
try {
String watchedNode = holdElectionPath + "/" + toWatch;
- log.info("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
+ log.debug("I am not the leader (our path is ={}) - watch the node below me {} seqs={}", leaderSeqNodeName, watchedNode, seqs);
ElectionWatcher oldWatcher = watcher;
if (oldWatcher != null) {
@@ -405,7 +405,7 @@ public class LeaderElector implements Closeable {
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
}
- log.info("Joined leadership election with path: {}", leaderSeqPath);
+ log.debug("Joined leadership election with path: {}", leaderSeqPath);
context.leaderSeqPath = leaderSeqPath;
state = JOIN;
break;
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index aec7014..5ae5e4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -74,6 +74,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -705,16 +706,8 @@ public class Overseer implements SolrCloseable {
getStateUpdateQueue().offer(data, false);
}
- public Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws InterruptedException {
- if (log.isDebugEnabled()) log.debug("processQueueItem {}", collStateUpdates);
-
- Future future = new OverseerTaskExecutorTask(getCoreContainer(), collStateUpdates).run();
-
- return future;
- }
-
public Future writePendingUpdates(String collection) {
- return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
+ return zkStateWriter.writePendingUpdates(collection);
}
private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -812,15 +805,12 @@ public class Overseer implements SolrCloseable {
public void start() throws KeeperException, InterruptedException {
if (closed) return;
- ourLock.lock();
- try {
- if (closed) return;
- zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
- startItems = super.getItems();
- log.info("Overseer found entries on start {}", startItems);
+
+ zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+ startItems = super.getItems();
+ log.info("Overseer found entries on start {}", startItems);
+ if (startItems.size() > 0) {
processQueueItems(startItems, true);
- } finally {
- ourLock.unlock();
}
}
@@ -828,7 +818,9 @@ public class Overseer implements SolrCloseable {
protected void processQueueItems(List<String> items, boolean onStart) {
if (closed) return;
List<String> fullPaths = new ArrayList<>(items.size());
+ CountDownLatch delCountDownLatch = null;
ourLock.lock();
+ String forceWrite = null;
try {
if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
for (String item : items) {
@@ -836,33 +828,22 @@ public class Overseer implements SolrCloseable {
}
Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
- List<StateEntry> shardStateCollections = null;
- Set<String> scollections = null;
- List<Future> futures = new ArrayList<>();
- for (byte[] item : data.values()) {
- final ZkNodeProps message = ZkNodeProps.load(item);
- try {
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
- if (onStart) {
- if (operation.equals("state")) {
- message.getProperties().remove(OverseerAction.DOWNNODE);
- if (message.getProperties().size() == 1) {
- continue;
- }
- }
- }
- // hack
- if (operation.equals("updateshardstate")) {
- if (scollections == null) {
- scollections = new HashSet<>();
- }
- scollections.add(message.getStr("collection"));
+ if (fullPaths.size() > 0) {
+ if (!zkController.getZkClient().isClosed()) {
+ try {
+ delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
+ } catch (Exception e) {
+ log.warn("Failed deleting processed items", e);
}
+ }
+ }
- if (shardStateCollections == null) {
- shardStateCollections = new ArrayList<>();
- }
+ final List<StateEntry> shardStateCollections = new ArrayList<>();
+
+ for (byte[] item : data.values()) {
+ final ZkNodeProps message = ZkNodeProps.load(item);
+ try {
StateEntry entry = new StateEntry();
entry.message = message;
shardStateCollections.add(entry);
@@ -872,8 +853,9 @@ public class Overseer implements SolrCloseable {
}
}
Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates = new HashMap<>();
- try {
- for (Overseer.StateEntry sentry : shardStateCollections) {
+
+ for (Overseer.StateEntry sentry : shardStateCollections) {
+ try {
ZkNodeProps stateUpdateMessage = sentry.message;
final String op = stateUpdateMessage.getStr(StatePublisher.OPERATION);
OverseerAction overseerAction = OverseerAction.get(op);
@@ -888,34 +870,9 @@ public class Overseer implements SolrCloseable {
for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
- continue;
- } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
- continue;
- } else {
- // if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
- String id = stateUpdateEntry.getKey();
-
- String stateString = (String) stateUpdateEntry.getValue();
- if (log.isDebugEnabled()) {
- log.debug("stateString={}", stateString);
- }
- String collId = id.substring(0, id.indexOf('-'));
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
- if (updates == null) {
- updates = new ArrayList<>();
- collStateUpdates.put(collId, updates);
+ if (onStart) {
+ continue;
}
-
- ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
- update.id = id;
- update.state = stateString;
- updates.add(update);
-
- }
- }
-
- for (Map.Entry<String,Object> m : stateUpdateMessage.getProperties().entrySet()) {
- if (OverseerAction.DOWNNODE.equals(OverseerAction.get(m.getKey()))) {
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -925,7 +882,7 @@ public class Overseer implements SolrCloseable {
}
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
- if (replica.getNodeName().equals(m.getValue())) {
+ if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
@@ -934,8 +891,7 @@ public class Overseer implements SolrCloseable {
}
}
});
- }
- if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(m.getKey()))) {
+ } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -945,7 +901,7 @@ public class Overseer implements SolrCloseable {
}
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
- if (replica.getNodeName().equals(m.getValue())) {
+ if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
@@ -954,6 +910,25 @@ public class Overseer implements SolrCloseable {
}
}
});
+ } else {
+ // if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
+ String id = stateUpdateEntry.getKey();
+
+ String stateString = (String) stateUpdateEntry.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug("stateString={}", stateString);
+ }
+ String collId = id.substring(0, id.indexOf('-'));
+ List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ if (updates == null) {
+ updates = new ArrayList<>();
+ collStateUpdates.put(collId, updates);
+ }
+
+ ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+ update.id = id;
+ update.state = stateString;
+ updates.add(update);
}
}
@@ -964,27 +939,29 @@ public class Overseer implements SolrCloseable {
// case REMOVEROUTINGRULE:
// return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
case UPDATESHARDSTATE: // MRM TODO: look at how we handle this and make it so it can use StatePublisher
- String collection = stateUpdateMessage.getStr("collection");
- stateUpdateMessage.getProperties().remove("collection");
- stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
- Long collIdLong = zkStateWriter.getCS().get(collection).getId();
- if (collIdLong != null) {
- String collId = Long.toString(collIdLong);
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
- if (updates == null) {
- updates = new ArrayList<>();
- collStateUpdates.put(collId, updates);
- }
+ String collection = stateUpdateMessage.getStr("collection");
+ stateUpdateMessage.getProperties().remove("collection");
+ stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
+ Long collIdLong = zkStateWriter.getCS().get(collection).getId();
+ if (collIdLong != null) {
+ String collId = Long.toString(collIdLong);
+ List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ if (updates == null) {
+ updates = new ArrayList<>();
+ collStateUpdates.put(collId, updates);
+ }
- if (collIdLong != null) {
- for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
- ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
- update.sliceState = (String) stateUpdateEntry.getValue();
- update.sliceName = stateUpdateEntry.getKey();
- updates.add(update);
- }
+ if (collIdLong != null) {
+ for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
+ // MRM TODO: slice state should be done like replica state, this is a hack
+ forceWrite = collection;
+ ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+ update.sliceState = (String) stateUpdateEntry.getValue();
+ update.sliceName = stateUpdateEntry.getKey();
+ updates.add(update);
}
}
+ }
break;
@@ -992,61 +969,40 @@ public class Overseer implements SolrCloseable {
throw new RuntimeException("unknown operation:" + op + " contents:" + stateUpdateMessage.getProperties());
}
+ } catch (Exception e) {
+ log.error("Overseer state update queue processing failed", e);
+ continue;
}
-
- } catch (Exception e) {
- log.error("Overseer state update queue processing failed", e);
}
- Future future = null;
try {
- future = overseer.processQueueItem(collStateUpdates);
+ getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
} catch (Exception e) {
log.error("Overseer state update queue processing failed", e);
}
- try {
- future.get();
- } catch (Exception e) {
- log.error("failed waiting for enqueued updates", e);
- }
-
- futures.clear();
Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
for (String collection : collections) {
- futures.add(overseer.writePendingUpdates(collection));
+ overseer.writePendingUpdates(collection);
}
- for (Future f : futures) {
- try {
- f.get();
- } catch (Exception e) {
- log.error("failed waiting for enqueued updates", e);
- }
- }
- futures.clear();
- if (scollections != null) {
- for (String collection : scollections) {
- futures.add(overseer.writePendingUpdates(collection));
- }
+ if (collections.size() == 0 && forceWrite != null) {
+ overseer.writePendingUpdates(forceWrite);
}
- // for (Future future : futures) {
- // try {
- // future.get();
- // } catch (Exception e) {
- // log.error("failed waiting for enqueued updates", e);
- // }
- // }
- } finally {
- if (overseer.zkStateWriter != null) {
- if (zkController.getZkClient().isAlive()) {
+ } finally {
+ try {
+ if (delCountDownLatch != null) {
try {
- zkController.getZkClient().delete(fullPaths, true);
- } catch (Exception e) {
- log.warn("Failed deleting processed items", e);
+ boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
+
+ if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
}
}
+ } finally {
ourLock.unlock();
}
}
@@ -1085,17 +1041,14 @@ public class Overseer implements SolrCloseable {
@Override
public void start() throws KeeperException, InterruptedException {
if (closed) return;
- ourLock.lock();
- try {
- if (closed) return;
- zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
- startItems = super.getItems();
+ zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
- log.info("Overseer found entries on start {}", startItems);
+ startItems = super.getItems();
+
+ log.info("Overseer found entries on start {}", startItems);
+ if (startItems.size() > 0) {
processQueueItems(startItems, true);
- } finally {
- ourLock.unlock();
}
}
@@ -1105,7 +1058,7 @@ public class Overseer implements SolrCloseable {
ourLock.lock();
List<String> fullPaths = new ArrayList<>(items.size());
try {
- log.info("Found collection queue items {} onStart={}", items, onStart);
+ log.debug("Found collection queue items {} onStart={}", items, onStart);
for (String item : items) {
fullPaths.add(path + "/" + item);
}
@@ -1216,7 +1169,7 @@ public class Overseer implements SolrCloseable {
} else {
byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
completedMap.update(entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1), sdata);
- log.info("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
+ log.debug("Completed task:[{}] {} {}", message, response.getResponse(), responsePath);
}
} catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
deleted file mode 100644
index e0f5f67..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.cloud.overseer.ZkStateWriter;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.core.CoreContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-public class OverseerTaskExecutorTask {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ZkController zkController;
- private final SolrCloudManager cloudManager;
- private final SolrZkClient zkClient;
- private final Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates;
-
- public OverseerTaskExecutorTask(CoreContainer cc, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) {
- this.zkController = cc.getZkController();
- this.zkClient = zkController.getZkClient();
- this.cloudManager = zkController.getSolrCloudManager();
- this.collStateUpdates = collStateUpdates;
- }
-
-
- private Future processQueueItem(Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates) throws Exception {
- if (log.isDebugEnabled()) log.debug("Consume state update from queue {} {}", collStateUpdates);
-
- // assert clusterState != null;
-
- // if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
-
-
-// if (log.isDebugEnabled()) log.debug("Queue operation is {}", operation);
-//
-// if (log.isDebugEnabled()) log.debug("Process message {} {}", message, operation);
-//
-// if (log.isDebugEnabled()) log.debug("Enqueue message {}", operation);
- try {
- return zkController.getOverseer().getZkStateWriter().enqueueUpdate(null, collStateUpdates, true);
- } catch (NullPointerException e) {
- log.info("Overseer is stopped, won't process message " + zkController.getOverseer());
- return null;
- }
-
- }
-
-
- public Future run() {
- if (log.isDebugEnabled()) log.debug("OverseerTaskExecutorTask, going to process message {}", collStateUpdates);
-
- try {
- return processQueueItem(collStateUpdates);
- } catch (Exception e) {
- log.error("Failed to process message " + collStateUpdates, e);
- }
- return null;
- }
-
- public static class WriteTask implements Runnable {
- private final String collection;
- CoreContainer coreContainer;
-
- public WriteTask(CoreContainer coreContainer, String collection) {
- this.collection = collection;
- this.coreContainer = coreContainer;
- }
-
- @Override
- public void run() {
- try {
- coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
- } catch (NullPointerException e) {
- if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
- } catch (Exception e) {
- log.error("Failed to process pending updates", e);
- }
- }
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index efb514c..88e138e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -309,7 +309,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
@Override
final public void run() {
// set request info for logging
- log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
+ log.debug("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
try {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
@@ -508,7 +508,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
public final boolean doSyncOrReplicateRecovery(SolrCore core, Replica leader) throws Exception {
- log.info("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
+ log.debug("Do peersync or replication recovery core={} collection={}", coreName, core.getCoreDescriptor().getCollectionName());
boolean successfulRecovery = false;
boolean publishedActive = false;
@@ -555,10 +555,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
if (startingVersions.isEmpty()) {
- log.info("startupVersions is empty");
+ log.debug("startupVersions is empty");
} else {
- if (log.isInfoEnabled()) {
- log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
+ if (log.isDebugEnabled()) {
+ log.debug("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
startingVersions.get(startingVersions.size() - 1));
}
}
@@ -588,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
if (replicaType == Replica.Type.TLOG) {
- log.info("Stopping replication from leader for {}", coreName);
+ log.debug("Stopping replication from leader for {}", coreName);
zkController.stopReplicationFromLeader(coreName);
}
- log.info("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
+ log.debug("Publishing state of core [{}] as buffering {}", coreName, "doSyncOrReplicateRecovery");
zkController.publish(core.getCoreDescriptor(), Replica.State.BUFFERING);
@@ -612,7 +612,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
return false;
}
- log.info("Begin buffering updates. core=[{}]", coreName);
+ log.debug("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
@@ -644,16 +644,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
- log.info("PeerSync was successful, commit to force open a new searcher");
+ log.debug("PeerSync was successful, commit to force open a new searcher");
// force open a new searcher
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
req.close();
- log.info("PeerSync stage of recovery was successful.");
+ log.debug("PeerSync stage of recovery was successful.");
// solrcloud_debug
// cloudDebugLog(core, "synced");
- log.info("Replaying updates buffered during PeerSync.");
+ log.debug("Replaying updates buffered during PeerSync.");
replay(core);
// sync success
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 261264c..353e857 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -77,7 +77,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
// We do this by using a multi and ensuring the parent znode of the leader registration node
// matches the version we expect - there is a setData call that increments the parent's znode
// version whenever a leader registers.
- log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
+ log.debug("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
List<Op> ops = new ArrayList<>(3);
ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
ops.add(Op.delete(leaderSeqPath, -1));
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 0a9c52c1..8491334 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -79,7 +79,6 @@ public class StatePublisher implements Closeable {
@Override
public void run() {
- ActionThrottle throttle = new ActionThrottle("StatePublisherWorker", 0);
while (!terminated && !zkStateReader.getZkClient().isClosed()) {
if (!zkStateReader.getZkClient().isConnected()) {
@@ -92,8 +91,7 @@ public class StatePublisher implements Closeable {
}
continue;
}
- throttle.minimumWaitBetweenActions();
- throttle.markAttemptingAction();
+
ZkNodeProps message = null;
ZkNodeProps bulkMessage = new ZkNodeProps();
bulkMessage.getProperties().put(OPERATION, "state");
@@ -101,33 +99,39 @@ public class StatePublisher implements Closeable {
try {
message = workQueue.poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e, true);
- return;
+
}
if (message != null) {
- if (log.isDebugEnabled()) log.debug("Got state message " + message);
+ log.debug("Got state message " + message);
if (message == TERMINATE_OP) {
+ log.debug("State publish is terminated");
terminated = true;
- message = null;
+ return;
} else {
bulkMessage(message, bulkMessage);
}
- while (message != null && !terminated) {
+ while (!terminated) {
try {
- message = workQueue.poll(5, TimeUnit.MILLISECONDS);
+ message = workQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
-
+ log.warn("state publisher interrupted", e);
+ return;
}
- if (log.isDebugEnabled()) log.debug("Got state message " + message);
if (message != null) {
+ if (log.isDebugEnabled()) log.debug("Got state message " + message);
if (message == TERMINATE_OP) {
terminated = true;
} else {
bulkMessage(message, bulkMessage);
}
+ } else {
+ break;
}
}
+ }
+
+ if (bulkMessage.getProperties().size() > 1) {
processMessage(bulkMessage);
}
@@ -141,25 +145,28 @@ public class StatePublisher implements Closeable {
}
}
- private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
+ private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
+ if (log.isDebugEnabled()) log.debug("Bulk state zkNodeProps={} bulkMessage={}", zkNodeProps, bulkMessage);
if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+ //clearStatesForNode(bulkMessage, nodeName);
bulkMessage.getProperties().put(OverseerAction.DOWNNODE.toLower(), nodeName);
+ log.debug("bulk state publish down node, props={} result={}", zkNodeProps, bulkMessage);
- clearStatesForNode(bulkMessage, nodeName);
} else if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.RECOVERYNODE) {
+ log.debug("bulk state publish recovery node, props={} result={}", zkNodeProps, bulkMessage);
String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+ // clearStatesForNode(bulkMessage, nodeName);
bulkMessage.getProperties().put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
-
- clearStatesForNode(bulkMessage, nodeName);
+ log.debug("bulk state publish recovery node, props={} result={}" , zkNodeProps, bulkMessage);
} else {
- String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+ //String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
String id = zkNodeProps.getStr("id");
String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
- if (log.isDebugEnabled()) log.debug("Bulk publish core={} id={} line={}", core, id, line);
+ if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
bulkMessage.getProperties().put(id, line);
}
}
@@ -188,11 +195,8 @@ public class StatePublisher implements Closeable {
}
private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
- if (message.getProperties().size() <= 1) {
- return;
- }
+ log.debug("Send state updates to Overseer {}", message);
byte[] updates = Utils.toJSON(message);
- if (log.isDebugEnabled()) log.debug("Send state updates to Overseer {}", message);
overseerJobQueue.offer(updates);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5d85691..680e57c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -325,8 +325,7 @@ public class ZkController implements Closeable, Runnable {
try {
zkController.register(descriptor.getName(), descriptor, afterExpiration);
} catch (Exception e) {
- log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
}
}
return descriptor;
@@ -1263,7 +1262,7 @@ public class ZkController implements Closeable, Runnable {
final String collection = cloudDesc.getCollectionName();
final String shardId = cloudDesc.getShardId();
- log.info("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+ log.debug("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
if (docCollection != null) {
@@ -1274,14 +1273,9 @@ public class ZkController implements Closeable, Runnable {
}
}
- // multiple calls of this method will left only one watcher
+ log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
- getZkStateReader().registerCore(cloudDesc.getCollectionName(), coreName);
-
-
- log.info("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
-
- log.info("Register terms for replica {}", coreName);
+ log.debug("Register terms for replica {}", coreName);
registerShardTerms(collection, cloudDesc.getShardId(), coreName);
@@ -1333,7 +1327,7 @@ public class ZkController implements Closeable, Runnable {
throw new AlreadyClosedException();
}
- log.info("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
+ log.debug("Timeout waiting to see leader, retry collection={} shard={}", collection, shardId);
}
}
@@ -1345,9 +1339,9 @@ public class ZkController implements Closeable, Runnable {
boolean isLeader = leaderName.equals(coreName);
- log.info("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
+ log.debug("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
- log.info("Check if we should recover isLeader={}", isLeader);
+ log.debug("Check if we should recover isLeader={}", isLeader);
//assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
// recover from local transaction log and wait for it to complete before
@@ -1415,7 +1409,7 @@ public class ZkController implements Closeable, Runnable {
// MRM TODO:
// registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
- log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+ log.debug("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
desc.getCloudDescriptor().setHasRegistered(true);
@@ -1581,7 +1575,7 @@ public class ZkController implements Closeable, Runnable {
*/
public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
MDCLoggingContext.setCoreName(cd.getName());
- log.info("publishing state={}", state);
+ log.debug("publishing state={}", state);
String collection = cd.getCloudDescriptor().getCollectionName();
String shardId = cd.getCloudDescriptor().getShardId();
Map<String,Object> props = new HashMap<>();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index 774e77a..f99fc02 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -105,7 +105,7 @@ public class Assign {
int cnt = overseer.getZkStateWriter().getReplicaAssignCnt(collection.getName(), shard);
String corename = String.format(Locale.ROOT, "%s%s", namePrefix, cnt);
- log.info("Assigned SolrCore name={} id={}", corename, cnt);
+ log.debug("Assigned SolrCore name={} id={}", corename, cnt);
ReplicaName replicaName = new ReplicaName();
replicaName.coreName = corename;
replicaName.id = cnt;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
index e498d4a..1a5c92f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCmdResponse.java
@@ -111,7 +111,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
ShardRequestTracker shardRequestTracker, @SuppressWarnings({"rawtypes"})NamedList results)
throws IOException, InterruptedException, KeeperException {
- log.info("addReplica() : {}", Utils.toJSONString(message));
+ log.debug("addReplica() : {}", Utils.toJSONString(message));
String extCollectionName = message.getStr(COLLECTION_PROP);
boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
@@ -194,7 +194,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
ModifiableSolrParams params = getReplicaParams(collection, message, results, skipCreateReplicaInClusterState, shardHandler, createReplica);
- log.info("create replica {} params={}", createReplica, params);
+ log.debug("create replica {} params={}", createReplica, params);
if (!onlyUpdateState) {
shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
}
@@ -220,7 +220,6 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
public Response call() {
if (!onlyUpdateState && createdShardHandler) {
try {
- log.info("Processs responses");
shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
} catch (Exception e) {
ParWork.propagateInterrupt(e);
@@ -357,7 +356,7 @@ public class CollectionCmdResponse implements OverseerCollectionMessageHandler.C
public static CreateReplica assignReplicaDetails(DocCollection coll,
ZkNodeProps message, ReplicaPosition replicaPosition, Overseer overseer) {
- log.info("assignReplicaDetails {} {}", message, replicaPosition);
+ log.debug("assignReplicaDetails {} {}", message, replicaPosition);
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 0ba35e8..be522c6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -279,7 +279,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(), ZkStateReader.NUM_SHARDS_PROP, message.getStr(ZkStateReader.NUM_SHARDS_PROP), "shards", message.getStr("shards"),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
props.getProperties().putAll(addReplicaProps.getProperties());
- if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
+ log.debug("Sending state update to populate clusterstate with new replica {}", props);
clusterState = new CollectionCmdResponse(ocmh, true).call(clusterState, props, results).clusterState;
// log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
@@ -319,8 +319,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
coresToCreate.put(coreName, sreq);
}
- Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
- future.get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
+
writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 91851f9..64c4377 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -183,7 +183,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
// MRM TODO: - put this in finalizer and finalizer after all calls to allow parallel and forward momentum
try {
- overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false).get();
+ overseer.getZkStateWriter().enqueueUpdate(resp.clusterState.getCollection(collection), null, false);
} catch (Exception e) {
log.error("failure", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 29ea39c..9cb3980 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -197,7 +196,7 @@ public class DeleteReplicaCmd implements Cmd {
if (waitForFinalState) {
try {
- ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false);
ocmh.overseer.writePendingUpdates(finalCollectionName1);
waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
} catch (Exception e) {
@@ -251,8 +250,8 @@ public class DeleteReplicaCmd implements Cmd {
shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
}
}
- List<OverseerCollectionMessageHandler.Finalize> finalizers = new ArrayList<>();
- List<Future> futures = new ArrayList<>();
+ List<CollectionCmdResponse.Response> finalizers = new ArrayList<>();
+
for (Map.Entry<Slice,Set<String>> entry : shardToReplicasMapping.entrySet()) {
Slice shardSlice = entry.getKey();
String shardId = shardSlice.getName();
@@ -265,26 +264,16 @@ public class DeleteReplicaCmd implements Cmd {
clusterState = resp.clusterState;
if (clusterState != null) {
try {
- futures.add(ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false));
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
} catch (Exception e) {
log.error("failed sending update to zkstatewriter", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
- if (resp.asyncFinalRunner != null) {
- finalizers.add(resp.asyncFinalRunner);
- }
- }
- try {
- for (Future future : futures) {
- future.get();
- }
- ocmh.overseer.writePendingUpdates(collectionName);
- } catch (Exception e) {
- log.error("failed writing update to zkstatewriter", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ finalizers.add(resp);
}
+
results.add("shard_id", shardId);
results.add("replicas_deleted", replicas);
}
@@ -294,8 +283,8 @@ public class DeleteReplicaCmd implements Cmd {
response.asyncFinalRunner = () -> {
CollectionCmdResponse.Response resp = new CollectionCmdResponse.Response();
resp.asyncFinalRunner = () -> {
- for (OverseerCollectionMessageHandler.Finalize finalize : finalizers) {
- finalize.call();
+ for (CollectionCmdResponse.Response finalize : finalizers) {
+ finalize.asyncFinalRunner.call();
}
return new CollectionCmdResponse.Response();
};
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 5fbb7f9..a7f00bc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -177,7 +177,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
if (waitForFinalState) {
- ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false);
ocmh.overseer.writePendingUpdates(collectionName).get();
ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
if (coll == null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index e9f6aaa..5fc01e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -278,7 +278,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
CollectionCmdResponse.Response response = ocmh.addReplicaWithResp(clusterState, addReplicasProps, addResult);
- ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState.getCollection(coll.getName()), null,false);
// wait for the other replica to be active if the source replica was a leader
@@ -309,7 +309,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
try {
response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
- ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false);
asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
} catch (SolrException e) {
deleteResult.add("failure", e.toString());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index faa10df..c6f98da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -320,19 +320,18 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", responce.clusterState, results);
CollectionCmdResponse.Response asyncResp = null;
- Future future = null;
+
if (responce.clusterState != null) {
DocCollection docColl = responce.clusterState.getCollectionOrNull(collection);
if (docColl != null) {
- future = zkWriter.enqueueUpdate(docColl, null, false);
+ zkWriter.enqueueUpdate(docColl, null, false);
if (responce != null && responce.asyncFinalRunner != null) {
asyncResp = responce.asyncFinalRunner.call();
}
if (asyncResp == null || asyncResp.writeFuture == null) {
- future.get();
writeFuture2 = overseer.writePendingUpdates(collection);
}
@@ -349,13 +348,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", asyncResp.clusterState);
if (asyncResp.clusterState != null) {
DocCollection docColl = asyncResp.clusterState.getCollectionOrNull(collection);
-
if (docColl != null) {
-
- zkWriter.enqueueUpdate(docColl, null, false).get();
- if (future != null) {
- future.get();
- }
+ zkWriter.enqueueUpdate(docColl, null, false);
writeFuture = overseer.writePendingUpdates(collection);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 2b0fdec..3a55cd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -100,9 +100,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
for (ZkNodeProps sourceReplica : sourceReplicas) {
@SuppressWarnings({"rawtypes"}) NamedList nl = new NamedList();
String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
- if (log.isInfoEnabled()) {
- log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
- }
+
+ log.debug("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+
String targetNode = target;
if (targetNode == null) {
Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
@@ -146,7 +146,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
runners.add(runner);
}
String collection = clusterState.getCollectionStates().keySet().iterator().next();
- ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false);
ocmh.overseer.writePendingUpdates(collection);
CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 793ac18..b19b3fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -349,7 +349,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// firstReplicaFutures.add(future);
}
- ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false);
ocmh.overseer.writePendingUpdates(collectionName);
log.info("Clusterstate after adding new shard for split {}", clusterState);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index a16683a..cd80b5d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
-import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.api.collections.Assign;
@@ -44,7 +43,6 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -66,12 +64,8 @@ public class ZkStateWriter {
protected volatile Stats stats;
- private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>(128, 0.75f, 16);
-
private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
- Map<String,DocCollection> failedUpdates = new ConcurrentHashMap<>();
-
Map<Long,String> idToCollection = new ConcurrentHashMap<>(128, 0.75f, 16);
private Map<String,DocAssign> assignMap = new ConcurrentHashMap<>(128, 0.75f, 16);
@@ -82,10 +76,8 @@ public class ZkStateWriter {
private static class ColState {
ReentrantLock collLock = new ReentrantLock(true);
- ActionThrottle throttle = new ActionThrottle("ZkStateWriter", Integer.getInteger("solr.zkstatewriter.throttle", 0), new TimeSource.NanoTimeSource());
}
-
private AtomicLong ID = new AtomicLong();
private Set<String> dirtyStructure = ConcurrentHashMap.newKeySet();
@@ -98,227 +90,206 @@ public class ZkStateWriter {
}
- public Future enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
- return ParWork.getRootSharedExecutor().submit(() -> {
-
- try {
- if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={} docCollection={} cs={}", stateUpdate, docCollection, cs);
- if (!stateUpdate) {
-
- String collectionName = docCollection.getName();
-
- ColState collState = collLocks.compute(collectionName, (s, colState) -> {
- if (colState == null) {
- ColState cState = new ColState();
- return cState;
- }
- return colState;
- });
- collState.collLock.lock();
-
- try {
+ public void enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
- DocCollection currentCollection = cs.get(docCollection.getName());
- log.debug("zkwriter collection={}", docCollection);
- log.debug("zkwriter currentCollection={}", currentCollection);
-
- idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+ try {
- // if (currentCollection != null) {
- // if (currentCollection.getId() != collection.getId()) {
- // removeCollection(collection.getName());
- // }
- // }
+ if (!stateUpdate) {
+ if (log.isDebugEnabled()) log.debug("enqueue structure change docCollection={}", docCollection);
- if (currentCollection != null) {
+ String collectionName = docCollection.getName();
+ ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+ if (colState == null) {
+ ColState cState = new ColState();
+ return cState;
+ }
+ return colState;
+ });
+ collState.collLock.lock();
+ try {
- currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
- List<String> removeSlices = new ArrayList();
- for (Slice slice : docCollection) {
- Slice currentSlice = currentCollection.getSlice(slice.getName());
- if (currentSlice != null) {
- if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
- removeSlices.add(slice.getName());
- } else {
- currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
- }
+ DocCollection currentCollection = cs.get(docCollection.getName());
+ log.debug("zkwriter collection={}", docCollection);
+ log.debug("zkwriter currentCollection={}", currentCollection);
+ dirtyStructure.add(docCollection.getName());
+ idToCollection.putIfAbsent(docCollection.getId(), docCollection.getName());
+
+ if (currentCollection != null) {
+ docCollection.setZnodeVersion(currentCollection.getZNodeVersion());
+ currentCollection.getProperties().keySet().retainAll(docCollection.getProperties().keySet());
+ List<String> removeSlices = new ArrayList();
+ for (Slice slice : docCollection) {
+ Slice currentSlice = currentCollection.getSlice(slice.getName());
+ if (currentSlice != null) {
+ if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
+ removeSlices.add(slice.getName());
} else {
- if (slice.getProperties().get("remove") != null) {
- continue;
- }
- Set<String> remove = new HashSet<>();
+ currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
+ }
+ } else {
+ if (slice.getProperties().get("remove") != null) {
+ continue;
+ }
+ Set<String> remove = new HashSet<>();
- for (Replica replica : slice) {
+ for (Replica replica : slice) {
- if (replica.get("remove") != null) {
- remove.add(replica.getName());
- }
- }
- for (String removeReplica : remove) {
- slice.getReplicasMap().remove(removeReplica);
+ if (replica.get("remove") != null) {
+ remove.add(replica.getName());
}
- currentCollection.getSlicesMap().put(slice.getName(), slice);
}
- }
- for (String removeSlice : removeSlices) {
- currentCollection.getSlicesMap().remove(removeSlice);
- }
- cs.put(currentCollection.getName(), currentCollection);
-
- } else {
- docCollection.getProperties().remove("pullReplicas");
- docCollection.getProperties().remove("replicationFactor");
- docCollection.getProperties().remove("maxShardsPerNode");
- docCollection.getProperties().remove("nrtReplicas");
- docCollection.getProperties().remove("tlogReplicas");
- List<String> removeSlices = new ArrayList();
- for (Slice slice : docCollection) {
- Slice currentSlice = docCollection.getSlice(slice.getName());
- if (currentSlice != null) {
- if (slice.getProperties().get("remove") != null) {
- removeSlices.add(slice.getName());
- }
+ for (String removeReplica : remove) {
+ slice.getReplicasMap().remove(removeReplica);
}
+ currentCollection.getSlicesMap().put(slice.getName(), slice);
}
- for (String removeSlice : removeSlices) {
- docCollection.getSlicesMap().remove(removeSlice);
+ }
+ for (String removeSlice : removeSlices) {
+ currentCollection.getSlicesMap().remove(removeSlice);
+ }
+ cs.put(currentCollection.getName(), currentCollection);
+
+ } else {
+ docCollection.getProperties().remove("pullReplicas");
+ docCollection.getProperties().remove("replicationFactor");
+ docCollection.getProperties().remove("maxShardsPerNode");
+ docCollection.getProperties().remove("nrtReplicas");
+ docCollection.getProperties().remove("tlogReplicas");
+ List<String> removeSlices = new ArrayList();
+ for (Slice slice : docCollection) {
+ Slice currentSlice = docCollection.getSlice(slice.getName());
+ if (currentSlice != null) {
+ if (slice.getProperties().get("remove") != null) {
+ removeSlices.add(slice.getName());
+ }
}
-
- cs.put(docCollection.getName(), docCollection);
+ }
+ for (String removeSlice : removeSlices) {
+ docCollection.getSlicesMap().remove(removeSlice);
}
- dirtyStructure.add(collectionName);
-
- } finally {
- collState.collLock.unlock();
+ cs.put(docCollection.getName(), docCollection);
}
- } else {
- for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
-
- ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
- if (reentrantLock == null) {
- ColState colState = new ColState();
- return colState;
- }
- return reentrantLock;
- });
+ } finally {
+ collState.collLock.unlock();
+ }
+ } else {
+ if (log.isDebugEnabled()) log.debug("enqueue state change states={}", collStateUpdates);
+ for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+
+ ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
+ if (reentrantLock == null) {
+ ColState colState = new ColState();
+ return colState;
+ }
+ return reentrantLock;
+ });
- collState.collLock.lock();
- try {
- String collectionId = entry.getKey();
- String collection = idToCollection.get(Long.parseLong(collectionId));
- if (collection == null) {
- log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
- }
+ collState.collLock.lock();
+ try {
+ String collectionId = entry.getKey();
+ String collection = idToCollection.get(Long.parseLong(collectionId));
+ if (collection == null) {
+ log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
+ }
- ZkNodeProps updates = stateUpdates.get(collection);
- if (updates == null) {
- updates = new ZkNodeProps();
- stateUpdates.put(collection, updates);
- }
+ ZkNodeProps updates = stateUpdates.get(collection);
+ if (updates == null) {
+ updates = new ZkNodeProps();
+ stateUpdates.put(collection, updates);
+ }
+ DocCollection docColl = cs.get(collection);
+ String csVersion;
+ if (docColl != null) {
+ csVersion = Integer.toString(docColl.getZNodeVersion());
for (StateUpdate state : entry.getValue()) {
-
- Integer ver = trackVersions.get(collection);
- if (ver == null) {
- ver = 0;
+ if (state.sliceState != null) {
+ Slice slice = docColl.getSlice(state.sliceName);
+ if (slice != null) {
+ slice.setState(Slice.State.getState(state.sliceState));
+ }
+ dirtyStructure.add(collection);
+ continue;
}
- updates.getProperties().put("_cs_ver_", ver.toString());
-
- log.debug("version for state updates {}", ver.toString());
- DocCollection docColl = cs.get(collection);
- if (docColl != null) {
-
- if (state.sliceState != null) {
- Slice slice = docColl.getSlice(state.sliceName);
- if (slice != null) {
- slice.setState(Slice.State.getState(state.sliceState));
+ Replica replica = docColl.getReplicaById(state.id);
+ log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+ if (replica != null) {
+ String setState = Replica.State.shortStateToState(state.state).toString();
+ log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
+ if (setState.equals("leader")) {
+ if (log.isDebugEnabled()) {
+ log.debug("set leader {}", replica);
}
- dirtyStructure.add(docColl.getName());
- continue;
- }
-
- Replica replica = docColl.getReplicaById(state.id);
- log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
- if (replica != null) {
- String setState = Replica.State.shortStateToState(state.state).toString();
- // if (blockedNodes.contains(replica.getNodeName())) {
- // continue;
- // }
- log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
- if (setState.equals("leader")) {
- if (log.isDebugEnabled()) {
- log.debug("set leader {}", replica);
- }
- Slice slice = docColl.getSlice(replica.getSlice());
- slice.setLeader(replica);
- replica.setState(Replica.State.ACTIVE);
- replica.getProperties().put("leader", "true");
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica r : replicas) {
- if (r != replica) {
- r.getProperties().remove("leader");
- }
- }
- updates.getProperties().put(replica.getInternalId(), "l");
- dirtyState.add(collection);
- } else {
- Replica.State s = Replica.State.getState(setState);
- Replica existingLeader = docColl.getSlice(replica).getLeader();
- if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
- docColl.getSlice(replica).setLeader(null);
+ Slice slice = docColl.getSlice(replica.getSlice());
+ slice.setLeader(replica);
+ replica.setState(Replica.State.ACTIVE);
+ replica.getProperties().put("leader", "true");
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica r : replicas) {
+ if (r != replica) {
+ r.getProperties().remove("leader");
}
- updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
- log.debug("set state {} {}", state, replica);
- replica.setState(s);
- dirtyState.add(collection);
}
- } else {
- log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
- }
- } else {
- log.debug("Could not find existing collection name={}", collection);
- String setState = Replica.State.shortStateToState(state.state).toString();
- if (setState.equals("leader")) {
- updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+ updates.getProperties().put(replica.getInternalId(), "l");
dirtyState.add(collection);
} else {
Replica.State s = Replica.State.getState(setState);
- updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+ Replica existingLeader = docColl.getSlice(replica).getLeader();
+ if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
+ docColl.getSlice(replica).setLeader(null);
+ }
+ updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
+ log.debug("set state {} {}", state, replica);
+ replica.setState(s);
dirtyState.add(collection);
}
+ } else {
+ log.debug("Could not find replica id={} in {} {}", state.id, docColl.getReplicaByIds(), docColl.getReplicas());
}
}
-
- String coll = entry.getKey();
- dirtyState.add(coll);
- Integer ver = trackVersions.get(coll);
- if (ver == null) {
- ver = 0;
- }
- updates.getProperties().put("_cs_ver_", ver.toString());
- for (StateUpdate theUpdate : entry.getValue()) {
- updates.getProperties().put(theUpdate.id.substring(theUpdate.id.indexOf("-") + 1), theUpdate.state);
+ } else {
+ for (StateUpdate state : entry.getValue()) {
+ log.warn("Could not find existing collection name={}", collection);
+// String setState = Replica.State.shortStateToState(state.state).toString();
+// if (setState.equals("leader")) {
+// updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
+// dirtyState.add(collection);
+// } else {
+// Replica.State s = Replica.State.getState(setState);
+// updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+// dirtyState.add(collection);
+// }
}
+ log.debug("version for state updates 0");
+ csVersion = "0";
+ }
- } finally {
- collState.collLock.unlock();
+ if (dirtyState.contains(collection)) {
+ updates.getProperties().put("_cs_ver_", csVersion);
}
+
+ } finally {
+ collState.collLock.unlock();
}
}
-
- } catch (Exception e) {
- log.error("Exception while queuing update", e);
- throw e;
}
- });
+
+ } catch (Exception e) {
+ log.error("Exception while queuing update", e);
+ throw e;
+ }
}
public Integer lastWrittenVersion(String collection) {
- return trackVersions.get(collection);
+ DocCollection col = cs.get(collection);
+ if (col == null) {
+ return 0;
+ }
+ return col.getZNodeVersion();
}
/**
@@ -326,31 +297,29 @@ public class ZkStateWriter {
*
*/
- // if additional updates too large, publish structure change
- public void writePendingUpdates(String collection) {
-
- do {
- try {
- write(collection);
- break;
- } catch (KeeperException.BadVersionException e) {
-
- } catch (Exception e) {
- log.error("write pending failed", e);
- break;
- }
+ public Future writePendingUpdates(String collection) {
+ return ParWork.getRootSharedExecutor().submit(() -> {
+ do {
+ try {
+ write(collection);
+ break;
+ } catch (KeeperException.BadVersionException e) {
- } while (!overseer.isClosed());
+ } catch (Exception e) {
+ log.error("write pending failed", e);
+ break;
+ }
+ } while (!overseer.isClosed() && !overseer.getZkStateReader().getZkClient().isClosed());
+ });
}
private void write(String coll) throws KeeperException.BadVersionException {
if (log.isDebugEnabled()) {
- log.debug("writePendingUpdates {}", cs);
+ log.debug("writePendingUpdates {}", coll);
}
- AtomicInteger lastVersion = new AtomicInteger();
AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
DocCollection collection = cs.get(coll);
@@ -359,133 +328,104 @@ public class ZkStateWriter {
return;
}
- if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
- Integer version = null;
- if (dirtyStructure.contains(collection.getName())) {
- log.info("process collection {}", collection);
- ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
- if (reentrantLock == null) {
- ColState colState = new ColState();
- return colState;
- }
- return reentrantLock;
- });
-
- collState.collLock.lock();
- try {
- collState.throttle.minimumWaitBetweenActions();
- collState.throttle.markAttemptingAction();
- String name = collection.getName();
- String path = ZkStateReader.getCollectionPath(collection.getName());
- String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
- // log.info("process collection {} path {}", collection.getName(), path);
- Stat existsStat = null;
- if (log.isTraceEnabled()) log.trace("process {}", collection);
- try {
- // log.info("get data for {}", name);
-
- // log.info("got data for {} {}", name, data.length);
+ log.debug("process collection {}", coll);
+ ColState collState = collLocks.compute(Long.toString(collection.getId()), (s, reentrantLock) -> {
+ if (reentrantLock == null) {
+ ColState colState = new ColState();
+ return colState;
+ }
+ return reentrantLock;
+ });
+ collState.collLock.lock();
+ try {
+ collection = cs.get(coll);
- try {
+ if (collection == null) {
+ return;
+ }
- if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
- byte[] data = Utils.toJSON(singletonMap(name, collection));
- Integer v = trackVersions.get(collection.getName());
+ if (log.isTraceEnabled()) log.trace("check collection {} {} {}", collection, dirtyStructure, dirtyState);
- if (v != null) {
- //log.info("got version from cache {}", v);
- version = v;
- } else {
- version = 0;
- }
- lastVersion.set(version);
- if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+ // collState.throttle.minimumWaitBetweenActions();
+ // collState.throttle.markAttemptingAction();
+ String name = collection.getName();
+ String path = ZkStateReader.getCollectionPath(collection.getName());
+ String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
- reader.getZkClient().setData(path, data, version, true, false);
- if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+ if (log.isTraceEnabled()) log.trace("process {}", collection);
+ try {
- reader.getZkClient().setData(pathSCN, null, -1, true, false);
+ if (dirtyStructure.contains(name)) {
+ if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
- dirtyStructure.remove(collection.getName());
+ byte[] data = Utils.toJSON(singletonMap(name, collection));
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- updates.getProperties().clear();
- //(collection, updates);
- //dirtyState.remove(collection.getName());
- }
+ if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
- trackVersions.put(collection.getName(), version + 1);
+ Integer finalVersion = collection.getZNodeVersion();
+ dirtyStructure.remove(collection.getName());
+ if (reader == null) {
+ log.error("read not initialized in zkstatewriter");
+ }
+ if (reader.getZkClient() == null) {
+ log.error("zkclient not initialized in zkstatewriter");
+ }
+ Stat stat;
+ try {
+ stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
+ collection.setZnodeVersion(finalVersion + 1);
+ if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
} catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
- lastVersion.set(-1);
- trackVersions.remove(collection.getName());
- stateUpdates.remove(collection.getName());
- cs.remove(collection);
- // likely deleted
+ log.debug("No node found for state.json", e);
} catch (KeeperException.BadVersionException bve) {
- log.info("Tried to update state.json ({}) with bad version", collection);
- //lastFailedException.set(bve);
- //failedUpdates.put(collection.getName(), collection);
- // Stat estate = reader.getZkClient().exists(path, null);
- trackVersions.remove(collection.getName());
- Stat stat = reader.getZkClient().exists(path, null, false, false);
- log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
- if (!overseer.isClosed() && stat != null) {
- trackVersions.put(collection.getName(), stat.getVersion());
- }
+ stat = reader.getZkClient().exists(path, null, false, false);
+ log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, finalVersion, stat != null ? stat.getVersion() : "null");
+
throw bve;
}
- } catch (KeeperException.BadVersionException bve) {
- badVersionException.set(bve);
- } catch (InterruptedException | AlreadyClosedException e) {
- log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+ reader.getZkClient().setData(pathSCN, null, -1, true, false);
- } catch (Exception e) {
- log.error("Failed processing update=" + collection, e);
- }
- } finally {
- collState.collLock.unlock();
- }
- }
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ updates.getProperties().clear();
+ }
- if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
- ZkNodeProps updates = stateUpdates.get(collection.getName());
- if (updates != null) {
- try {
- writeStateUpdates(collection, updates);
- } catch (Exception e) {
- log.error("exception writing state updates", e);
+ } else if (dirtyState.contains(collection.getName())) {
+ ZkNodeProps updates = stateUpdates.get(collection.getName());
+ if (updates != null) {
+ try {
+ writeStateUpdates(collection, updates);
+ } catch (Exception e) {
+ log.error("exception writing state updates", e);
+ }
+ }
}
- }
- }
- //removeCollections.forEach(c -> removeCollection(c));
+ } catch (KeeperException.BadVersionException bve) {
+ badVersionException.set(bve);
+ } catch (InterruptedException | AlreadyClosedException e) {
+ log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+ throw new AlreadyClosedException(e);
- if (badVersionException.get() != null) {
- throw badVersionException.get();
- }
-
- //log.info("Done with successful cluster write out");
+ } catch (Exception e) {
+ log.error("Failed processing update=" + collection, e);
+ }
- // } finally {
- // writeLock.unlock();
- // }
- // MRM TODO: - harden against failures and exceptions
+ if (badVersionException.get() != null) {
+ throw badVersionException.get();
+ }
- // if (log.isDebugEnabled()) {
- // log.debug("writePendingUpdates() - end - New Cluster State is: {}", newClusterState);
- // }
+ } finally {
+ collState.collLock.unlock();
+ }
}
private void writeStateUpdates(DocCollection collection, ZkNodeProps updates) throws KeeperException, InterruptedException {
String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
- if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
+ if (log.isDebugEnabled()) log.debug("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
try {
reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
} catch (KeeperException.NoNodeException e) {
@@ -519,14 +459,6 @@ public class ZkStateWriter {
});
collState.collLock.lock();
try {
- stateUpdates.remove(collection);
- cs.remove(collection);
- assignMap.remove(collection);
- trackVersions.remove(collection);
- dirtyStructure.remove(collection);
- dirtyState.remove(collection);
- ZkNodeProps message = new ZkNodeProps("name", collection);
- cs.remove(collection);
Long id = null;
for (Map.Entry<Long, String> entry : idToCollection.entrySet()) {
if (entry.getValue().equals(collection)) {
@@ -536,6 +468,12 @@ public class ZkStateWriter {
}
if (id != null) {
idToCollection.remove(id);
+ stateUpdates.remove(collection);
+ cs.remove(collection);
+ assignMap.remove(collection);
+ dirtyStructure.remove(collection);
+ dirtyState.remove(collection);
+ cs.remove(collection);
}
} catch (Exception e) {
log.error("", e);
@@ -557,17 +495,17 @@ public class ZkStateWriter {
int id = docAssign.replicaAssignCnt.incrementAndGet();
- log.info("assign id={} for collection={} slice={}", id, collection, shard);
+ log.debug("assign id={} for collection={} slice={}", id, collection, shard);
return id;
}
int id = docAssign.replicaAssignCnt.incrementAndGet();
- log.info("assign id={} for collection={} slice={}", id, collection, shard);
+ log.debug("assign id={} for collection={} slice={}", id, collection, shard);
return id;
}
public void init() {
- reader.forciblyRefreshAllClusterStateSlow();
+
ClusterState readerState = reader.getClusterState();
if (readerState != null) {
cs.putAll(readerState.copy().getCollectionsMap());
@@ -575,31 +513,44 @@ public class ZkStateWriter {
long[] highId = new long[1];
cs.values().forEach(collection -> {
- if (collection.getId() > highId[0]) {
- highId[0] = collection.getId();
- }
+ String collectionName = collection.getName();
+ ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+ if (colState == null) {
+ ColState cState = new ColState();
+ return cState;
+ }
+ return colState;
+ });
+ collState.collLock.lock();
+ try {
- idToCollection.put(collection.getId(), collection.getName());
+ if (collection.getId() > highId[0]) {
+ highId[0] = collection.getId();
+ }
- trackVersions.put(collection.getName(), collection.getZNodeVersion());
+ idToCollection.put(collection.getId(), collection.getName());
- DocAssign docAssign = new DocAssign();
- docAssign.name = collection.getName();
- assignMap.put(docAssign.name, docAssign);
- int max = 1;
- Collection<Slice> slices = collection.getSlices();
- for (Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
-
- for (Replica replica : replicas) {
- Matcher matcher = Assign.pattern.matcher(replica.getName());
- if (matcher.matches()) {
- int val = Integer.parseInt(matcher.group(1));
- max = Math.max(max, val);
+
+ DocAssign docAssign = new DocAssign();
+ docAssign.name = collection.getName();
+ assignMap.put(docAssign.name, docAssign);
+ int max = 1;
+ Collection<Slice> slices = collection.getSlices();
+ for (Slice slice : slices) {
+ Collection<Replica> replicas = slice.getReplicas();
+
+ for (Replica replica : replicas) {
+ Matcher matcher = Assign.pattern.matcher(replica.getName());
+ if (matcher.matches()) {
+ int val = Integer.parseInt(matcher.group(1));
+ max = Math.max(max, val);
+ }
}
}
+ docAssign.replicaAssignCnt.set(max);
+ } finally {
+ collState.collLock.unlock();
}
- docAssign.replicaAssignCnt.set(max);
});
ID.set(highId[0]);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5cc13bb..6775b88 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -156,6 +156,7 @@ public class CoreContainer implements Closeable {
final SolrCores solrCores = new SolrCores(this);
private volatile boolean startedLoadingCores;
private volatile boolean loaded;
+ private volatile SolrClient metricsHistoryHandlerCloudClient;
public static class CoreLoadFailure {
@@ -877,7 +878,7 @@ public class CoreContainer implements Closeable {
List<Replica> replicas = c.getReplicas();
for (Replica replica : replicas) {
- log.debug("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
+ log.trace("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
if (replica.getNodeName().equals(nodeName)) {
if (replica.getState().equals(State.ACTIVE)) {
if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={} replica={}", replica.getState(), replica.getNodeName(), nodeName, replica);
@@ -998,20 +999,20 @@ public class CoreContainer implements Closeable {
}
String name;
SolrCloudManager cloudManager;
- SolrClient client;
+
if (isZooKeeperAware()) {
name = getZkController().getNodeName();
cloudManager = getZkController().getSolrCloudManager();
- client = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
+ metricsHistoryHandlerCloudClient = new CloudHttp2SolrClient.Builder(getZkController().getZkStateReader())
.withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().build();
- ((CloudHttp2SolrClient)client).connect();
+ ((CloudHttp2SolrClient) metricsHistoryHandlerCloudClient).connect();
} else {
name = getNodeConfig().getNodeName();
if (name == null || name.isEmpty()) {
name = "127.0.0.1";
}
cloudManager = null;
- client = new EmbeddedSolrServer();
+ metricsHistoryHandlerCloudClient = new EmbeddedSolrServer();
// enable local metrics unless specifically set otherwise
if (!initArgs.containsKey(MetricsHistoryHandler.ENABLE_NODES_PROP)) {
initArgs.put(MetricsHistoryHandler.ENABLE_NODES_PROP, true);
@@ -1020,8 +1021,7 @@ public class CoreContainer implements Closeable {
initArgs.put(MetricsHistoryHandler.ENABLE_REPLICAS_PROP, true);
}
}
- metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler,
- client, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
+ metricsHistoryHandler = new MetricsHistoryHandler(name, metricsHandler, metricsHistoryHandlerCloudClient, cloudManager, initArgs, isZooKeeperAware() ? zkSys.getZkController().getOverseer() : null);
containerHandlers.put(METRICS_HISTORY_PATH, metricsHistoryHandler);
metricsHistoryHandler.initializeMetrics(solrMetricsContext, METRICS_HISTORY_PATH);
}
@@ -1169,7 +1169,7 @@ public class CoreContainer implements Closeable {
if (auditloggerPlugin != null) {
auditPlugin = auditloggerPlugin.plugin;
}
-
+ closer.collect(metricsHistoryHandlerCloudClient);
closer.collect(authPlugin);
closer.collect(authenPlugin);
closer.collect(auditPlugin);
@@ -1410,7 +1410,7 @@ public class CoreContainer implements Closeable {
@SuppressWarnings("resource")
private SolrCore createFromDescriptor(CoreDescriptor dcore, boolean newCollection) {
- log.info("createFromDescriptor {} {}", dcore, newCollection);
+ log.debug("createFromDescriptor {} {}", dcore, newCollection);
if (isShutDown()) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has been shutdown.");
@@ -1421,10 +1421,15 @@ public class CoreContainer implements Closeable {
boolean registered = false;
try {
MDCLoggingContext.setCoreName(dcore.getName());
+
StopWatch timeValidateCoreNameLoadConfigSet = new StopWatch(dcore.getName() + "-validateCoreNameLoadConfigSet");
SolrIdentifierValidator.validateCoreName(dcore.getName());
+ if (isZooKeeperAware()) {
+ getZkController().getZkStateReader().registerCore(dcore.getCollectionName(), dcore.getName());
+ }
+
ConfigSet coreConfig = coreConfigService.loadConfigSet(dcore);
dcore.setConfigSetTrusted(coreConfig.isTrusted());
if (log.isInfoEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 17c4351..f380a0f 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1769,8 +1769,11 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
}
- void doClose () {
-
+ synchronized void doClose () {
+ if (refCount.get() == -1) {
+ log.warn("SolrCore is already closed {}", name);
+ return;
+ }
try {
if (closing) {
this.closing = true;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6a8e1fa..5f9f90e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1077,7 +1077,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX);
if (m.isEmpty()) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
+ formatString("no supported values provided {0} {1}", req.getParams(), CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
}
copy(req.getParams().required(), m, COLLECTION_PROP);
for (Map.Entry<String, Object> entry : m.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
index 05f81bf..f4cf0b8 100644
--- a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
+++ b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
@@ -222,7 +222,7 @@ public abstract class ManagedResourceStorage {
throw new SolrException(ErrorCode.SERVER_ERROR, errMsg, exc);
}
- log.info("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
+ log.debug("Configured ZooKeeperStorageIO with znodeBase: {}", znodeBase);
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index d912d62..3943f82 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -101,7 +101,7 @@ public final class CommitTracker implements Runnable, Closeable {
scheduler.setRemoveOnCancelPolicy(true);
scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- log.info("{} AutoCommit: {}", name, this);
+ log.debug("{} AutoCommit: {}", name, this);
assert ObjectReleaseTracker.track(this);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index e17ef6a..a0c772f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -144,9 +144,8 @@ UpdateHandler implements SolrInfoBean, Closeable {
ulog.clearLog(core, ulogPluginInfo);
}
- if (log.isInfoEnabled()) {
- log.info("Using UpdateLog implementation: {}", ulog.getClass().getName());
- }
+ log.debug("Using UpdateLog implementation: {}", ulog.getClass().getName());
+
ulog.init(ulogPluginInfo);
ulog.init(this, core);
} else {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index dfb3679..b9c6461 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -391,7 +391,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Number of version buckets must be greater than 0!");
- log.info("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
+ log.debug("Initializing UpdateLog: dataDir={} defaultSyncLevel={} numRecordsToKeep={} maxNumLogsToKeep={} numVersionBuckets={}",
dataDir, defaultSyncLevel, numRecordsToKeep, maxNumLogsToKeep, numVersionBuckets);
}
@@ -2386,7 +2386,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
if (maxVersion == 0L) {
maxVersion = versions.getNewClock();
- log.info("Could not find max version in index or recent updates, using new clock {}", maxVersion);
+ log.debug("Could not find max version in index or recent updates, using new clock {}", maxVersion);
}
// seed all version buckets with the highest value from recent and index
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 61a785d..4f9e619 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -277,7 +277,7 @@ public class VersionInfo {
final String versionFieldName = versionField.getName();
- log.info("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
+ log.debug("Refreshing highest value of {} for {} version buckets from index", versionFieldName, buckets.length);
// if indexed, then we have terms to get the max from
if (versionField.indexed()) {
if (versionField.getType().isPointField()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 620b890..bfd55b3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
@Test
public void start() throws Exception {
- int collectionCnt = 40;
+ int collectionCnt = 5;
List<Future> futures = new ArrayList<>();
List<Future> indexFutures = new ArrayList<>();
for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 91fd066..b88e93f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -51,9 +51,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
//@LuceneTestCase.Nightly
public class TestCollectionAPI extends ReplicaPropertiesBase {
@@ -909,6 +910,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
params.set("router.name", "implicit");
params.set("numShards", "1");
params.set("shards", "invalid@name#with$weird%characters");
+ params.set(WAIT_FOR_FINAL_STATE, "true");
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 86c7995..b70b08d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -379,7 +379,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
String url = zkClientClusterStateProvider.getZkStateReader().getBaseUrlForNodeName(solrNode);
try {
- GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.GET, path, params);
+ GenericSolrRequest request = new GenericSolrRequest(SolrRequest.METHOD.POST, path, params); // params too long for GET
try (Http2SolrClient client = new Http2SolrClient.Builder().withHttpClient(httpClient).withBaseUrl(url).markInternalRequest().build()) {
NamedList<Object> rsp = client.request(request);
request.response.nl = rsp;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index dc36207..d7d41d4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -215,6 +215,20 @@ public class ClusterState implements JSONWriter.Writable {
return createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
}
+ public static DocCollection createDocCollectionFromJson(Replica.NodeNameToBaseUrl nodeNameToBaseUrl, int version, byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return null;
+ }
+ Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes);
+ ClusterState cs = createFromCollectionMap(nodeNameToBaseUrl, version, stateMap);
+ if (cs.getCollectionsMap().size() == 0) {
+ return null;
+ }
+ DocCollection docCollection = cs.getCollectionsMap().values().iterator().next();
+ docCollection.setZnodeVersion(version);
+ return docCollection;
+ }
+
public static ClusterState createFromCollectionMap(Replica.NodeNameToBaseUrl zkStateReader, int version, Map<String, Object> stateMap) {
Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 814f808..59cd767 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -93,7 +93,7 @@ public class ConnectionManager implements Watcher, Closeable {
throw new IllegalStateException("You must call start on " + SolrZkClient.class.getName() + " before you can use it");
}
- if (keeper != null && isClosed && !keeper.getState().isAlive()) {
+ if (keeper != null && (isClosed || !keeper.getState().isAlive())) {
throw new AlreadyClosedException(this + " SolrZkClient is not currently connected state=" + keeper.getState());
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 3c0326f..b55ce1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -51,7 +51,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
- private final int znodeVersion;
+ private int znodeVersion;
private final String name;
private final Map<String, Slice> slices;
@@ -462,4 +462,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public Map getStateUpdates() {
return stateUpdates;
}
+
+ public void setZnodeVersion(int version) {
+ this.znodeVersion = version;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 80eea86..2a48875 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -426,9 +426,11 @@ public class SolrZkClient implements Closeable {
return setData(path, data, version, retryOnConnLoss, false);
}
- /**
- * Returns node's state
- */
+ public void setData(final String path, final byte data[], final int version, AsyncCallback.StatCallback cb, Object ctx)
+ throws KeeperException, InterruptedException {
+ connManager.getKeeper().setData(path, data, version, cb, ctx);
+ }
+
public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss, boolean retryOnSessionExpiration)
throws KeeperException, InterruptedException {
@@ -823,21 +825,13 @@ public class SolrZkClient implements Closeable {
return dataMap;
}
- public void delete(Collection<String> paths, boolean wait) throws KeeperException {
+ public CountDownLatch delete(Collection<String> paths, boolean wait) throws KeeperException {
if (log.isDebugEnabled()) log.debug("delete paths {} wait={}", paths, wait);
- if (paths.size() == 0) {
- return;
- }
- CountDownLatch latch = null;
- if (wait) {
- latch = new CountDownLatch(paths.size());
- }
+ CountDownLatch latch = new CountDownLatch(paths.size());
+
KeeperException[] ke = new KeeperException[1];
for (String path : paths) {
if (log.isDebugEnabled()) log.debug("process path={} connManager={}", path, connManager);
-
-
- CountDownLatch finalLatch = latch;
connManager.getKeeper().delete(path, -1, (rc, path1, ctx) -> {
try {
@@ -855,9 +849,7 @@ public class SolrZkClient implements Closeable {
}
}
} finally {
- if (wait) {
- finalLatch.countDown();
- }
+ latch.countDown();
}
}, null);
@@ -867,43 +859,26 @@ public class SolrZkClient implements Closeable {
log.debug("done with all paths, see if wait ... wait={}", wait);
}
if (wait) {
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- boolean success = false;
- while (!timeout.hasTimedOut() && !isClosed) {
- if (!connManager.getKeeper().getState().isConnected()) {
- try {
- connManager.waitForConnected(30000);
- } catch (TimeoutException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- }
+ try {
+ boolean success = latch.await(10, TimeUnit.SECONDS);
- if (ke[0] != null) {
- throw ke[0];
- }
- try {
- success = latch.await(10, TimeUnit.SECONDS);
- if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
- if (success) {
- break;
+ if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
+ if (success) {
+ if (ke[0] != null) {
+ throw ke[0];
}
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
- if (ke[0] != null) {
- throw ke[0];
- }
if (log.isDebugEnabled()) {
log.debug("done with delete {} {}", paths, wait);
}
+
+ return latch;
}
// Calls setData for a list of existing paths in parallel
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 12d2671..8290360 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -260,7 +260,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public boolean canBeRemoved() {
int refCount = coreRefCount.get();
int watcherCount = stateWatchers.size();
- log.debug("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
+ log.trace("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
return refCount <= 0 && watcherCount <= 0;
}
@@ -449,7 +449,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (log.isDebugEnabled()) {
log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
}
- DocCollection nu = getCollectionLive(this, coll);
+ DocCollection nu = getCollectionLive(coll);
if (nu == null) return -3;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
if (updateWatchedCollection(coll, nu, false)) {
@@ -634,13 +634,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
private void constructState(DocCollection collection, String caller) {
- if (log.isDebugEnabled()) log.debug("construct new cluster state on structure change {} {}", caller, collection);
+ if (log.isTraceEnabled()) log.trace("construct new cluster state on structure change {} {}", caller, collection);
- if (log.isTraceEnabled()) {
- log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
+
+ log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
clusterState.keySet());
- }
+
//
// watchedCollectionStates.forEach((s, slices) -> {
// clusterState.putIfAbsent(s, new ClusterState.CollectionRef(slices));
@@ -689,8 +689,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
LazyCollectionRef docRef = new LazyCollectionRef(coll);
LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
if (old == null) {
- log.debug("Created lazy collection {}", coll);
clusterState.put(coll, docRef);
+
+ log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
+ watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
}
}
}
@@ -752,7 +754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
private void notifyCloudCollectionsListeners(boolean notifyIfSame) {
- if (log.isDebugEnabled()) log.debug("Notify cloud collection listeners {}", notifyIfSame);
+ log.trace("Notify cloud collection listeners {}", notifyIfSame);
Set<String> newCollections;
Set<String> oldCollections;
boolean fire = true;
@@ -763,9 +765,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
fire = true;
}
- if (log.isDebugEnabled()) log.debug("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
+ log.trace("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
if (fire) {
-
cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(oldCollections, newCollections));
}
}
@@ -807,7 +808,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
if (shouldFetch) {
try {
- DocCollection cdc = getCollectionLive(ZkStateReader.this, collName);
+ DocCollection cdc = getCollectionLive(collName);
if (cdc != null) {
cdc.setCreatedLazy();
lastUpdateTime = System.nanoTime();
@@ -959,7 +960,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public Replica getLeader(String collection, String shard) {
- return getLeader(getClusterState().getCollection(collection), shard);
+ return getLeader(getCollectionOrNull(collection), shard);
}
private Replica getLeader(DocCollection docCollection, String shard) {
@@ -1486,18 +1487,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
@Override
public void close() throws IOException {
this.closed = true;
- // IOUtils.closeQuietly(stateUpdateWatcher);
-// SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-// if (zk != null) {
-// try {
-// zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
-// } catch (KeeperException.NoWatcherException e) {
-//
-// } catch (Exception e) {
-// if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-// }
-// }
-// IOUtils.closeQuietly(stateUpdateWatcher);
}
private class StateUpdateWatcher implements Watcher, Closeable {
@@ -1511,40 +1500,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
@Override
public void close() throws IOException {
this.closed = true;
-// SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-// if (zk != null) {
-// if (stateUpdateWatcher != null) {
-// try {
-// zk.removeWatches(getCollectionStateUpdatesPath(coll), stateUpdateWatcher, WatcherType.Any, true);
-// } catch (KeeperException.NoWatcherException e) {
-//
-// } catch (Exception e) {
-// if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-// }
-// }
-// }
}
@Override
public void process(WatchedEvent event) {
if (zkClient.isClosed() || closed) return;
- if (log.isDebugEnabled()) log.debug("_statupdates event {}", event);
+ log.trace("_statupdates event {}", event);
try {
-
- // if (event.getType() == EventType.NodeDataChanged ||
- // event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
collectionStateLock.lock();
getAndProcessStateUpdates(coll, stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
- // }
-
} catch (AlreadyClosedException e) {
} catch (Exception e) {
log.error("Unwatched collection: [{}]", coll, e);
}
}
-
}
}
@@ -1783,7 +1754,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
- public DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+ public DocCollection getCollectionLive(String coll) {
log.debug("getCollectionLive {}", coll);
DocCollection newState;
try {
@@ -1812,7 +1783,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, boolean live, DocCollection docCollection, ReentrantLock collectionStateLock) throws KeeperException, InterruptedException {
DocCollection result = null;
try {
- log.debug("get and process state updates for {}", coll);
+ log.trace("get and process state updates for {}", coll);
Stat stat;
try {
@@ -1827,7 +1798,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (docCollection != null && docCollection.hasStateUpdates()) {
int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
- if (stat.getVersion() < oldVersion) {
+ if (stat.getVersion() <= oldVersion) {
if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
return docCollection;
}
@@ -1854,7 +1825,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
- if (log.isDebugEnabled()) log.debug("Got additional state updates with version {} {} cs={}", version, m, clusterState);
+ if (log.isDebugEnabled()) log.debug("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
m.remove("_cs_ver_");
m.put("_ver_", stat.getVersion());
@@ -1862,11 +1833,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Set<Entry<String,Object>> entrySet = m.entrySet();
if (docCollection != null) {
- // || (version > docCollection.getZNodeVersion() && clusterState.getZkClusterStateVersion() == -1)) {
-// if (!docCollection.hasStateUpdates() && version < docCollection.getZNodeVersion()) {
-// if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
-// return docCollection;
-// }
+ if (version < docCollection.getZNodeVersion()) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+ return docCollection;
+ }
if (docCollection.hasStateUpdates()) {
int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
@@ -1885,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
- if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
+ if (log.isTraceEnabled()) log.trace("Got additional state update {} replica={} id={} ids={}", state == null ? "leader" : state, replica == null ? null : replica.getName(), id, docCollection.getReplicaByIds());
if (replica != null) {
@@ -1902,7 +1872,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (replica.getName().equals(r.getName())) {
continue;
}
- log.debug("process non leader {} {}", r, r.getProperty(LEADER_PROP));
+ log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
if ("true".equals(r.getProperties().get(LEADER_PROP))) {
log.debug("remove leader prop {}", r);
Map<String,Object> props = new HashMap<>(r.getProperties());
@@ -1912,7 +1882,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
} else if (state != null && !properties.get(ZkStateReader.STATE_PROP).equals(state.toString())) {
- if (log.isDebugEnabled()) log.debug("std state, set to {}", state);
+ log.trace("std state, set to {}", state);
properties.put(ZkStateReader.STATE_PROP, state.toString());
if ("true".equals(properties.get(LEADER_PROP))) {
properties.remove(LEADER_PROP);
@@ -1921,7 +1891,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
- if (log.isDebugEnabled()) log.debug("add new replica {}", newReplica);
+ log.trace("add new replica {}", newReplica);
replicasMap.put(replica.getName(), newReplica);
@@ -1933,7 +1903,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
newSlices.put(slice.getName(), newSlice);
- if (log.isDebugEnabled()) log.debug("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+ log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), m);
docCollection = newDocCollection;
@@ -1957,7 +1927,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
//
// });
- if (log.isDebugEnabled()) log.debug("Set a new clusterstate based on update diff {}", result);
+ log.trace("Set a new clusterstate based on update diff {}", result);
updateWatchedCollection(coll, result, false);
constructState(result);
@@ -2001,39 +1971,41 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
- if (lazyCollectionStates.containsKey(coll)) {
- LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
- DocCollection cachedCollection = lazyColl.getCachedDocCollection();
- if (cachedCollection != null) {
- int localVersion = cachedCollection.getZNodeVersion();
- if (cachedCollection.hasStateUpdates()) {
- if (localVersion == version) {
- return cachedCollection;
- }
- } else {
- if (localVersion == version) {
- return cachedCollection;
- }
- }
- }
-
- }
+// if (lazyCollectionStates.containsKey(coll)) {
+// LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+// DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+// if (cachedCollection != null) {
+// int localVersion = cachedCollection.getZNodeVersion();
+// if (cachedCollection.hasStateUpdates()) {
+// if (localVersion == version) {
+// return cachedCollection;
+// }
+// } else {
+// if (localVersion == version) {
+// return cachedCollection;
+// }
+// }
+// }
+// }
} else {
return null;
}
- if (log.isDebugEnabled()) log.debug("getting latest state.json");
+ log.debug("getting latest state.json");
Stat stat = new Stat();
byte[] data;
try {
data = zkClient.getData(collectionPath, null, stat, true);
} catch (NoNodeException e) {
+ log.debug("no state.json znode found");
return null;
}
- if (data == null) return null;
- ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
- ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
- DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+
+ if (data == null) {
+ log.debug("no data found at state.json node");
+ return null;
+ }
+ DocCollection docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
return docCollection;
}
@@ -2084,6 +2056,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
sw.createWatch();
sw.refresh();
sw.refreshStateUpdates();
+ v.coreRefCount.incrementAndGet();
return v;
}
v.coreRefCount.incrementAndGet();
@@ -2187,11 +2160,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
- log.debug("creating new Collection State watcher for {}", collection);
+ log.debug("creating CollectionStateWatcher and refreshing for {}", collection);
v = new CollectionWatch<>(collection);
CollectionStateWatcher sw = new CollectionStateWatcher(collection);
stateWatchersMap.put(collection, sw);
- log.debug("creating watches and refreshing state watcher for {}", collection);
+
sw.createWatch();
sw.refresh();
sw.refreshStateUpdates();
@@ -2214,7 +2187,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
- private DocCollection getCollectionOrNull(String collection) {
+ public DocCollection getCollection(String collection) {
+ DocCollection coll = getCollectionOrNull(collection);
+ if (coll == null) throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection : " + collection + " collections=" + clusterState.keySet());
+ return coll;
+ }
+
+ public DocCollection getCollectionOrNull(String collection) {
ClusterState.CollectionRef coll = clusterState.get(collection);
if (coll == null) return null;
return coll.get();
@@ -2275,11 +2254,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
- log.info("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
+ log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
clusterState.size());
- log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
- clusterState.keySet());
assert collection != null;
CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
@@ -2378,7 +2355,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* @see #registerDocCollectionWatcher
*/
public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
- if (log.isDebugEnabled()) log.debug("remove watcher for collection {}", collection);
+ log.trace("remove watcher for collection {}", collection);
if (collection == null) {
throw new IllegalArgumentException("Collection cannot be null");
@@ -2388,18 +2365,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
collectionWatches.compute(collection, (k, v) -> {
if (v == null) return null;
- v.stateWatchers.remove(watcher);
if (v.canBeRemoved()) {
- log.debug("no longer watch collection {}", collection);
+ log.trace("no longer watch collection {}", collection);
watchedCollectionStates.remove(collection);
LazyCollectionRef docRef = new LazyCollectionRef(collection);
lazyCollectionStates.put(collection, docRef);
clusterState.put(collection, docRef);
-// CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
-// if (stateWatcher != null) {
-// IOUtils.closeQuietly(stateWatcher);
-// stateWatcher.removeWatch();
-// }
+ CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+ if (stateWatcher != null) {
+ stateWatcher.removeWatch();
+ IOUtils.closeQuietly(stateWatcher);
+ }
reconstructState.set(true);
return null;
}
@@ -2529,7 +2505,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
private void notifyStateWatchers(String collection, DocCollection collectionState) {
- if (log.isDebugEnabled()) log.debug("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
+ log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
try {
notifications.submit(new Notification(collection, collectionState, collectionWatches));
@@ -2572,7 +2548,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try (ParWork work = new ParWork(this)) {
watchers.forEach(watcher -> {
work.collect("", () -> {
- log.debug("Notify DocCollectionWatcher {} {}", watcher, collectionState);
+ log.trace("Notify DocCollectionWatcher {} {}", watcher, collectionState);
try {
if (watcher.onStateChanged(collectionState)) {
removeDocCollectionWatcher(collection, watcher);
@@ -2875,7 +2851,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return false;
Collection<Slice> activeSlices = collectionState.getActiveSlices();
- log.debug("active slices expected={} {} {} allSlices={}",expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
+ log.trace("active slices expected={} {} {} allSlices={}", expectedShards, activeSlices.size(), activeSlices, collectionState.getSlices());
if (!exact) {
if (activeSlices.size() < expectedShards) {
@@ -2888,16 +2864,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
if (expectedReplicas == 0 && !exact) {
- log.info("0 replicas expected and found, return");
+ log.debug("0 replicas expected and found, return");
return true;
}
int activeReplicas = 0;
for (Slice slice : activeSlices) {
Replica leader = slice.getLeader();
- log.info("slice is {} and leader is {}", slice.getName(), leader);
+ log.trace("slice is {} and leader is {}", slice.getName(), leader);
if (leader == null) {
- log.info("slice={}", slice);
+ log.debug("slice={}", slice);
return false;
}
for (Replica replica : slice) {
@@ -2905,7 +2881,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
activeReplicas++;
}
}
- log.info("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
+ log.trace("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
}
if (!exact) {
if (activeReplicas >= expectedReplicas) {
@@ -2934,7 +2910,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
protected void checkShardConsistency(String collection, boolean verbose)
throws Exception {
- Set<String> theShards = getClusterState().getCollection(collection).getSlicesMap().keySet();
+ Set<String> theShards = getCollection(collection).getSlicesMap().keySet();
String failMessage = null;
for (String shard : theShards) {
String shardFailMessage = checkShardConsistency(collection, shard, false, verbose);
@@ -2970,7 +2946,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (verbose) System.err.println("__________________________\n");
int cnt = 0;
- DocCollection coll = getClusterState().getCollection(collection);
+ DocCollection coll = getCollection(collection);
Slice replicas = coll.getSlice(shard);