You are viewing a plain-text version of this content. Its canonical link was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:22 UTC
[13/16] lucene-solr:master: SOLR-12801: Make massive improvements to the tests.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 654b166..241b6cd 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -197,7 +197,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private boolean replicateOnStart = false;
- private ScheduledExecutorService executorService;
+ private volatile ScheduledExecutorService executorService;
private volatile long executorStartTime;
@@ -1369,6 +1369,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
if (restoreFuture != null) {
restoreFuture.cancel(false);
}
+
+ // executorService is a field with no initializer, so it may never have been created;
+ // guard against null the same way CoreAdminHandler.shutdown() guards parallelExecutor.
+ if (executorService != null) {
+ ExecutorUtil.shutdownAndAwaitTermination(executorService);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/admin/AutoscalingHistoryHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/AutoscalingHistoryHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/AutoscalingHistoryHandler.java
index ae99453..d6464fc 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/AutoscalingHistoryHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/AutoscalingHistoryHandler.java
@@ -125,7 +125,7 @@ public class AutoscalingHistoryHandler extends RequestHandlerBase implements Per
}
}
}
- try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(coreContainer.getZkController().getZkServerAddress()), Optional.empty())
+ try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(coreContainer.getZkController().getZkServerAddress()), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000)
.withHttpClient(coreContainer.getUpdateShardHandler().getDefaultHttpClient())
.build()) {
QueryResponse qr = cloudSolrClient.query(collection, params);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index dfb3c6b..c593be6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
@@ -45,10 +46,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
-import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
+import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -285,7 +286,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
} else {
// submits and doesn't wait for anything (no response)
- Overseer.getStateUpdateQueue(coreContainer.getZkController().getZkClient()).offer(Utils.toJSON(props));
+ coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
}
}
@@ -1249,61 +1250,59 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
return;
}
+ int replicaFailCount;
if (createCollResponse.getResponse().get("failure") != null) {
- // TODO: we should not wait for Replicas we know failed
+ replicaFailCount = ((NamedList) createCollResponse.getResponse().get("failure")).size();
+ } else {
+ replicaFailCount = 0;
}
- String replicaNotAlive = null;
- String replicaState = null;
- String nodeNotLive = null;
-
CloudConfig ccfg = cc.getConfig().getCloudConfig();
- Integer numRetries = ccfg.getCreateCollectionWaitTimeTillActive(); // this config is actually # seconds, not # tries
+ Integer seconds = ccfg.getCreateCollectionWaitTimeTillActive();
Boolean checkLeaderOnly = ccfg.isCreateCollectionCheckLeaderActive();
- log.info("Wait for new collection to be active for at most " + numRetries + " seconds. Check all shard "
+ log.info("Wait for new collection to be active for at most " + seconds + " seconds. Check all shard "
+ (checkLeaderOnly ? "leaders" : "replicas"));
- ZkStateReader zkStateReader = cc.getZkController().getZkStateReader();
- for (int i = 0; i < numRetries; i++) {
- ClusterState clusterState = zkStateReader.getClusterState();
-
- final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
-
- if (docCollection != null && docCollection.getSlices() != null) {
- Collection<Slice> shards = docCollection.getSlices();
- replicaNotAlive = null;
- for (Slice shard : shards) {
- Collection<Replica> replicas;
- if (!checkLeaderOnly) replicas = shard.getReplicas();
- else {
- replicas = new ArrayList<Replica>();
- replicas.add(shard.getLeader());
- }
- for (Replica replica : replicas) {
- String state = replica.getStr(ZkStateReader.STATE_PROP);
- log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
- replica.getCoreUrl(), state);
- if (!clusterState.liveNodesContain(replica.getNodeName())
- || !state.equals(Replica.State.ACTIVE.toString())) {
- replicaNotAlive = replica.getCoreUrl();
- nodeNotLive = replica.getNodeName();
- replicaState = state;
- break;
+
+ try {
+ cc.getZkController().getZkStateReader().waitForState(collectionName, seconds, TimeUnit.SECONDS, (n, c) -> {
+
+ if (c == null) {
+ // the collection was not created, don't wait
+ return true;
+ }
+
+ if (c.getSlices() != null) {
+ Collection<Slice> shards = c.getSlices();
+ int replicaNotAliveCnt = 0;
+ for (Slice shard : shards) {
+ Collection<Replica> replicas;
+ if (!checkLeaderOnly) replicas = shard.getReplicas();
+ else {
+ replicas = new ArrayList<Replica>();
+ replicas.add(shard.getLeader());
+ }
+ for (Replica replica : replicas) {
+ if (replica == null) {
+ // leader-only mode: the shard has no leader yet, so it cannot be considered active
+ replicaNotAliveCnt++;
+ continue;
+ }
+ String state = replica.getStr(ZkStateReader.STATE_PROP);
+ log.debug("Checking replica status, collection={} replica={} state={}", collectionName,
+ replica.getCoreUrl(), state);
+ // Count (rather than bail out on) replicas that are not live/ACTIVE so the check below can
+ // tolerate the replicas the create response already reported as failed (replicaFailCount).
+ // ACTIVE.equals(state) also avoids an NPE when the state property is missing.
+ if (!n.contains(replica.getNodeName())
+ || !Replica.State.ACTIVE.toString().equals(state)) {
+ replicaNotAliveCnt++;
+ }
+ }
}
- if (replicaNotAlive != null) break;
- }
- if (replicaNotAlive == null) return;
- }
- Thread.sleep(1000); // thus numRetries is roughly number of seconds
- }
- if (nodeNotLive != null && replicaState != null) {
- log.error("Timed out waiting for new collection's replicas to become ACTIVE "
- + (replicaState.equals(Replica.State.ACTIVE.toString()) ? "node " + nodeNotLive + " is not live"
- : "replica " + replicaNotAlive + " is in state of " + replicaState.toString()) + " with timeout=" + numRetries);
- } else {
- log.error("Timed out waiting for new collection's replicas to become ACTIVE with timeout=" + numRetries);
+ if ((replicaNotAliveCnt == 0) || (replicaNotAliveCnt <= replicaFailCount)) return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ // catching InterruptedException clears the flag; restore it so callers can observe it
+ Thread.currentThread().interrupt();
+ }
+ String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
+ throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
+
}
public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 66dc39e..04942e4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -371,7 +371,7 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
* Method to ensure shutting down of the ThreadPool Executor.
*/
public void shutdown() {
- if (parallelExecutor != null && !parallelExecutor.isShutdown())
+ if (parallelExecutor != null)
ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
index b569fe8..7dd8e4f 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
@@ -642,7 +642,17 @@ public class MetricsHistoryHandler extends RequestHandlerBase implements Permiss
public void close() {
log.debug("Closing " + hashCode());
if (collectService != null) {
- collectService.shutdownNow();
+ boolean shutdown = false;
+ boolean interrupted = false;
+ while (!shutdown) {
+ try {
+ // (Re)interrupt running tasks, then wait a while for them to terminate
+ collectService.shutdownNow();
+ shutdown = collectService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // Remember the interrupt instead of re-setting the flag inside the loop: with the flag
+ // set, awaitTermination would throw immediately on every pass and this loop would busy-spin.
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ // Preserve interrupt status for the caller
+ Thread.currentThread().interrupt();
+ }
}
if (factory != null) {
factory.close();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index d064e78..7109944 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,13 +18,15 @@
package org.apache.solr.handler.admin;
import java.lang.invoke.MethodHandles;
-import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -47,10 +49,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
final SolrParams params = it.req.getParams();
- String cname = params.get(CoreAdminParams.CORE);
- if (cname == null) {
- cname = "";
- }
+ String cname = params.get(CoreAdminParams.CORE, "");
String nodeName = params.get("nodeName");
String coreNodeName = params.get("coreNodeName");
@@ -59,133 +58,110 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
-
CoreContainer coreContainer = it.handler.coreContainer;
// wait long enough for the leader conflict to work itself out plus a little extra
int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
- int maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
- log.info("Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}, maxTime: {} s",
- coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive, maxTries);
-
- Replica.State state = null;
- boolean live = false;
- int retry = 0;
- while (true) {
- try (SolrCore core = coreContainer.getCore(cname)) {
- if (core == null && retry == Math.min(30, maxTries)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:"
- + cname);
- }
- if (core != null) {
+ log.info(
+ "Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}",
+ coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive);
+
+ String collectionName;
+ CloudDescriptor cloudDescriptor;
+ try (SolrCore core = coreContainer.getCore(cname)) {
+ if (core == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:" + cname);
+ collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ cloudDescriptor = core.getCoreDescriptor()
+ .getCloudDescriptor();
+ }
+ AtomicReference<String> errorMessage = new AtomicReference<>();
+ try {
+ coreContainer.getZkController().getZkStateReader().waitForState(collectionName, conflictWaitMs, TimeUnit.MILLISECONDS, (n, c) -> {
+ if (c == null)
+ return false;
+
+ try (SolrCore core = coreContainer.getCore(cname)) {
+ if (core == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:" + cname);
if (onlyIfLeader != null && onlyIfLeader) {
if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "We are not the leader");
}
}
+ }
- // wait until we are sure the recovering node is ready
- // to accept updates
- CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
- .getCloudDescriptor();
- String collectionName = cloudDescriptor.getCollectionName();
-
- if (retry % 15 == 0) {
- if (retry > 0 && log.isInfoEnabled())
- log.info("After " + retry + " seconds, core " + cname + " (" +
- cloudDescriptor.getShardId() + " of " +
- cloudDescriptor.getCollectionName() + ") still does not have state: " +
- waitForState + "; forcing ClusterState update from ZooKeeper");
-
- // force a cluster state update
- coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
- }
-
- ClusterState clusterState = coreContainer.getZkController().getClusterState();
- DocCollection collection = clusterState.getCollection(collectionName);
- Slice slice = collection.getSlice(cloudDescriptor.getShardId());
- if (slice != null) {
- final Replica replica = slice.getReplicasMap().get(coreNodeName);
- if (replica != null) {
- state = replica.getState();
- live = clusterState.liveNodesContain(nodeName);
-
- final Replica.State localState = cloudDescriptor.getLastPublished();
-
- // TODO: This is funky but I've seen this in testing where the replica asks the
- // leader to be in recovery? Need to track down how that happens ... in the meantime,
- // this is a safeguard
- boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
- onlyIfLeader &&
- core.getName().equals(replica.getStr("core")) &&
- waitForState == Replica.State.RECOVERING &&
- localState == Replica.State.ACTIVE &&
- state == Replica.State.ACTIVE);
-
- if (leaderDoesNotNeedRecovery) {
- log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
- }
+ // wait until we are sure the recovering node is ready
+ // to accept updates
+ Replica.State state = null;
+ boolean live = false;
+ Slice slice = c.getSlice(cloudDescriptor.getShardId());
+ if (slice != null) {
+ final Replica replica = slice.getReplicasMap().get(coreNodeName);
+ if (replica != null) {
+ state = replica.getState();
+ live = n.contains(nodeName);
+
+ final Replica.State localState = cloudDescriptor.getLastPublished();
+
+ // TODO: This is funky but I've seen this in testing where the replica asks the
+ // leader to be in recovery? Need to track down how that happens ... in the meantime,
+ // this is a safeguard
+ boolean leaderDoesNotNeedRecovery = (onlyIfLeader != null &&
+ onlyIfLeader &&
+ cname.equals(replica.getStr("core")) &&
+ waitForState == Replica.State.RECOVERING &&
+ localState == Replica.State.ACTIVE &&
+ state == Replica.State.ACTIVE);
+
+ if (leaderDoesNotNeedRecovery) {
+ log.warn(
+ "Leader " + cname + " ignoring request to be in the recovering state because it is live and active.");
+ }
- ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
- // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
- if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && shardTerms.skipSendingUpdatesTo(coreNodeName)) {
- // The replica changed it term, then published itself as RECOVERING.
- // This core already see replica as RECOVERING
- // so it is guarantees that a live-fetch will be enough for this core to see max term published
- shardTerms.refreshTerms();
- }
+ ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName)
+ && shardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ // The replica changed it term, then published itself as RECOVERING.
+ // This core already see replica as RECOVERING
+ // so it is guarantees that a live-fetch will be enough for this core to see max term published
+ shardTerms.refreshTerms();
+ }
- boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
- log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
- ", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
- ", isLeader? " + core.getCoreDescriptor().getCloudDescriptor().isLeader() +
- ", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state.toString() + ", localState=" + localState + ", nodeName=" + nodeName +
- ", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult + ", nodeProps: " + replica);
-
- if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
- if (checkLive == null) {
- break;
- } else if (checkLive && live) {
- break;
- } else if (!checkLive && !live) {
- break;
- }
+ boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive
+ && localState != Replica.State.ACTIVE;
+ log.info(
+ "In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
+ ", thisCore=" + cname + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
+ ", isLeader? " + cloudDescriptor.isLeader() +
+ ", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state.toString()
+ + ", localState=" + localState + ", nodeName=" + nodeName +
+ ", coreNodeName=" + coreNodeName + ", onlyIfActiveCheckResult=" + onlyIfActiveCheckResult
+ + ", nodeProps: " + replica);
+
+ if (!onlyIfActiveCheckResult && replica != null && (state == waitForState || leaderDoesNotNeedRecovery)) {
+ if (checkLive == null) {
+ return true;
+ } else if (checkLive && live) {
+ return true;
+ } else if (!checkLive && !live) {
+ return true;
}
}
}
}
- if (retry++ == maxTries) {
- String collection = null;
- String leaderInfo = null;
- String shardId = null;
-
- try {
- CloudDescriptor cloudDescriptor =
- core.getCoreDescriptor().getCloudDescriptor();
- collection = cloudDescriptor.getCollectionName();
- shardId = cloudDescriptor.getShardId();
- leaderInfo = coreContainer.getZkController().
- getZkStateReader().getLeaderUrl(collection, shardId, 5000);
- } catch (Exception exc) {
- leaderInfo = "Not available due to: " + exc;
- }
-
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "I was asked to wait on state " + waitForState + " for "
- + shardId + " in " + collection + " on " + nodeName
- + " but I still do not see the requested state. I see state: "
- + Objects.toString(state) + " live:" + live + " leader from ZK: " + leaderInfo);
- }
-
if (coreContainer.isShutDown()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Solr is shutting down");
}
- }
- Thread.sleep(1000);
+
+ // Record what we last observed so a timeout reports a useful diagnostic instead of the
+ // generic fallback message (errorMessage was otherwise never populated).
+ errorMessage.set("I was asked to wait on state " + waitForState + " for "
+ + cloudDescriptor.getShardId() + " in " + collectionName + " on " + nodeName
+ + " but I still do not see the requested state. I see state: "
+ + state + " live:" + live);
+ return false;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ // catching InterruptedException clears the flag; restore it so callers can observe it
+ Thread.currentThread().interrupt();
+ }
+ String error = errorMessage.get();
+ if (error == null)
+ error = "Timeout waiting for collection state.";
+ throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
- log.info("Waited coreNodeName: " + coreNodeName + ", state: " + waitForState
- + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader + " for: " + retry + " seconds.");
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
index 97d4199..e787894 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
@@ -16,13 +16,16 @@
*/
package org.apache.solr.handler.component;
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.concurrent.Future;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -34,28 +37,28 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.http.client.HttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-
public abstract class IterativeMergeStrategy implements MergeStrategy {
- protected ExecutorService executorService;
- protected static HttpClient httpClient;
+ protected volatile ExecutorService executorService;
+
+ protected volatile CloseableHttpClient httpClient;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void merge(ResponseBuilder rb, ShardRequest sreq) {
rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise.
rb.onePassDistributedQuery = true; // Turn off the second pass distributed.
- executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("IterativeMergeStrategy"));
+ executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("IterativeMergeStrategy"));
+ httpClient = getHttpClient();
try {
process(rb, sreq);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
+ HttpClientUtil.close(httpClient);
executorService.shutdownNow();
}
}
@@ -76,7 +79,7 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
}
- public static class CallBack implements Callable<CallBack> {
+ public class CallBack implements Callable<CallBack> {
private HttpSolrClient solrClient;
private QueryRequest req;
private QueryResponse response;
@@ -85,7 +88,7 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
public CallBack(ShardResponse originalShardResponse, QueryRequest req) {
this.solrClient = new Builder(originalShardResponse.getShardAddress())
- .withHttpClient(getHttpClient())
+ .withHttpClient(httpClient)
.build();
this.req = req;
this.originalShardResponse = originalShardResponse;
@@ -122,16 +125,16 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception;
- static synchronized HttpClient getHttpClient() {
-
- if(httpClient == null) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
- httpClient = HttpClientUtil.createClient(params);
- return httpClient;
- } else {
- return httpClient;
- }
+ private CloseableHttpClient getHttpClient() {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
+ CloseableHttpClient httpClient = HttpClientUtil.createClient(params);
+
+ return httpClient;
}
+
}
+
+
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index 01f5f60..a4ac256 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -38,7 +38,6 @@ import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
@@ -89,13 +88,6 @@ public class JavabinLoader extends ContentStreamLoader {
@Override
public void update(SolrInputDocument document, UpdateRequest updateRequest, Integer commitWithin, Boolean overwrite) {
if (document == null) {
- // Perhaps commit from the parameters
- try {
- RequestHandlerUtils.handleCommit(req, processor, updateRequest.getParams(), false);
- RequestHandlerUtils.handleRollback(req, processor, updateRequest.getParams(), false);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR handling commit/rollback");
- }
return;
}
if (addCmd == null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
index c4ef72c..e4d7a2d 100644
--- a/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
+++ b/solr/core/src/java/org/apache/solr/handler/sql/SolrSchema.java
@@ -53,7 +53,7 @@ class SolrSchema extends AbstractSchema {
@Override
protected Map<String, Table> getTableMap() {
String zk = this.properties.getProperty("zk");
- try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).build()) {
+ try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) {
cloudSolrClient.connect();
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
@@ -77,7 +77,7 @@ class SolrSchema extends AbstractSchema {
private Map<String, LukeResponse.FieldInfo> getFieldInfo(String collection) {
String zk = this.properties.getProperty("zk");
- try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).build()) {
+ try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) {
cloudSolrClient.connect();
LukeRequest lukeRequest = new LukeRequest();
lukeRequest.setNumTerms(0);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
index a506ca1..4608e2d 100644
--- a/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
+++ b/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
@@ -34,8 +34,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.lucene.index.LeafReader;
@@ -66,7 +64,6 @@ import org.apache.solr.common.params.FacetParams;
import org.apache.solr.common.params.GroupParams;
import org.apache.solr.common.params.RequiredSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@@ -93,7 +90,6 @@ import org.apache.solr.search.facet.FacetDebugInfo;
import org.apache.solr.search.facet.FacetRequest;
import org.apache.solr.search.grouping.GroupingSpecification;
import org.apache.solr.util.BoundedTreeSet;
-import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,6 +166,7 @@ public class SimpleFacets {
this.docsOrig = docs;
this.global = params;
this.rb = rb;
+ this.facetExecutor = req.getCore().getCoreContainer().getUpdateShardHandler().getUpdateExecutor();
}
public void setFacetDebugInfo(FacetDebugInfo fdebugParent) {
@@ -773,13 +770,7 @@ public class SimpleFacets {
}
};
- static final Executor facetExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE,
- 10, TimeUnit.SECONDS, // terminate idle threads after 10 sec
- new SynchronousQueue<Runnable>() // directly hand off tasks
- , new DefaultSolrThreadFactory("facetExecutor")
- );
+ private final Executor facetExecutor;
/**
* Returns a list of value constraints and the associated facet counts
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index 7f02b24..424f1a6 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -55,7 +55,7 @@ public class SolrRequestInfo {
SolrRequestInfo prev = threadLocal.get();
if (prev != null) {
log.error("Previous SolrRequestInfo was not closed! req=" + prev.req.getOriginalParams().toString());
- log.error("prev == info : {}", prev.req == info.req);
+ log.error("prev == info : {}", prev.req == info.req, new RuntimeException());
}
assert prev == null;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
index 43dac48..54d09d8 100644
--- a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
@@ -60,7 +60,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
private final Map<String, PublicKey> keyCache = new ConcurrentHashMap<>();
private final PublicKeyHandler publicKeyHandler;
private final CoreContainer cores;
- private final int MAX_VALIDITY = Integer.parseInt(System.getProperty("pkiauth.ttl", "10000"));
+ private final int MAX_VALIDITY = Integer.parseInt(System.getProperty("pkiauth.ttl", "15000"));
private final String myNodeName;
private final HttpHeaderClientInterceptor interceptor = new HttpHeaderClientInterceptor();
private boolean interceptorRegistered = false;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 64dc3dd..78ca8d4 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -885,9 +885,8 @@ public class HttpSolrCall {
boolean byCoreName = false;
if (slices == null) {
- activeSlices = new ArrayList<>();
- // look by core name
byCoreName = true;
+ activeSlices = new ArrayList<>();
getSlicesForCollections(clusterState, activeSlices, true);
if (activeSlices.isEmpty()) {
getSlicesForCollections(clusterState, activeSlices, false);
@@ -930,7 +929,7 @@ public class HttpSolrCall {
if (!activeReplicas || (liveNodes.contains(replica.getNodeName())
&& replica.getState() == Replica.State.ACTIVE)) {
- if (byCoreName && !collectionName.equals(replica.getStr(CORE_NAME_PROP))) {
+ if (byCoreName && !origCorename.equals(replica.getStr(CORE_NAME_PROP))) {
// if it's by core name, make sure they match
continue;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index 78e58d0..9e6523b 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -102,6 +102,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
private final String metricTag = Integer.toHexString(hashCode());
private SolrMetricManager metricManager;
private String registryName;
+ private volatile boolean closeOnDestroy = true;
/**
* Enum to define action that needs to be processed.
@@ -294,26 +295,43 @@ public class SolrDispatchFilter extends BaseSolrFilter {
@Override
public void destroy() {
- try {
- FileCleaningTracker fileCleaningTracker = SolrRequestParsers.fileCleaningTracker;
- if (fileCleaningTracker != null) {
- fileCleaningTracker.exitWhenFinished();
- }
- } catch (Exception e) {
- log.warn("Exception closing FileCleaningTracker", e);
- } finally {
- SolrRequestParsers.fileCleaningTracker = null;
- }
-
- if (metricManager != null) {
- metricManager.unregisterGauges(registryName, metricTag);
+ if (closeOnDestroy) {
+ close();
}
-
- if (cores != null) {
+ }
+
+ public void close() {
+ CoreContainer cc = cores;
+ cores = null;
+ try {
try {
- cores.shutdown();
+ FileCleaningTracker fileCleaningTracker = SolrRequestParsers.fileCleaningTracker;
+ if (fileCleaningTracker != null) {
+ fileCleaningTracker.exitWhenFinished();
+ }
+ } catch (NullPointerException e) {
+ // okay
+ } catch (Exception e) {
+ log.warn("Exception closing FileCleaningTracker", e);
} finally {
- cores = null;
+ SolrRequestParsers.fileCleaningTracker = null;
+ }
+
+ if (metricManager != null) {
+ try {
+ metricManager.unregisterGauges(registryName, metricTag);
+ } catch (NullPointerException e) {
+ // okay: best-effort cleanup during shutdown, registry may already be torn down
+ } catch (Exception e) {
+ log.warn("Exception unregistering metric gauges for registry " + registryName, e);
+ } finally {
+ metricManager = null;
+ }
+ }
+ } finally {
+ if (cc != null) {
+ httpClient = null;
+ cc.shutdown();
}
}
}
@@ -594,4 +612,8 @@ public class SolrDispatchFilter extends BaseSolrFilter {
return response;
}
}
+
+ public void closeOnDestroy(boolean closeOnDestroy) {
+ this.closeOnDestroy = closeOnDestroy;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/CommitTracker.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 7da9651..d3929b2 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -59,7 +59,7 @@ public final class CommitTracker implements Runnable {
private long tLogFileSizeUpperBound;
private final ScheduledExecutorService scheduler =
- Executors.newScheduledThreadPool(1, new DefaultSolrThreadFactory("commitScheduler"));
+ Executors.newScheduledThreadPool(1, new DefaultSolrThreadFactory("commitScheduler")); // keep 1 core thread: a zero-core STPE may have no thread to run delayed tasks and can busy-spin on JDK 8
private ScheduledFuture pending;
// state
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 660df06..4dc5b3b 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -814,25 +814,23 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
- public static boolean commitOnClose = true; // TODO: make this a real config option or move it to TestInjection
+ public static volatile boolean commitOnClose = true; // TODO: make this a real config option or move it to TestInjection
// IndexWriterCloser interface method - called from solrCoreState.decref(this)
@Override
public void closeWriter(IndexWriter writer) throws IOException {
assert TestInjection.injectNonGracefullClose(core.getCoreContainer());
-
- boolean clearRequestInfo = false;
- solrCoreState.getCommitLock().lock();
- try {
- SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
- SolrQueryResponse rsp = new SolrQueryResponse();
- if (SolrRequestInfo.getRequestInfo() == null) {
- clearRequestInfo = true;
- SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // important for debugging
- }
+ boolean clearRequestInfo = false;
+ SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ if (SolrRequestInfo.getRequestInfo() == null) {
+ clearRequestInfo = true;
+ SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // important for debugging
+ }
+ try {
if (!commitOnClose) {
if (writer != null) {
writer.rollback();
@@ -845,58 +843,65 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
return;
}
- // do a commit before we quit?
- boolean tryToCommit = writer != null && ulog != null && ulog.hasUncommittedChanges() && ulog.getState() == UpdateLog.State.ACTIVE;
+ // do a commit before we quit?
+ boolean tryToCommit = writer != null && ulog != null && ulog.hasUncommittedChanges()
+ && ulog.getState() == UpdateLog.State.ACTIVE;
+ // be tactical with this lock! closing the updatelog can deadlock when it tries to commit
+ solrCoreState.getCommitLock().lock();
try {
- if (tryToCommit) {
- log.info("Committing on IndexWriter close.");
- CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
- cmd.openSearcher = false;
- cmd.waitSearcher = false;
- cmd.softCommit = false;
-
- // TODO: keep other commit callbacks from being called?
- // this.commit(cmd); // too many test failures using this method... is it because of callbacks?
-
- synchronized (solrCoreState.getUpdateLock()) {
- ulog.preCommit(cmd);
- }
-
- // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
- SolrIndexWriter.setCommitData(writer, cmd.getVersion());
- writer.commit();
+ try {
+ if (tryToCommit) {
+ log.info("Committing on IndexWriter close.");
+ CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
+ cmd.openSearcher = false;
+ cmd.waitSearcher = false;
+ cmd.softCommit = false;
+
+ // TODO: keep other commit callbacks from being called?
+ // this.commit(cmd); // too many test failures using this method... is it because of callbacks?
+
+ synchronized (solrCoreState.getUpdateLock()) {
+ ulog.preCommit(cmd);
+ }
+
+ // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
+ SolrIndexWriter.setCommitData(writer, cmd.getVersion());
+ writer.commit();
- synchronized (solrCoreState.getUpdateLock()) {
- ulog.postCommit(cmd);
+ synchronized (solrCoreState.getUpdateLock()) {
+ ulog.postCommit(cmd);
+ }
+ }
+ } catch (Throwable th) {
+ log.error("Error in final commit", th);
+ if (th instanceof OutOfMemoryError) {
+ throw (OutOfMemoryError) th;
}
}
- } catch (Throwable th) {
- log.error("Error in final commit", th);
- if (th instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) th;
- }
- }
- // we went through the normal process to commit, so we don't have to artificially
- // cap any ulog files.
- try {
- if (ulog != null) ulog.close(false);
- } catch (Throwable th) {
- log.error("Error closing log files", th);
- if (th instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) th;
- }
- }
+ } finally {
+ solrCoreState.getCommitLock().unlock();
- if (writer != null) {
- writer.close();
}
-
} finally {
- solrCoreState.getCommitLock().unlock();
if (clearRequestInfo) SolrRequestInfo.clearRequestInfo();
}
+ // we went through the normal process to commit, so we don't have to artificially
+ // cap any ulog files.
+ try {
+ if (ulog != null) ulog.close(false);
+ } catch (Throwable th) {
+ log.error("Error closing log files", th);
+ if (th instanceof OutOfMemoryError) {
+ throw (OutOfMemoryError) th;
+ }
+ }
+
+ if (writer != null) {
+ writer.close();
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 665db77..380bc9a 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -26,6 +26,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Sort;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.RecoveryStrategy;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
@@ -172,7 +173,12 @@ public abstract class SolrCoreState {
public abstract void setLastReplicateIndexSuccess(boolean success);
- public static class CoreIsClosedException extends IllegalStateException {
+ public static class CoreIsClosedException extends AlreadyClosedException {
+
+ public CoreIsClosedException() {
+ super();
+ }
+
public CoreIsClosedException(String s) {
super(s);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 1abf23c..0941da5 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -183,7 +183,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
long id = -1;
- protected State state = State.ACTIVE;
+ protected volatile State state = State.ACTIVE;
protected TransactionLog bufferTlog;
protected TransactionLog tlog;
@@ -1351,8 +1351,9 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
public void close(boolean committed, boolean deleteOnClose) {
+ recoveryExecutor.shutdown(); // no new tasks
+
synchronized (this) {
- recoveryExecutor.shutdown(); // no new tasks
// Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions
@@ -1373,11 +1374,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
bufferTlog.forceClose();
}
- try {
- ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
- } catch (Exception e) {
- SolrException.log(log, e);
- }
+ }
+
+ try {
+ ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
+ } catch (Exception e) {
+ SolrException.log(log, e);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index bc013bb..4bb201f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -66,10 +66,14 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
private final CloseableHttpClient updateOnlyClient;
+ private final CloseableHttpClient recoveryOnlyClient;
+
private final CloseableHttpClient defaultClient;
private final InstrumentedPoolingHttpClientConnectionManager updateOnlyConnectionManager;
+ private final InstrumentedPoolingHttpClientConnectionManager recoveryOnlyConnectionManager;
+
private final InstrumentedPoolingHttpClientConnectionManager defaultConnectionManager;
private final InstrumentedHttpRequestExecutor httpRequestExecutor;
@@ -83,10 +87,13 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
updateOnlyConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
+ recoveryOnlyConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
defaultConnectionManager = new InstrumentedPoolingHttpClientConnectionManager(HttpClientUtil.getSchemaRegisteryProvider().getSchemaRegistry());
if (cfg != null ) {
updateOnlyConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
updateOnlyConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
+ recoveryOnlyConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
+ recoveryOnlyConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
defaultConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
defaultConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
}
@@ -110,6 +117,7 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
httpRequestExecutor = new InstrumentedHttpRequestExecutor(metricNameStrategy);
updateOnlyClient = HttpClientUtil.createClient(clientParams, updateOnlyConnectionManager, false, httpRequestExecutor);
+ recoveryOnlyClient = HttpClientUtil.createClient(clientParams, recoveryOnlyConnectionManager, false, httpRequestExecutor);
defaultClient = HttpClientUtil.createClient(clientParams, defaultConnectionManager, false, httpRequestExecutor);
// following is done only for logging complete configuration.
@@ -178,6 +186,11 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
return updateOnlyClient;
}
+ // don't introduce a bug, this client is for recovery ops only!
+ public HttpClient getRecoveryOnlyHttpClient() {
+ return recoveryOnlyClient;
+ }
+
/**
* This method returns an executor that is meant for non search related tasks.
@@ -191,6 +204,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
public PoolingHttpClientConnectionManager getDefaultConnectionManager() {
return defaultConnectionManager;
}
+
+ public PoolingHttpClientConnectionManager getRecoveryOnlyConnectionManager() {
+ return recoveryOnlyConnectionManager;
+ }
/**
*
@@ -206,12 +223,14 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
} catch (Exception e) {
- SolrException.log(log, e);
+ throw new RuntimeException(e);
} finally {
HttpClientUtil.close(updateOnlyClient);
+ HttpClientUtil.close(recoveryOnlyClient);
HttpClientUtil.close(defaultClient);
updateOnlyConnectionManager.close();
defaultConnectionManager.close();
+ recoveryOnlyConnectionManager.close();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 004f4f7..74bd86e 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -16,6 +16,9 @@
*/
package org.apache.solr.update.processor;
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -28,6 +31,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,7 +43,6 @@ import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -97,9 +102,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@@ -116,12 +118,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
/**
* Request forwarded to a leader of a different shard will be retried up to this amount of times by default
*/
- static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25;
+ static final int MAX_RETRIES_ON_FORWARD_DEAULT = Integer.getInteger("solr.retries.on.forward", 25);
/**
* Requests from leader to it's followers will be retried this amount of times by default
*/
- static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3;
+ static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = Integer.getInteger("solr.retries.to.followers", 3);
/**
* Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
@@ -433,6 +435,46 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
return false;
}
+
+ private List<Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ String leaderCoreNodeName = leaderReplica.getName();
+ List<Replica> replicas = clusterState.getCollection(collection)
+ .getSlice(shardId)
+ .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
+ if (replicas.isEmpty()) {
+ return null;
+ }
+
+ // check for test param that lets us miss replicas
+ String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
+ Set<String> skipListSet = null;
+ if (skipList != null) {
+ skipListSet = new HashSet<>(skipList.length);
+ skipListSet.addAll(Arrays.asList(skipList));
+ log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
+ }
+
+ List<Node> nodes = new ArrayList<>(replicas.size());
+ skippedCoreNodeNames = new HashSet<>();
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ for (Replica replica : replicas) {
+ String coreNodeName = replica.getName();
+ if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
+ log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
+ skippedCoreNodeNames.add(replica.getName());
+ } else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
+ || replica.getState() == Replica.State.DOWN) {
+ skippedCoreNodeNames.add(replica.getName());
+ } else {
+ nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
+ }
+ }
+ return nodes;
+ }
/** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */
private List<Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) {
@@ -521,8 +563,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ZkStateReader.SHARD_ID_PROP, myShardId,
"routeKey", routeKey + "!");
SolrZkClient zkClient = zkController.getZkClient();
- DistributedQueue queue = Overseer.getStateUpdateQueue(zkClient);
- queue.offer(Utils.toJSON(map));
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
} catch (KeeperException e) {
log.warn("Exception while removing routing rule for route key: " + routeKey, e);
} catch (Exception e) {
@@ -1865,38 +1906,42 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
updateCommand = cmd;
List<Node> nodes = null;
- boolean singleLeader = false;
+ Replica leaderReplica = null;
if (zkEnabled) {
zkCheck();
+ try {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag; Thread.interrupted() would clear it
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+ }
+ isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
- nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT));
+ nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
if (nodes == null) {
// This could happen if there are only pull replicas
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
}
- if (isLeader && nodes.size() == 1 && replicaType != Replica.Type.PULL) {
- singleLeader = true;
- }
+
+ nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName())
+ && node.getNodeProps().getCoreName().equals(req.getCore().getName()));
}
- if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
+ CompletionService<Exception> completionService = new ExecutorCompletionService<>(req.getCore().getCoreContainer().getUpdateShardHandler().getUpdateExecutor());
+ Set<Future<Exception>> pending = new HashSet<>();
+ if (!zkEnabled || (!isLeader && req.getParams().get(COMMIT_END_POINT, "").equals("replicas"))) {
if (replicaType == Replica.Type.TLOG) {
- try {
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- collection, cloudDesc.getShardId());
- isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
- if (isLeader) {
- long commitVersion = vinfo.getNewClock();
- cmd.setVersion(commitVersion);
- doLocalCommit(cmd);
- } else {
- assert TestInjection.waitForInSyncWithLeader(req.getCore(),
- zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader";
- }
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+
+ if (isLeader) {
+ long commitVersion = vinfo.getNewClock();
+ cmd.setVersion(commitVersion);
+ doLocalCommit(cmd);
+ } else {
+ assert TestInjection.waitForInSyncWithLeader(req.getCore(),
+ zkController, collection, cloudDesc.getShardId()) : "Core " + req.getCore() + " not in sync with leader";
}
+
} else if (replicaType == Replica.Type.PULL) {
log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
} else {
@@ -1905,21 +1950,51 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
long commitVersion = vinfo.getNewClock();
cmd.setVersion(commitVersion);
}
+
doLocalCommit(cmd);
}
} else {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
- params.set(COMMIT_END_POINT, true);
+
+ List<Node> useNodes = null;
+ if (req.getParams().get(COMMIT_END_POINT) == null) {
+ useNodes = nodes;
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+ params.set(COMMIT_END_POINT, "leaders");
+ if (useNodes != null) {
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ cmdDistrib.distribCommit(cmd, useNodes, params);
+ cmdDistrib.blockAndDoRetries();
+ }
+ }
+
+ if (isLeader) {
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- if (nodes != null) {
- cmdDistrib.distribCommit(cmd, nodes, params);
+
+ params.set(COMMIT_END_POINT, "replicas");
+
+ useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+
+ if (useNodes != null) {
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ cmdDistrib.distribCommit(cmd, useNodes, params);
+ }
+ // NRT replicas will always commit
+ if (vinfo != null) {
+ long commitVersion = vinfo.getNewClock();
+ cmd.setVersion(commitVersion);
+ }
+
+ doLocalCommit(cmd);
+ if (useNodes != null) {
cmdDistrib.blockAndDoRetries();
}
}
}
+
}
private void doLocalCommit(CommitUpdateCommand cmd) throws IOException {
@@ -1951,7 +2026,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (next != null && nodes == null) next.finish();
}
- private List<Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types) {
+ private List<Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
ClusterState clusterState = zkController.getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (collection == null || docCollection.getSlicesMap() == null) {
@@ -1962,7 +2037,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
final List<Node> urls = new ArrayList<>(slices.size());
for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
Slice replicas = slices.get(sliceEntry.getKey());
-
+ if (onlyLeaders) {
+ Replica replica = docCollection.getLeader(replicas.getName());
+ if (replica != null) {
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
+ urls.add(new StdNode(nodeProps, collection, replicas.getName()));
+ }
+ continue;
+ }
Map<String,Replica> shardMap = replicas.getReplicasMap();
for (Entry<String,Replica> entry : shardMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/util/SolrCLI.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/SolrCLI.java b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
index dc239f1..03aa5f8 100755
--- a/solr/core/src/java/org/apache/solr/util/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
@@ -2381,7 +2381,7 @@ public class SolrCLI {
protected void deleteCollection(CommandLine cli) throws Exception {
String zkHost = getZkHost(cli);
- try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()).build()) {
+ try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkHost), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) {
echoIfVerbose("Connecting to ZooKeeper at " + zkHost, cli);
cloudSolrClient.connect();
deleteCollection(cloudSolrClient, cli);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index bee6278..b03b8ab 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -16,6 +16,9 @@
*/
package org.apache.solr.util;
+import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.Collections;
@@ -24,6 +27,7 @@ import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,9 +54,6 @@ import org.apache.solr.update.SolrIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
-import static org.apache.solr.handler.ReplicationHandler.COMMAND;
-
/**
* Allows random faults to be injected in running code during test runs.
@@ -116,43 +117,50 @@ public class TestInjection {
}
}
- public static String nonGracefullClose = null;
+ public volatile static String nonGracefullClose = null;
- public static String failReplicaRequests = null;
+ public volatile static String failReplicaRequests = null;
- public static String failUpdateRequests = null;
+ public volatile static String failUpdateRequests = null;
- public static String nonExistentCoreExceptionAfterUnload = null;
+ public volatile static String nonExistentCoreExceptionAfterUnload = null;
- public static String updateLogReplayRandomPause = null;
+ public volatile static String updateLogReplayRandomPause = null;
- public static String updateRandomPause = null;
+ public volatile static String updateRandomPause = null;
- public static String prepRecoveryOpPauseForever = null;
+ public volatile static String prepRecoveryOpPauseForever = null;
- public static String randomDelayInCoreCreation = null;
+ public volatile static String randomDelayInCoreCreation = null;
- public static int randomDelayMaxInCoreCreationInSec = 10;
+ public volatile static int randomDelayMaxInCoreCreationInSec = 10;
- public static String splitFailureBeforeReplicaCreation = null;
+ public volatile static String splitFailureBeforeReplicaCreation = null;
- public static String splitFailureAfterReplicaCreation = null;
+ public volatile static String splitFailureAfterReplicaCreation = null;
- public static CountDownLatch splitLatch = null;
+ public volatile static CountDownLatch splitLatch = null;
- public static String waitForReplicasInSync = "true:60";
+ public volatile static String waitForReplicasInSync = "true:60";
- public static String failIndexFingerprintRequests = null;
+ public volatile static String failIndexFingerprintRequests = null;
- public static String wrongIndexFingerprint = null;
+ public volatile static String wrongIndexFingerprint = null;
- private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
+ private volatile static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
- private static AtomicInteger countPrepRecoveryOpPauseForever = new AtomicInteger(0);
+ private volatile static AtomicInteger countPrepRecoveryOpPauseForever = new AtomicInteger(0);
- public static Integer delayBeforeSlaveCommitRefresh=null;
+ public volatile static Integer delayBeforeSlaveCommitRefresh=null;
- public static boolean uifOutOfMemoryError = false;
+ public volatile static boolean uifOutOfMemoryError = false;
+
+ private volatile static CountDownLatch notifyPauseForeverDone = new CountDownLatch(1);
+
+ public static void notifyPauseForeverDone() {
+ notifyPauseForeverDone.countDown();
+ notifyPauseForeverDone = new CountDownLatch(1);
+ }
public static void reset() {
nonGracefullClose = null;
@@ -172,7 +180,8 @@ public class TestInjection {
wrongIndexFingerprint = null;
delayBeforeSlaveCommitRefresh = null;
uifOutOfMemoryError = false;
-
+ notifyPauseForeverDone();
+ newSearcherHooks.clear();
for (Timer timer : timers) {
timer.cancel();
}
@@ -371,19 +380,20 @@ public class TestInjection {
}
public static boolean injectPrepRecoveryOpPauseForever() {
- if (prepRecoveryOpPauseForever != null) {
+ String val = prepRecoveryOpPauseForever;
+ if (val != null) {
Random rand = random();
if (null == rand) return true;
-
- Pair<Boolean,Integer> pair = parseValue(prepRecoveryOpPauseForever);
+ Pair<Boolean,Integer> pair = parseValue(val);
boolean enabled = pair.first();
int chanceIn100 = pair.second();
// Prevent for continuous pause forever
if (enabled && rand.nextInt(100) >= (100 - chanceIn100) && countPrepRecoveryOpPauseForever.get() < 1) {
countPrepRecoveryOpPauseForever.incrementAndGet();
log.info("inject pause forever for prep recovery op");
+
try {
- Thread.sleep(Integer.MAX_VALUE);
+ notifyPauseForeverDone.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -481,9 +491,12 @@ public class TestInjection {
return false;
}
- private static Pair<Boolean,Integer> parseValue(String raw) {
+ private static Pair<Boolean,Integer> parseValue(final String raw) {
+ if (raw == null) return new Pair<>(false, 0);
Matcher m = ENABLED_PERCENT.matcher(raw);
- if (!m.matches()) throw new RuntimeException("No match, probably bad syntax: " + raw);
+ if (!m.matches()) {
+ throw new RuntimeException("No match, probably bad syntax: " + raw);
+ }
String val = m.group(1);
String percent = "100";
if (m.groupCount() == 2) {
@@ -511,4 +524,24 @@ public class TestInjection {
return true;
}
+ static Set<Hook> newSearcherHooks = ConcurrentHashMap.newKeySet();
+
+ public interface Hook {
+ public void newSearcher(String collectionName);
+ public void waitForSearcher(String collection, int cnt, int timeoutms, boolean failOnTimeout) throws InterruptedException;
+ }
+
+ public static boolean newSearcherHook(Hook hook) {
+ newSearcherHooks.add(hook);
+ return true;
+ }
+
+ public static boolean injectSearcherHooks(String collectionName) {
+ for (Hook hook : newSearcherHooks) {
+ hook.newSearcher(collectionName);
+ }
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/util/TimeOut.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TimeOut.java b/solr/core/src/java/org/apache/solr/util/TimeOut.java
index ce996f4..c06fe6e 100644
--- a/solr/core/src/java/org/apache/solr/util/TimeOut.java
+++ b/solr/core/src/java/org/apache/solr/util/TimeOut.java
@@ -61,8 +61,13 @@ public class TimeOut {
public void waitFor(String messageOnTimeOut, Supplier<Boolean> supplier)
throws InterruptedException, TimeoutException {
while (!supplier.get() && !hasTimedOut()) {
- Thread.sleep(500);
+ Thread.sleep(250);
}
if (hasTimedOut()) throw new TimeoutException(messageOnTimeOut);
}
+
+ @Override
+ public String toString() {
+ return "TimeOut [timeoutAt=" + timeoutAt + ", startTime=" + startTime + ", timeSource=" + timeSource + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/test-files/solr/solr-jmxreporter.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/solr-jmxreporter.xml b/solr/core/src/test-files/solr/solr-jmxreporter.xml
index bb9d05d..58c4d0c 100644
--- a/solr/core/src/test-files/solr/solr-jmxreporter.xml
+++ b/solr/core/src/test-files/solr/solr-jmxreporter.xml
@@ -35,6 +35,7 @@
<int name="autoReplicaFailoverWaitAfterExpiration">${autoReplicaFailoverWaitAfterExpiration:10000}</int>
<int name="autoReplicaFailoverWorkLoopDelay">${autoReplicaFailoverWorkLoopDelay:10000}</int>
<int name="autoReplicaFailoverBadNodeExpiration">${autoReplicaFailoverBadNodeExpiration:60000}</int>
+ <int name="createCollectionWaitTimeTillActive">${createCollectionWaitTimeTillActive:30}</int>
</solrcloud>
<metrics>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/test-files/solr/solr.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/solr.xml b/solr/core/src/test-files/solr/solr.xml
index ae27fe7..2c13448 100644
--- a/solr/core/src/test-files/solr/solr.xml
+++ b/solr/core/src/test-files/solr/solr.xml
@@ -27,7 +27,7 @@
<shardHandlerFactory name="shardHandlerFactory" class="HttpShardHandlerFactory">
<str name="urlScheme">${urlScheme:}</str>
- <int name="socketTimeout">${socketTimeout:90000}</int>
+ <int name="socketTimeout">${socketTimeout:15000}</int>
<int name="connTimeout">${connTimeout:15000}</int>
</shardHandlerFactory>
@@ -40,12 +40,12 @@
<str name="host">127.0.0.1</str>
<int name="hostPort">${hostPort:8983}</int>
<str name="hostContext">${hostContext:solr}</str>
- <int name="zkClientTimeout">${solr.zkclienttimeout:30000}</int>
+ <int name="zkClientTimeout">${solr.zkclienttimeout:60000}</int> <!-- This should be high by default - dc's are expensive -->
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
- <int name="leaderVoteWait">${leaderVoteWait:10000}</int>
- <int name="leaderConflictResolveWait">${leaderConflictResolveWait:180000}</int>
- <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:45000}</int>
- <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:340000}</int>
+ <int name="leaderVoteWait">${leaderVoteWait:15000}</int> <!-- We are running tests - the default should be low, not like production -->
+ <int name="leaderConflictResolveWait">${leaderConflictResolveWait:45000}</int>
+ <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:5000}</int>
+ <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:15000}</int> <!-- We are running tests - the default should be low, not like production -->
<int name="autoReplicaFailoverWaitAfterExpiration">${autoReplicaFailoverWaitAfterExpiration:10000}</int>
<int name="autoReplicaFailoverWorkLoopDelay">${autoReplicaFailoverWorkLoopDelay:10000}</int>
<int name="autoReplicaFailoverBadNodeExpiration">${autoReplicaFailoverBadNodeExpiration:60000}</int>