You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the original message on the mailing-list archive.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/10 18:52:10 UTC
[lucene-solr] branch reference_impl_dev updated: @1139 Only write
out the collections that have been updated.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new d2fa7c2 @1139 Only write out the collections that have been updated.
d2fa7c2 is described below
commit d2fa7c2eb00db74431c901e115cf80033604fe26
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Nov 10 12:51:43 2020 -0600
@1139 Only write out the collections that have been updated.
---
settings.gradle | 3 +
.../client/solrj/embedded/JettySolrRunner.java | 2 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 5 +-
.../solr/cloud/api/collections/AddReplicaCmd.java | 2 +-
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 143 +++++++++++----------
.../org/apache/solr/common/cloud/SolrZkClient.java | 4 +-
7 files changed, 86 insertions(+), 75 deletions(-)
diff --git a/settings.gradle b/settings.gradle
index e6f1795..4f39831 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -86,3 +86,6 @@ include "solr:benchmark"
include "solr:benchmark"
+
+include "solr:benchmark"
+
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 36aa894..8b911b4 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -596,7 +596,7 @@ public class JettySolrRunner implements Closeable {
if (!success) {
log.warn("Timedout waiting to see {} node in zk", ZkStateReader.COLLECTIONS_ZKNODE);
}
- log.info("Done waiting on latch");
+ if (log.isDebugEnabled()) log.debug("Done waiting on latch");
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a74ecee..9335299 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -846,8 +846,9 @@ public class Overseer implements SolrCloseable {
try {
List<String> items = setWatch();
-
- processQueueItems(items);
+ if (items.size() > 0) {
+ processQueueItems(items);
+ }
} catch (Exception e) {
log.error("Exception during overseer queue queue processing", e);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 33b987a..679445d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -353,7 +353,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
public static CreateReplica assignReplicaDetails(DocCollection coll,
ZkNodeProps message, ReplicaPosition replicaPosition) {
- log.info("assignReplicaDetails {} {} {}", message, replicaPosition, coll);
+ log.info("assignReplicaDetails {} {}", message, replicaPosition);
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 2cf5a41..a3bfab0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -233,7 +233,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
clusterState = new AddReplicaCmd(ocmh, true).call(clusterState, props, results).clusterState;
- log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
+ // log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
//clusterState = new SliceMutator(cloudManager).addReplica(clusterState, props);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index edcc02f..832c835 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -18,7 +18,9 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -35,6 +37,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.BoundedTreeSet;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -61,6 +64,7 @@ public class ZkStateWriter {
private volatile ClusterState cs;
private boolean dirty;
+ private Set<String> collectionsToWrite = new HashSet<>();
public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
assert zkStateReader != null;
@@ -105,6 +109,9 @@ public class ZkStateWriter {
}
}
});
+
+
+ collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
Collection<DocCollection> collections = cs.getCollectionsMap().values();
for (DocCollection collection : collections) {
if (clusterState.getCollectionOrNull(collection.getName()) == null) {
@@ -216,7 +223,7 @@ public class ZkStateWriter {
}
}
});
-
+ collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
}
if (stateUpdate) {
@@ -254,89 +261,91 @@ public class ZkStateWriter {
// wait to see our last publish version has propagated
cs.forEachCollection(collection -> {
- Integer v = null;
- try {
- //System.out.println("waiting to see state " + prevVersion);
- v = trackVersions.get(collection.getName());
- if (v == null) v = 0;
- if (v == 0) return;
- Integer version = v;
+ if (collectionsToWrite.contains(collection.getName())) {
+ Integer v = null;
try {
- log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
- reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
- if (col == null) {
- return true;
- }
-// if (col != null) {
-// log.info("the version " + col.getZNodeVersion());
-// }
- if (col != null && col.getZNodeVersion() >= version) {
- if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
- // System.out.println("found the version");
- return true;
- }
- return false;
- });
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ //System.out.println("waiting to see state " + prevVersion);
+ v = trackVersions.get(collection.getName());
+ if (v == null) v = 0;
+ if (v == 0) return;
+ Integer version = v;
+ try {
+ log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
+ reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
+ if (col == null) {
+ return true;
+ }
+ // if (col != null) {
+ // log.info("the version " + col.getZNodeVersion());
+ // }
+ if (col != null && col.getZNodeVersion() >= version) {
+ if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
+ // System.out.println("found the version");
+ return true;
+ }
+ return false;
+ });
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ } catch (TimeoutException e) {
+ log.warn("Timeout waiting to see written cluster state come back " + v);
}
- } catch (TimeoutException e) {
- log.warn("Timeout waiting to see written cluster state come back " + v);
}
});
cs.forEachCollection(collection -> {
+ if (collectionsToWrite.contains(collection.getName())) {
+ String name = collection.getName();
+ String path = ZkStateReader.getCollectionPath(collection.getName());
+ if (log.isDebugEnabled()) log.debug("process {}", collection);
+ Stat stat = new Stat();
+ boolean success = false;
+ try {
- String name = collection.getName();
- String path = ZkStateReader.getCollectionPath(collection.getName());
- if (log.isDebugEnabled()) log.debug("process {}", collection);
- Stat stat = new Stat();
- boolean success = false;
- try {
-
+ byte[] data = Utils.toJSON(singletonMap(name, collection));
- byte[] data = Utils.toJSON(singletonMap(name, collection));
+ if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
- if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
+ try {
+ int version = collection.getZNodeVersion();
+ Integer v = trackVersions.get(collection.getName());
+ if (v != null) {
+ version = v;
+ }
- try {
- int version = collection.getZNodeVersion();
- Integer v = trackVersions.get(collection.getName());
- if (v != null) {
- version = v;
+ reader.getZkClient().setData(path, data, version == 0 ? -1 : version, true);
+
+ trackVersions.put(collection.getName(), version + 1);
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+ trackVersions.remove(collection.getName());
+ // likely deleted
+ } catch (KeeperException.BadVersionException bve) {
+ lastFailedException.set(bve);
+ failedUpdates.put(collection.getName(), collection);
+ stat = reader.getZkClient().exists(path, null);
+ // this is a tragic error, we must disallow usage of this instance
+ log.warn("Tried to update the cluster state using version={} but we where rejected, found {}", collection.getZNodeVersion(), stat.getVersion(), bve);
}
-
-
- reader.getZkClient().setData(path, data, version == 0 ? -1 : version, true);
-
- trackVersions.put(collection.getName(), version + 1);
- } catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
- trackVersions.remove(collection.getName());
- // likely deleted
- } catch (KeeperException.BadVersionException bve) {
- lastFailedException.set(bve);
- failedUpdates.put(collection.getName(), collection);
- stat = reader.getZkClient().exists(path, null);
- // this is a tragic error, we must disallow usage of this instance
- log.warn("Tried to update the cluster state using version={} but we where rejected, found {}", collection.getZNodeVersion(), stat.getVersion(), bve);
+ if (log.isDebugEnabled()) log.debug("Set version for local collection {} to {}", collection.getName(), collection.getZNodeVersion() + 1);
+ } catch (InterruptedException | AlreadyClosedException e) {
+ log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (KeeperException.SessionExpiredException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (Exception e) {
+ log.error("Failed processing update=" + collection, e);
}
- if (log.isDebugEnabled()) log.debug("Set version for local collection {} to {}", collection.getName(), collection.getZNodeVersion() + 1);
- } catch (InterruptedException | AlreadyClosedException e) {
- log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (KeeperException.SessionExpiredException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (Exception e) {
- log.error("Failed processing update=" + collection, e);
}
});
-
+
dirty = false;
+ collectionsToWrite.clear();
// nocommit - harden against failures and exceptions
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index f352a88..b7a607e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -777,7 +777,6 @@ public class SolrZkClient implements Closeable {
log.info("delete paths {} wait={}", paths, wait);
CountDownLatch latch = null;
if (wait) {
- log.info("setup countdown latch {}", paths.size());
latch = new CountDownLatch(paths.size());
}
for (String path : paths) {
@@ -809,7 +808,6 @@ public class SolrZkClient implements Closeable {
if (wait) {
boolean success;
try {
- log.info("watch on countdownlatch {}", "15s");
success = latch.await(15, TimeUnit.SECONDS);
log.info("done waiting on latch, success={}", success);
} catch (InterruptedException e) {
@@ -824,7 +822,7 @@ public class SolrZkClient implements Closeable {
throw e;
}
}
- log.error("done with delete {} {}", paths, wait);
+ if (log.isDebugEnabled()) log.debug("done with delete {} {}", paths, wait);
}
// Calls setData for a list of existing paths in parallel