You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/18 23:06:15 UTC
[lucene-solr] branch reference_impl updated: @236 - Start putting some things in order
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new 490eaf4 @236 - Start putting some things in order
490eaf4 is described below
commit 490eaf4d9825cc84e5e02b01ce8b7975658e30d8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Jul 18 18:05:56 2020 -0500
@236 - Start putting some things in order
---
solr/core/src/java/org/apache/solr/api/ApiBag.java | 2 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 60 ++++++++++++----------
.../apache/solr/cloud/overseer/ZkStateWriter.java | 55 ++++++++++----------
.../java/org/apache/solr/update/CommitTracker.java | 8 ++-
4 files changed, 68 insertions(+), 57 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/api/ApiBag.java b/solr/core/src/java/org/apache/solr/api/ApiBag.java
index 172e03e..0577228 100644
--- a/solr/core/src/java/org/apache/solr/api/ApiBag.java
+++ b/solr/core/src/java/org/apache/solr/api/ApiBag.java
@@ -69,7 +69,7 @@ public class ApiBag {
/**Register a POJO annotated with {@link EndPoint}
* @param o the instance to be used for invocations
*/
- public synchronized List<Api> registerObject(Object o) {
+ public List<Api> registerObject(Object o) {
List<Api> l = AnnotatedApi.getApis(o);
for (Api api : l) {
register(api, Collections.EMPTY_MAP);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4cfa32f..5aa96ae 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -198,6 +198,7 @@ public class Overseer implements SolrCloseable {
private final ZkDistributedQueue workQueue;
private volatile boolean isClosed = false;
+ private int lastVersion;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
@@ -317,7 +318,7 @@ public class Overseer implements SolrCloseable {
LinkedList<Pair<String, byte[]>> queue = null;
try {
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
- queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 10000L, (x) -> true));
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 2000L, (x) -> true));
} catch (InterruptedException | AlreadyClosedException e) {
ParWork.propegateInterrupt(e);
return;
@@ -373,13 +374,15 @@ public class Overseer implements SolrCloseable {
return;
} catch (Exception e) {
log.error("Unexpected error in Overseer state update loop", e);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException interruptedException) {
- ParWork.propegateInterrupt(e);
- return;
+ if (!isClosed()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException interruptedException) {
+ ParWork.propegateInterrupt(e);
+ return;
+ }
+ continue;
}
- continue;
}
}
} finally {
@@ -416,33 +419,29 @@ public class Overseer implements SolrCloseable {
return false;
}
- private ClusterState processQueueItem(ZkNodeProps message, final ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
+ private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
if (log.isDebugEnabled()) log.debug("Consume state update from queue {}", message);
- assert clusterState != null;
- AtomicReference<ClusterState> state = new AtomicReference<>();
+ // assert clusterState != null;
- final String operation = message.getStr(QUEUE_OPERATION);
- if (operation == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
- }
- AtomicBoolean stop = new AtomicBoolean(false);
+ ClusterState cs = null;
+ // if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
- // ### expert use
- ParWork.getExecutor().invokeAll(Collections.singleton(() -> {
+
+ final String operation = message.getStr(QUEUE_OPERATION);
+ if (operation == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
+ }
List<ZkWriteCommand> zkWriteOps = processMessage(clusterState, message, operation);
ZkStateWriter zkStateWriter1 = new ZkStateWriter(zkController.getZkStateReader(), new Stats());
- ClusterState cs = zkStateWriter1.enqueueUpdate(clusterState, zkWriteOps,
+ cs = zkStateWriter1.enqueueUpdate(clusterState, zkWriteOps,
() -> {
// log.info("on write callback");
});
- state.set(cs);
- return null;
+ lastVersion = cs.getZNodeVersion();
+ // }
-
- }));
-
- return (state.get() != null ? state.get() : clusterState);
+ return cs;
}
private List<ZkWriteCommand> processMessage(ClusterState clusterState,
@@ -563,6 +562,11 @@ public class Overseer implements SolrCloseable {
@Override
public void close() throws IOException {
thread.close();
+ try {
+ join(10000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting to close");
+ }
this.isClosed = true;
}
@@ -839,7 +843,7 @@ public class Overseer implements SolrCloseable {
@Override
public boolean isClosed() {
- return closed;
+ return closed || zkController.getCoreContainer().isShutDown();
}
void doClose() {
@@ -849,18 +853,18 @@ public class Overseer implements SolrCloseable {
try (ParWork closer = new ParWork(this, true)) {
closer.collect(() -> {
- IOUtils.closeQuietly(ccThread);
ccThread.interrupt();
+ IOUtils.closeQuietly(ccThread);
});
closer.collect(() -> {
- IOUtils.closeQuietly(updaterThread);
updaterThread.interrupt();
+ IOUtils.closeQuietly(updaterThread);
});
closer.collect(() -> {
- IOUtils.closeQuietly(triggerThread);
triggerThread.interrupt();
+ IOUtils.closeQuietly(triggerThread);
});
closer.addCollect("OverseerInternals");
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 9a6a75b..dac61f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -194,13 +194,13 @@ public class ZkStateWriter {
prevState.getZNodeVersion());
}
- // assert c.getStateFormat() > 1;
+ // assert c.getStateFormat() > 1;
// stat = reader.getZkClient().getCurator().checkExists().forPath(path);
prevVersion = prevState.getCollection(c.getName()).getZNodeVersion();
- Map<String,Slice> existingSlices = prevState.getCollection(c.getName()).getSlicesMap();
+ Map<String, Slice> existingSlices = prevState.getCollection(c.getName()).getSlicesMap();
- Map<String,Slice> newSliceMap = new HashMap<>(existingSlices.size() + 1);
+ Map<String, Slice> newSliceMap = new HashMap<>(existingSlices.size() + 1);
if (log.isDebugEnabled()) {
log.debug("Existing slices {}", existingSlices);
@@ -214,12 +214,13 @@ public class ZkStateWriter {
log.debug("Add collection {}", c);
}
+ DocCollection finalC = c;
prevState.getCollection(c.getName()).getSlicesMap().forEach((sliceId, slice) -> {
Collection<Replica> replicas = slice.getReplicas();
- Map<String,Replica> newReplicas = new HashMap<>();
+ Map<String, Replica> newReplicas = new HashMap<>();
- Map<String,Object> newProps = new HashMap<>();
+ Map<String, Object> newProps = new HashMap<>();
newProps.putAll(slice.getProperties());
@@ -232,11 +233,11 @@ public class ZkStateWriter {
replicas.forEach((replica) -> newReplicas.put(replica.getName(), replica));
- c.getSlice(sliceId).getReplicas().forEach((replica) -> {
+ finalC.getSlice(sliceId).getReplicas().forEach((replica) -> {
newReplicas.put(replica.getName(), replica);
});
- Slice newSlice = new Slice(sliceId, newReplicas, newProps, c.getName());
+ Slice newSlice = new Slice(sliceId, newReplicas, newProps, finalC.getName());
newSliceMap.put(sliceId, newSlice);
});
@@ -250,19 +251,19 @@ public class ZkStateWriter {
LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionsMap());
collStates.put(name, new ClusterState.CollectionRef(newCollection));
newClusterState = new ClusterState(prevState.getLiveNodes(), collStates, prevVersion);
-
+ c = newClusterState.getCollection(name);
byte[] data = Utils.toJSON(singletonMap(c.getName(), newCollection));
if (log.isDebugEnabled()) {
log.debug("Write state.json bytes={} cs={}", data.length, newClusterState);
}
- // stat = reader.getZkClient().getCurator().setData().withVersion(prevVersion).forPath(path, data);
- stat = reader.getZkClient().setData(path, data, prevVersion, true);
+ // stat = reader.getZkClient().getCurator().setData().withVersion(prevVersion).forPath(path, data);
+ stat = reader.getZkClient().setData(path, data, prevVersion, true);
} else {
if (log.isDebugEnabled()) {
log.debug("writePendingUpdates() - going to create_collection {}", path);
}
- // assert c.getStateFormat() > 1;
+ // assert c.getStateFormat() > 1;
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(),
0, path);
@@ -279,15 +280,16 @@ public class ZkStateWriter {
try {
prevVersion = 0;
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
- } catch(KeeperException.NodeExistsException e) {
- stat = reader.getZkClient().setData(path, data, -1, true);
+ } catch (KeeperException.NodeExistsException e) {
+ stat = reader.getZkClient().setData(path, data, -1, true);
}
}
} catch (Exception e) {
if (e instanceof KeeperException.BadVersionException) {
// nocommit invalidState = true;
- if (log.isDebugEnabled()) log.debug("Tried to update the cluster state using version={} but we where rejected, currently at {}", prevVersion, ((KeeperException.BadVersionException) e).getMessage(), e);
+ if (log.isDebugEnabled())
+ log.debug("Tried to update the cluster state using version={} but we where rejected, currently at {}", prevVersion, ((KeeperException.BadVersionException) e).getMessage(), e);
throw (KeeperException.BadVersionException) e;
}
ParWork.propegateInterrupt(e);
@@ -298,19 +300,20 @@ public class ZkStateWriter {
updates.clear();
// numUpdates = 0;
- try {
- reader.waitForState(c.getName(), 5, TimeUnit.SECONDS,
- (l, col) -> {
- if (col != null && col.getZNodeVersion() > prevState.getZNodeVersion()) {
- if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
- return true;
- }
- return false;
- });
- } catch (TimeoutException e) {
- throw new RuntimeException(e);
+ if (c != null) {
+ try {
+ reader.waitForState(c.getName(), 5, TimeUnit.SECONDS,
+ (l, col) -> {
+ if (col != null && col.getZNodeVersion() > prevState.getZNodeVersion()) {
+ if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
+ return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
}
-
}
// assert newClusterState.getZNodeVersion() >= 0;
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 71f4079..d673a72 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -80,7 +80,8 @@ public final class CommitTracker implements Runnable, Closeable {
private static final boolean WAIT_SEARCHER = true;
private String name;
-
+ private volatile boolean closed;
+
public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, long tLogFileSizeUpperBound,
boolean openSearcher, boolean softCommit) {
this.core = core;
@@ -103,6 +104,7 @@ public final class CommitTracker implements Runnable, Closeable {
}
public synchronized void close() {
+ this.closed = true;
if (pending != null) {
pending.cancel(false);
pending = null;
@@ -164,7 +166,9 @@ public final class CommitTracker implements Runnable, Closeable {
// log.info("###scheduling for " + commitMaxTime);
// schedule our new commit
- pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
+ if (!closed) {
+ pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
+ }
}
}