You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/04 00:34:08 UTC
svn commit: r1369236 - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Fri Aug 3 22:34:08 2012
New Revision: 1369236
URL: http://svn.apache.org/viewvc?rev=1369236&view=rev
Log:
LUCENE-3985: add close methods to Overseer and ConnectionManager and use them
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Aug 3 22:34:08 2012
@@ -250,16 +250,13 @@ final class ShardLeaderElectionContext e
final class OverseerElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
- private final ZkStateReader stateReader;
- private ShardHandler shardHandler;
- private String adminPath;
+ private Overseer overseer;
- public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) {
- super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
- this.stateReader = stateReader;
- this.shardHandler = shardHandler;
- this.adminPath = adminPath;
- this.zkClient = stateReader.getZkClient();
+
+ public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
+ super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient);
+ this.overseer = overseer;
+ this.zkClient = zkClient;
}
@Override
@@ -281,7 +278,7 @@ final class OverseerElectionContext exte
CreateMode.EPHEMERAL, true);
}
- new Overseer(shardHandler, adminPath, stateReader, id);
+ overseer.start(id);
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Aug 3 22:34:08 2012
@@ -47,7 +47,7 @@ public class Overseer {
private static Logger log = LoggerFactory.getLogger(Overseer.class);
- private static class CloudStateUpdater implements Runnable {
+ private class CloudStateUpdater implements Runnable {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
@@ -70,7 +70,7 @@ public class Overseer {
@Override
public void run() {
- if(amILeader()) {
+ if(amILeader() && !Overseer.this.isClosed) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@@ -110,7 +110,7 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
- while (amILeader()) {
+ while (amILeader() && !isClosed) {
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
@@ -401,21 +401,45 @@ public class Overseer {
}
}
+
+ private Thread ccThread;
+
+ private Thread updaterThread;
+
+ private volatile boolean isClosed;
+
+ private ZkStateReader reader;
+
+ private ShardHandler shardHandler;
+
+ private String adminPath;
+
+ public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
+ this.reader = reader;
+ this.shardHandler = shardHandler;
+ this.adminPath = adminPath;
+ }
- public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
+ public void start(String id) {
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
- Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
+ updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
updaterThread.setDaemon(true);
- updaterThread.start();
-
+
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath));
+ ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
+ "Overseer-" + id);
ccThread.setDaemon(true);
+
+ updaterThread.start();
ccThread.start();
}
+
+ public void close() {
+ isClosed = true;
+ }
/**
* Get queue that can be used to send messages to Overseer.
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Aug 3 22:34:08 2012
@@ -64,6 +64,8 @@ public class OverseerCollectionProcessor
private String adminPath;
private ZkStateReader zkStateReader;
+
+ private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
this.zkStateReader = zkStateReader;
@@ -76,7 +78,7 @@ public class OverseerCollectionProcessor
@Override
public void run() {
log.info("Process current queue of collection creations");
- while (amILeader()) {
+ while (amILeader() && !isClosed) {
try {
byte[] head = workQueue.peek(true);
@@ -108,6 +110,10 @@ public class OverseerCollectionProcessor
}
}
+ public void close() {
+ isClosed = true;
+ }
+
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Aug 3 22:34:08 2012
@@ -121,6 +121,8 @@ public final class ZkController {
// may accept defaults or use mocks rather than pulling things from a CoreContainer
private CoreContainer cc;
+ protected volatile Overseer overseer;
+
/**
* @param cc if null, recovery will not be enabled
* @param zkServerAddress
@@ -170,10 +172,8 @@ public final class ZkController {
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
}
-
- ElectionContext context = new OverseerElectionContext(
- shardHandler, adminPath,
- getNodeName(), zkStateReader);
+ ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
+ ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -243,6 +243,11 @@ public final class ZkController {
*/
public void close() {
try {
+ overseer.close();
+ } catch(Throwable t) {
+ log.error("Error closing overseer", t);
+ }
+ try {
zkClient.close();
} catch (InterruptedException e) {
// Restore the interrupted status
@@ -366,8 +371,8 @@ public final class ZkController {
}
overseerElector = new LeaderElector(zkClient);
- ElectionContext context = new OverseerElectionContext(shardHandler,
- adminPath, getNodeName(), zkStateReader);
+ this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
+ ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Fri Aug 3 22:34:08 2012
@@ -47,6 +47,8 @@ class ConnectionManager implements Watch
private OnReconnect onReconnect;
+ private volatile boolean isClosed = false;
+
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
this.name = name;
this.client = client;
@@ -68,6 +70,8 @@ class ConnectionManager implements Watch
log.info("Watcher " + this + " name:" + name + " got event " + event
+ " path:" + event.getPath() + " type:" + event.getType());
}
+
+ checkClosed();
state = event.getState();
if (state == KeeperState.SyncConnected) {
@@ -81,11 +85,18 @@ class ConnectionManager implements Watch
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
- public void update(SolrZooKeeper keeper)
- throws InterruptedException, TimeoutException {
+ public void update(SolrZooKeeper keeper) throws TimeoutException {
synchronized (connectionStrategy) {
- waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
- client.updateKeeper(keeper);
+ checkClosed();
+ try {
+ waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+ checkClosed();
+ client.updateKeeper(keeper);
+ } catch (InterruptedException e) {
+ // we must have been asked to stop
+ throw new RuntimeException("Giving up on connecting - we were interrupted");
+ }
+ checkClosed();
if (onReconnect != null) {
onReconnect.command();
}
@@ -95,6 +106,7 @@ class ConnectionManager implements Watch
}
}
+
});
} catch (Exception e) {
SolrException.log(log, "", e);
@@ -109,7 +121,13 @@ class ConnectionManager implements Watch
}
public synchronized boolean isConnected() {
- return connected;
+ return !isClosed && connected;
+ }
+
+ // we use a volatile rather than sync
+ // to avoid deadlock on shutdown
+ public void close() {
+ this.isClosed = true;
}
public synchronized KeeperState state() {
@@ -122,12 +140,20 @@ class ConnectionManager implements Watch
long left = waitForConnection;
while (!connected && left > 0) {
wait(left);
+ checkClosed();
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
}
+
+ private synchronized void checkClosed() {
+ if (isClosed) {
+ log.info("Not acting because I am closed");
+ return;
+ }
+ }
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Fri Aug 3 22:34:08 2012
@@ -654,7 +654,11 @@ public class SolrZkClient {
public void close() throws InterruptedException {
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
- keeper.close();
+ try {
+ keeper.close();
+ } finally {
+ connManager.close();
+ }
numCloses.incrementAndGet();
}