You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/09/03 21:46:48 UTC
svn commit: r1380322 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/core/
solr/core/src/java/org/apache/solr/handler/component/
solr/core/src/java/org/apache/solr/up...
Author: markrmiller
Date: Mon Sep 3 19:46:47 2012
New Revision: 1380322
URL: http://svn.apache.org/viewvc?rev=1380322&view=rev
Log:
SOLR-3772,SOLR-3750: make optional, wait time configurable, default to off
SOLR-3782: A leader going down while updates are coming in can cause shard inconsistency.
Added:
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
- copied unchanged from r1380300, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Sep 3 19:46:47 2012
@@ -99,17 +99,12 @@ Bug Fixes
* SOLR-3770: Overseer may lose updates to cluster state (siren)
-* SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at
- the same time if the recovery thread join call was interrupted.
+* SOLR-3721: Fix bug that could theoretically allow multiple recoveries to run
+ briefly at the same time if the recovery thread join call was interrupted.
(Per Steffensen, Mark Miller)
-
-* SOLR-3750: On session expiration, we should explicitly wait some time before
- running the leader sync process so that we are sure every node participates.
- (Per Steffensen, Mark Miller)
-
-* SOLR-3772: On cluster startup, we should wait until we see all registered
- replicas before running the leader process - or if they all do not come up,
- N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
+
+* SOLR-3782: A leader going down while updates are coming in can cause shard
+ inconsistency. (Mark Miller)
Other Changes
----------------------
@@ -136,8 +131,15 @@ Other Changes
* SOLR-3780: Maven build: Make solrj tests run separately from solr-core.
(Steve Rowe)
-* SOLR-3707: Upgrade Solr to Tika 1.2 (janhoy)
+* SOLR-3707: Upgrade Solr to Tika 1.2 (janhoy)
+
+* SOLR-3772: Optionally, on cluster startup, we can wait until we see all registered
+ replicas before running the leader process - or if they all do not come up,
+ N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
+* SOLR-3750: Optionally, on session expiration, we can explicitly wait some time before
+ running the leader sync process so that we are sure every node participates.
+ (Per Steffensen, Mark Miller)
================== 4.0.0-BETA ===================
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Sep 3 19:46:47 2012
@@ -55,6 +55,8 @@ public abstract class ElectionContext {
this.zkClient = zkClient;
}
+ public void close() {}
+
public void cancelElection() throws InterruptedException, KeeperException {
zkClient.delete(leaderSeqPath, -1, true);
}
@@ -83,10 +85,6 @@ class ShardLeaderElectionContextBase ext
@Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
- // this pause is important
- // but I don't know why yet :*( - it must come before this publish call
- // and can happen at the start of leader election process even
- Thread.sleep(100);
zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
@@ -112,6 +110,8 @@ final class ShardLeaderElectionContext e
private SyncStrategy syncStrategy = new SyncStrategy();
private boolean afterExpiration;
+
+ private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
@@ -124,8 +124,15 @@ final class ShardLeaderElectionContext e
}
@Override
+ public void close() {
+ this.isClosed = true;
+ }
+
+ @Override
void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
+ log.info("Running the leader process. afterExpiration=" + afterExpiration);
+
String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
// clear the leader in clusterstate
@@ -134,21 +141,10 @@ final class ShardLeaderElectionContext e
collection);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
- waitForReplicasToComeUp(weAreReplacement);
-
- // wait for local leader state to clear...
- // int tries = 0;
- // while (zkController.getClusterState().getLeader(collection, shardId) !=
- // null) {
- // System.out.println("leader still shown " + tries + " " +
- // zkController.getClusterState().getLeader(collection, shardId));
- // Thread.sleep(1000);
- // tries++;
- // if (tries == 30) {
- // break;
- // }
- // }
- // Thread.sleep(1000);
+ String leaderVoteWait = cc.getZkController().getLeaderVoteWait();
+ if (leaderVoteWait != null) {
+ waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
+ }
SolrCore core = null;
try {
@@ -238,14 +234,14 @@ final class ShardLeaderElectionContext e
}
- private void waitForReplicasToComeUp(boolean weAreReplacement)
+ private void waitForReplicasToComeUp(boolean weAreReplacement, String leaderVoteWait)
throws InterruptedException {
- int retries = 300; // ~ 5 min
+ int timeout = Integer.parseInt(leaderVoteWait);
+ long timeoutAt = System.currentTimeMillis() + timeout;
+
boolean tryAgain = true;
Slice slices = zkController.getClusterState().getSlice(collection, shardId);
- log.info("Running the leader process. afterExperiation=" + afterExpiration);
- while (tryAgain || slices == null) {
-
+ while (true && !isClosed) {
// wait for everyone to be up
if (slices != null) {
Map<String,ZkNodeProps> shards = slices.getShards();
@@ -265,24 +261,23 @@ final class ShardLeaderElectionContext e
if ((afterExpiration || !weAreReplacement)
&& found >= slices.getShards().size()) {
log.info("Enough replicas found to continue.");
- tryAgain = false;
+ break;
} else if (!afterExpiration && found >= slices.getShards().size() - 1) {
// a previous leader went down - wait for one less than the total
// known shards
log.info("Enough replicas found to continue.");
- tryAgain = false;
+ break;
} else {
- log.info("Waiting until we see more replicas up");
+ log.info("Waiting until we see more replicas up: total=" + slices.getShards().size() + " found=" + found + " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
}
-
- retries--;
- if (retries == 0) {
+
+ if (System.currentTimeMillis() > timeoutAt) {
log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
break;
}
}
if (tryAgain) {
- Thread.sleep(1000);
+ Thread.sleep(500);
slices = zkController.getClusterState().getSlice(collection, shardId);
}
}
@@ -306,6 +301,12 @@ final class ShardLeaderElectionContext e
private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
log.info("Checking if I should try and be the leader.");
+
+ if (isClosed) {
+ log.info("Bailing on leader process because we have been closed");
+ return false;
+ }
+
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
Map<String,Slice> slices = clusterState.getSlices(this.collection);
Slice slice = slices.get(shardId);
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Sep 3 19:46:47 2012
@@ -65,8 +65,14 @@ public class RecoveryStrategy extends Th
private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
+ public static interface RecoveryListener {
+ public void recovered();
+ public void failed();
+ }
+
private volatile boolean close = false;
+ private RecoveryListener recoveryListener;
private ZkController zkController;
private String baseUrl;
private String coreZkNodeName;
@@ -76,9 +82,10 @@ public class RecoveryStrategy extends Th
private boolean recoveringAfterStartup;
private CoreContainer cc;
- public RecoveryStrategy(CoreContainer cc, String name) {
+ public RecoveryStrategy(CoreContainer cc, String name, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = name;
+ this.recoveryListener = recoveryListener;
setName("RecoveryThread");
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
@@ -93,7 +100,7 @@ public class RecoveryStrategy extends Th
// make sure any threads stop retrying
public void close() {
close = true;
- log.warn("Stopping recovery for core " + coreName + " zkNodeName=" + coreZkNodeName);
+ log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName );
}
@@ -105,6 +112,7 @@ public class RecoveryStrategy extends Th
zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
} finally {
close();
+ recoveryListener.failed();
}
}
@@ -210,15 +218,15 @@ public class RecoveryStrategy extends Th
try {
doRecovery(core);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
+ } catch (Throwable t) {
+ log.error("", t);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", t);
}
} finally {
if (core != null) core.close();
@@ -258,38 +266,52 @@ public class RecoveryStrategy extends Th
List<Long> startingVersions = ulog.getStartingVersions();
-
if (startingVersions != null && recoveringAfterStartup) {
- int oldIdx = 0; // index of the start of the old list in the current list
- long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-
- for (; oldIdx<recentVersions.size(); oldIdx++) {
- if (recentVersions.get(oldIdx) == firstStartingVersion) break;
- }
-
- if (oldIdx > 0) {
- log.info("####### Found new versions added after startup: num=" + oldIdx);
- log.info("###### currentVersions=" + recentVersions);
+ try {
+ int oldIdx = 0; // index of the start of the old list in the current
+ // list
+ long firstStartingVersion = startingVersions.size() > 0 ? startingVersions
+ .get(0) : 0;
+
+ for (; oldIdx < recentVersions.size(); oldIdx++) {
+ if (recentVersions.get(oldIdx) == firstStartingVersion) break;
+ }
+
+ if (oldIdx > 0) {
+ log.info("####### Found new versions added after startup: num="
+ + oldIdx);
+ log.info("###### currentVersions=" + recentVersions);
+ }
+
+ log.info("###### startupVersions=" + startingVersions);
+ } catch (Throwable t) {
+ SolrException.log(log, "Error getting recent versions. core=" + coreName, t);
+ recentVersions = new ArrayList<Long>(0);
}
-
- log.info("###### startupVersions=" + startingVersions);
}
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down. We may have received updates since then.
recentVersions = startingVersions;
-
- if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
- // last operation at the time of startup had the GAP flag set...
- // this means we were previously doing a full index replication
- // that probably didn't complete and buffering updates in the meantime.
- log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + coreName);
- firstTime = false; // skip peersync
+ try {
+ if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+ // last operation at the time of startup had the GAP flag set...
+ // this means we were previously doing a full index replication
+ // that probably didn't complete and buffering updates in the
+ // meantime.
+ log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core="
+ + coreName);
+ firstTime = false; // skip peersync
+ }
+ } catch (Throwable t) {
+ SolrException.log(log, "Error trying to get ulog starting operation. core="
+ + coreName, t);
+ firstTime = false; // skip peersync
}
}
- while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though
+ while (!successfulRecovery && !isInterrupted()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
@@ -393,6 +415,7 @@ public class RecoveryStrategy extends Th
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
close = true;
successfulRecovery = true;
+ recoveryListener.recovered();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted", e);
@@ -421,10 +444,17 @@ public class RecoveryStrategy extends Th
try {
log.error("Recovery failed - trying again... core=" + coreName);
+
+ if (isClosed()) {
+ retries = INTERRUPTED;
+ }
+
retries++;
if (retries >= MAX_RETRIES) {
if (retries == INTERRUPTED) {
-
+ SolrException.log(log, "Recovery failed - interrupted. core=" + coreName);
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+ core.getCoreDescriptor());
} else {
SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
@@ -433,7 +463,7 @@ public class RecoveryStrategy extends Th
break;
}
- } catch (Exception e) {
+ } catch (Throwable e) {
SolrException.log(log, "core=" + coreName, e);
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Sep 3 19:46:47 2012
@@ -123,6 +123,8 @@ public final class ZkController {
protected volatile Overseer overseer;
+ private String leaderVoteWait;
+
/**
* @param cc
* @param zkServerAddress
@@ -139,6 +141,27 @@ public final class ZkController {
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
+ this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect);
+ }
+
+
+ /**
+ * @param cc
+ * @param zkServerAddress
+ * @param zkClientTimeout
+ * @param zkClientConnectTimeout
+ * @param localHost
+ * @param locaHostPort
+ * @param localHostContext
+ * @param leaderVoteWait
+ * @param registerOnReconnect
+ * @throws InterruptedException
+ * @throws TimeoutException
+ * @throws IOException
+ */
+ public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+ String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
+ TimeoutException, IOException {
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
this.cc = cc;
if (localHostContext.contains("/")) {
@@ -153,6 +176,7 @@ public final class ZkController {
this.hostName = getHostNameFromAddress(this.localHost);
this.nodeName = this.hostName + ':' + this.localHostPort + '_' + this.localHostContext;
this.baseURL = this.localHost + ":" + this.localHostPort + "/" + this.localHostContext;
+ this.leaderVoteWait = leaderVoteWait;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info
@@ -257,6 +281,10 @@ public final class ZkController {
init(registerOnReconnect);
}
+ public String getLeaderVoteWait() {
+ return leaderVoteWait;
+ }
+
private void registerAllCoresAsDown(
final CurrentCoreDescriptorProvider registerOnReconnect) {
List<CoreDescriptor> descriptors = registerOnReconnect
@@ -282,6 +310,22 @@ public final class ZkController {
*/
public void close() {
try {
+ String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+ // we don't retry if there is a problem - count on ephem timeout
+ zkClient.delete(nodePath, -1, false);
+ } catch (KeeperException.NoNodeException e) {
+ // fine
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ SolrException.log(log, "Error trying to remove our ephem live node", e);
+ }
+
+ for (ElectionContext context : electionContexts.values()) {
+ context.close();
+ }
+
+ try {
overseer.close();
} catch(Throwable t) {
log.error("Error closing overseer", t);
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Sep 3 19:46:47 2012
@@ -34,6 +34,9 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.parsers.ParserConfigurationException;
@@ -56,10 +59,7 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.util.DOMUtil;
-import org.apache.solr.util.FileUtils;
-import org.apache.solr.util.SystemIdResolver;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
import org.apache.solr.handler.admin.CollectionsHandler;
@@ -71,6 +71,10 @@ import org.apache.solr.logging.LogWatche
import org.apache.solr.logging.jul.JulWatcher;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.util.DOMUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.FileUtils;
+import org.apache.solr.util.SystemIdResolver;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,8 +143,10 @@ public class CoreContainer
protected LogWatcher logging = null;
private String zkHost;
private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
+ private String leaderVoteWait;
-
+ private ThreadPoolExecutor cmdDistribExecutor;
+
{
log.info("New CoreContainer " + System.identityHashCode(this));
}
@@ -184,6 +190,10 @@ public class CoreContainer
}
protected void initZooKeeper(String zkHost, int zkClientTimeout) {
+ cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("cmdDistribExecutor"));
+
// if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
if(zkHost == null) {
@@ -227,7 +237,7 @@ public class CoreContainer
} else {
log.info("Zookeeper client=" + zookeeperHost);
}
- zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+ zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, leaderVoteWait, new CurrentCoreDescriptorProvider() {
@Override
public List<CoreDescriptor> getCurrentDescriptors() {
@@ -286,6 +296,11 @@ public class CoreContainer
}
+ // may return null if not in zk mode
+ public ThreadPoolExecutor getCmdDistribExecutor() {
+ return cmdDistribExecutor;
+ }
+
public Properties getContainerProperties() {
return containerProperties;
}
@@ -456,6 +471,8 @@ public class CoreContainer
hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
host = cfg.get("solr/cores/@host", null);
+
+ leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", null);
if(shareSchema){
indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
@@ -601,6 +618,13 @@ public class CoreContainer
if (shardHandlerFactory != null) {
shardHandlerFactory.close();
}
+ if (cmdDistribExecutor != null) {
+ try {
+ ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor);
+ } catch (Throwable e) {
+ SolrException.log(log, e);
+ }
+ }
// we want to close zk stuff last
if(zkController != null) {
zkController.close();
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Mon Sep 3 19:46:47 2012
@@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -163,7 +164,7 @@ public class HttpShardHandlerFactory ext
SolrException.log(log, e);
}
try {
- commExecutor.shutdownNow();
+ ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
} catch (Throwable e) {
SolrException.log(log, e);
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Mon Sep 3 19:46:47 2012
@@ -17,6 +17,13 @@ package org.apache.solr.handler.componen
* limitations under the License.
*/
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
@@ -28,7 +35,7 @@ import org.apache.solr.common.SolrDocume
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
@@ -52,10 +59,6 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.URL;
-import java.util.*;
-
public class RealTimeGetComponent extends SearchComponent
{
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Mon Sep 3 19:46:47 2012
@@ -29,7 +29,7 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class DefaultSolrCoreState extends SolrCoreState {
+public final class DefaultSolrCoreState extends SolrCoreState implements RecoveryStrategy.RecoveryListener {
public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
@@ -43,7 +43,7 @@ public final class DefaultSolrCoreState
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
- private boolean recoveryRunning;
+ private volatile boolean recoveryRunning;
private RecoveryStrategy recoveryStrat;
private boolean closed = false;
@@ -163,6 +163,7 @@ public final class DefaultSolrCoreState
log.error("Error during shutdown of directory factory.", t);
}
try {
+ log.info("Closing SolrCoreState - canceling any ongoing recovery");
cancelRecovery();
} catch (Throwable t) {
log.error("Error cancelling recovery", t);
@@ -210,6 +211,7 @@ public final class DefaultSolrCoreState
}
synchronized (recoveryLock) {
+ log.info("Running recovery - first canceling any ongoing recovery");
cancelRecovery();
while (recoveryRunning) {
@@ -229,7 +231,7 @@ public final class DefaultSolrCoreState
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
boolean recoveringAfterStartup = recoveryStrat == null;
- recoveryStrat = new RecoveryStrategy(cc, name);
+ recoveryStrat = new RecoveryStrategy(cc, name, this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
recoveryStrat.start();
recoveryRunning = true;
@@ -240,7 +242,7 @@ public final class DefaultSolrCoreState
@Override
public void cancelRecovery() {
synchronized (recoveryLock) {
- if (recoveryStrat != null) {
+ if (recoveryStrat != null && recoveryRunning) {
recoveryStrat.close();
while (true) {
try {
@@ -257,5 +259,15 @@ public final class DefaultSolrCoreState
}
}
}
+
+ @Override
+ public void recovered() {
+ recoveryRunning = false;
+ }
+
+ @Override
+ public void failed() {
+ recoveryRunning = false;
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Sep 3 19:46:47 2012
@@ -53,11 +53,6 @@ import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 6;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-
- // TODO: shut this thing down
- static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
- TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new DefaultSolrThreadFactory("cmdDistribExecutor"));;
static final HttpClient client;
static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
@@ -90,7 +85,7 @@ public class SolrCmdDistributor {
ModifiableSolrParams params;
}
- public SolrCmdDistributor(int numHosts) {
+ public SolrCmdDistributor(int numHosts, ThreadPoolExecutor executor) {
int maxPermits = Math.max(8, (numHosts - 1) * 8);
// limits how many tasks can actually execute at once
@@ -98,7 +93,7 @@ public class SolrCmdDistributor {
semaphore.setMaxPermits(maxPermits);
}
- completionService = new ExecutorCompletionService<Request>(commExecutor);
+ completionService = new ExecutorCompletionService<Request>(executor);
pending = new HashSet<Future<Request>>();
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Sep 3 19:46:47 2012
@@ -164,15 +164,13 @@ public class DistributedUpdateProcessor
}
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
-
-
cloudDesc = coreDesc.getCloudDescriptor();
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
}
- cmdDistrib = new SolrCmdDistributor(numNodes);
+ cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getCmdDistribExecutor());
}
private List<Node> setupRequest(int hash) {
@@ -851,6 +849,10 @@ public class DistributedUpdateProcessor
// forward to all replicas
if (leaderLogic && replicas != null) {
+ if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
+ log.error("Abort sending request to replicas, we are no longer leader");
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader");
+ }
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Mon Sep 3 19:46:47 2012
@@ -164,11 +164,14 @@ public class ChaosMonkeyNothingIsSafeTes
// have request fails
checkShardConsistency(false, true);
+ long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
// ensure we have added more than 0 docs
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
.getResults().getNumFound();
- assertTrue(cloudClientDocs > 0);
+ assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
if (VERBOSE) System.out.println("control docs:"
+ controlClient.query(new SolrQuery("*:*")).getResults()
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Mon Sep 3 19:46:47 2012
@@ -21,6 +21,9 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
@@ -37,9 +40,12 @@ import org.apache.solr.common.util.Named
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.Response;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
+import org.apache.solr.util.DefaultSolrThreadFactory;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-
+ private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("cmdDistribExecutor"));
public SolrCmdDistributorTest() {
fixShardCount = true;
@@ -85,7 +91,7 @@ public class SolrCmdDistributorTest exte
public void doTest() throws Exception {
del("*:*");
- SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
+ SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8, executor);
ModifiableSolrParams params = new ModifiableSolrParams();
List<Node> nodes = new ArrayList<Node>();
@@ -119,7 +125,7 @@ public class SolrCmdDistributorTest exte
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
- cmdDistrib = new SolrCmdDistributor(8);
+ cmdDistrib = new SolrCmdDistributor(8, executor);
cmd.solrDoc = sdoc("id", 2);
cmdDistrib.distribAdd(cmd, nodes, params);
@@ -152,7 +158,7 @@ public class SolrCmdDistributorTest exte
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
- cmdDistrib = new SolrCmdDistributor(8);
+ cmdDistrib = new SolrCmdDistributor(8, executor);
cmdDistrib.distribDelete(dcmd, nodes, params);
cmdDistrib.distribCommit(ccmd, nodes, params);
@@ -180,7 +186,7 @@ public class SolrCmdDistributorTest exte
int id = 5;
- cmdDistrib = new SolrCmdDistributor(8);
+ cmdDistrib = new SolrCmdDistributor(8, executor);
nodes.clear();
int cnt = atLeast(200);