You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/28 21:02:55 UTC
[lucene-solr] 02/03: @1058 Harden.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit e33c44705e8cf9b459adee7620f48cfd515a1971
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Oct 28 15:29:05 2020 -0500
@1058 Harden.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 24 +++++++++++++++++-----
.../apache/solr/cloud/OverseerElectionContext.java | 22 +++++++-------------
.../solr/cloud/overseer/CollectionMutator.java | 8 ++++----
.../apache/solr/cloud/CollectionsAPISolrJTest.java | 5 ++++-
.../src/java/org/apache/solr/common/ParWork.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 20 ++++++++++++------
6 files changed, 49 insertions(+), 32 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d3af42f..7dd0116 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -237,7 +237,7 @@ public class Overseer implements SolrCloseable {
// we do not sure which message is bad message, therefore we will re-process node one by one
int fallbackQueueSize = Integer.MAX_VALUE;
ZkDistributedQueue fallbackQueue = workQueue;
- while (!isClosed()) {
+ while (!checkClosed()) {
if (zkStateWriter == null) {
try {
zkStateWriter = new ZkStateWriter(reader, stats);
@@ -282,7 +282,6 @@ public class Overseer implements SolrCloseable {
}
// force flush at the end of the loop, if there are no pending updates, this is a no op call
clusterState = zkStateWriter.writePendingUpdates(clusterState);
- assert clusterState != null;
// the workQueue is empty now, use stateUpdateQueue as fallback queue
fallbackQueue = stateUpdateQueue;
fallbackQueueSize = 0;
@@ -315,6 +314,10 @@ public class Overseer implements SolrCloseable {
// }
queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
} catch (AlreadyClosedException e) {
+ if (isClosed()) {
+ log.info("Overseer closed (AlreadyClosedException), exiting loop");
+ return;
+ }
return;
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
@@ -350,7 +353,10 @@ public class Overseer implements SolrCloseable {
processedNodes.clear();
});
}
- if (isClosed()) return;
+ if (isClosed()) {
+ log.info("Overseer closed, exiting loop");
+ return;
+ }
// if an event comes in the next *ms batch it together
int wait = 0;
// if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
@@ -359,7 +365,7 @@ public class Overseer implements SolrCloseable {
// wait = 0;
// }
queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
- if (loopCnt >= 1) {
+ if (loopCnt >= 3) {
break;
}
loopCnt++;
@@ -399,6 +405,14 @@ public class Overseer implements SolrCloseable {
}
}
+ private boolean checkClosed() {
+ boolean closed = isClosed();
+ if (closed) {
+ log.info("Overseer is closed, will not continue loop ...");
+ }
+ return closed;
+ }
+
// Return true whenever the exception thrown by ZkStateWriter is correspond
// to a invalid state or 'bad' message (in this case, we should remove that message from queue)
private boolean isBadMessage(Exception e) {
@@ -673,11 +687,11 @@ public class Overseer implements SolrCloseable {
}
public synchronized void start(String id, ElectionContext context) throws KeeperException {
+ doClose();
if (getCoreContainer().isShutDown() || closeAndDone) {
if (log.isDebugEnabled()) log.debug("Already closed, exiting");
return;
}
- doClose();
closed = false;
MDCLoggingContext.setNode(zkController == null ?
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 2c811c6..6913b48 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -76,22 +76,14 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
log.info("Registered as Overseer leader, starting Overseer ...");
- synchronized (this) {
- if (isClosed()) {
- log.info("Bailing on becoming leader, we are closed");
- return;
- }
- if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
-
- try {
- overseer.start(id, context);
- } finally {
- if (isClosed()) {
- overseer.close();
- }
- }
- }
+ if (isClosed()) {
+ log.info("Bailing on becoming leader, we are closed");
+ return;
}
+ if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
+ overseer.start(id, context);
+ }
+
}
private void clearQueue(ZkDistributedQueue queue)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index a64cee0..83c2331 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -91,11 +91,11 @@ public class CollectionMutator {
// TODO - fix, no makePath (ensure every path part exists), async, single node
try {
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName
- + "/leader_elect/" + shardId);
+ + "/leader_elect/" + shardId, null, CreateMode.PERSISTENT, false);
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName
- + "/leader_elect/" + shardId + LeaderElector.ELECTION_NODE);
+ + "/leader_elect/" + shardId + LeaderElector.ELECTION_NODE, null, CreateMode.PERSISTENT, false);
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE+ "/" + collectionName + "/" + shardId
- + ZkStateReader.SHARD_LEADERS_ZKNODE);
+ + ZkStateReader.SHARD_LEADERS_ZKNODE, null, CreateMode.PERSISTENT, false);
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/" + shardId, null, CreateMode.PERSISTENT, false);
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/leader_elect/" + shardId, null, CreateMode.PERSISTENT, false);
@@ -104,7 +104,7 @@ public class CollectionMutator {
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/terms/" + shardId, ZkStateReader.emptyJson, CreateMode.PERSISTENT, false);
stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/schema_lock", null, CreateMode.PERSISTENT, false);
} catch (AlreadyExistsException e) {
- throw new AlreadyClosedException();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (KeeperException e) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 5c09ae8..f4c851b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -351,6 +351,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
return true;
});
+ cluster.waitForActiveCollection(collectionName, 3, 3);
+
// Test splitting using split.key
response = CollectionAdminRequest.splitShard(collectionName)
.setSplitKey("b!")
@@ -359,7 +361,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
- waitForState("Expected 5 slices to be active", collectionName, (n, c) -> c.getActiveSlices().size() == 5);
+ // wait for 5 active shards
+ cluster.waitForActiveCollection(collectionName, 5, 5);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 07e35a2..69125c8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -640,7 +640,7 @@ public class ParWork implements Closeable {
Thread.currentThread().interrupt();
} else {
if (infoLogMsg) {
- log.info(t.getMessage());
+ log.info(t.getClass().getName() + " " + t.getMessage());
} else {
log.warn("Solr ran into an unexpected exception", t);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index b2bd431..55254e2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -60,7 +61,7 @@ public class ConnectionManager implements Watcher, Closeable {
private volatile boolean isClosed = false;
- private final Object keeperLock = new Object();
+ private final ReentrantLock keeperLock = new ReentrantLock(true);
private volatile CountDownLatch connectedLatch = new CountDownLatch(1);
private volatile CountDownLatch disconnectedLatch = new CountDownLatch(1);
@@ -82,12 +83,15 @@ public class ConnectionManager implements Watcher, Closeable {
}
public ZooKeeper getKeeper() {
- synchronized (keeperLock) {
+ keeperLock.lock();
+ try {
SolrZooKeeper rKeeper = keeper;
if (rKeeper == null) {
throw new AlreadyClosedException();
}
return rKeeper;
+ } finally {
+ keeperLock.unlock();
}
}
@@ -189,14 +193,16 @@ public class ConnectionManager implements Watcher, Closeable {
}
private void updatezk() throws IOException {
- synchronized (keeperLock) {
+ keeperLock.lock();
+ try {
if (isClosed()) return;
if (keeper != null) {
ParWork.close(keeper);
- keeper = null;
}
SolrZooKeeper zk = createSolrZooKeeper(zkServerAddress, zkTimeout, this);
keeper = zk;
+ } finally {
+ keeperLock.unlock();
}
}
@@ -265,7 +271,8 @@ public class ConnectionManager implements Watcher, Closeable {
}
}
- synchronized (keeperLock) {
+ keeperLock.lock();
+ try {
if (isClosed()) return;
if (keeper != null) {
// if there was a problem creating the new SolrZooKeeper
@@ -281,6 +288,8 @@ public class ConnectionManager implements Watcher, Closeable {
}
}
+ } finally {
+ keeperLock.unlock();
}
do {
@@ -351,7 +360,6 @@ public class ConnectionManager implements Watcher, Closeable {
if (keeper != null) {
keeper.close();
}
- keeper = null;
ExecutorUtil.awaitTermination(client.zkCallbackSerialExecutor);
ExecutorUtil.awaitTermination(client.zkCallbackExecutor);