Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/12 17:28:31 UTC
[lucene-solr] 04/06: @1242 WIP
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 8a475be1d804e0bcbba203c6f4f16ed50e5a7397
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Dec 11 23:56:25 2020 -0600
@1242 WIP
---
.../ja/JapanesePartOfSpeechStopFilterFactory.java | 8 +-
solr/bin/solr | 102 ++-----
.../prometheus/scraper/SolrCloudScraperTest.java | 1 +
.../client/solrj/embedded/JettySolrRunner.java | 14 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 28 +-
.../OverseerCollectionConfigSetProcessor.java | 6 +-
.../org/apache/solr/cloud/OverseerTaskQueue.java | 15 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 301 +++++++++++----------
.../org/apache/solr/cloud/ZkCollectionTerms.java | 55 ++--
.../java/org/apache/solr/cloud/ZkController.java | 105 +++----
.../java/org/apache/solr/cloud/ZkShardTerms.java | 4 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 49 +---
.../OverseerCollectionMessageHandler.java | 4 +-
.../solr/cloud/overseer/CollectionMutator.java | 1 -
.../apache/solr/cloud/overseer/ZkStateWriter.java | 2 +
.../apache/solr/core/CachingDirectoryFactory.java | 2 +-
.../src/java/org/apache/solr/core/SolrCore.java | 43 +--
.../src/java/org/apache/solr/core/SolrCores.java | 4 +-
.../src/java/org/apache/solr/core/ZkContainer.java | 2 +-
.../apache/solr/handler/CheckSumFailException.java | 4 +
.../java/org/apache/solr/handler/IndexFetcher.java | 121 ++++-----
.../apache/solr/handler/ReplicationHandler.java | 5 +-
.../solr/handler/admin/CollectionsHandler.java | 4 +-
.../solr/handler/admin/ConfigSetsHandler.java | 2 +-
.../java/org/apache/solr/servlet/HttpSolrCall.java | 5 +-
.../apache/solr/servlet/SolrDispatchFilter.java | 2 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 8 +-
.../org/apache/solr/update/UpdateShardHandler.java | 5 +-
.../processor/DistributedZkUpdateProcessor.java | 4 +-
.../src/java/org/apache/solr/util/ExportTool.java | 2 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +-
.../solr/cloud/ChaosMonkeyShardSplitTest.java | 4 +-
.../org/apache/solr/cloud/LeaderElectionTest.java | 2 +-
.../apache/solr/cloud/MockSimpleZkController.java | 2 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 4 +-
.../solr/cloud/TestLeaderElectionZkExpiry.java | 3 +-
.../org/apache/solr/cloud/ZkControllerTest.java | 6 +-
.../processor/DistributedUpdateProcessorTest.java | 2 +-
.../processor/RoutedAliasUpdateProcessorTest.java | 24 +-
solr/server/contexts/solr-jetty-context.xml | 1 -
solr/server/etc/jetty-http.xml | 5 +-
solr/server/etc/jetty-https.xml | 7 +-
solr/server/modules/quickstart.mod | 9 +
.../solr/configsets/_default/conf/solrconfig.xml | 2 +-
solr/server/solr/solr.xml | 10 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 12 +-
.../solr/client/solrj/impl/HttpClientUtil.java | 2 +-
.../solr/client/solrj/impl/LBHttp2SolrClient.java | 1 +
.../src/java/org/apache/solr/common/ParWork.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 37 +--
.../org/apache/solr/common/cloud/SolrZkClient.java | 2 +-
.../apache/solr/common/cloud/ZkCmdExecutor.java | 2 +-
.../solr/common/cloud/ZkMaintenanceUtils.java | 10 +-
.../solr/common/util/SolrQueuedThreadPool.java | 2 +-
.../src/java/org/apache/solr/SolrTestCase.java | 2 +-
solr/webapp/web/WEB-INF/quickstart-web.xml | 0
56 files changed, 505 insertions(+), 558 deletions(-)
diff --git a/lucene/analysis/kuromoji/src/java/org/apache/lucene/analysis/ja/JapanesePartOfSpeechStopFilterFactory.java b/lucene/analysis/kuromoji/src/java/org/apache/lucene/analysis/ja/JapanesePartOfSpeechStopFilterFactory.java
index f0e2a80..0f3941a 100644
--- a/lucene/analysis/kuromoji/src/java/org/apache/lucene/analysis/ja/JapanesePartOfSpeechStopFilterFactory.java
+++ b/lucene/analysis/kuromoji/src/java/org/apache/lucene/analysis/ja/JapanesePartOfSpeechStopFilterFactory.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.TokenStream;
@@ -47,7 +48,7 @@ public class JapanesePartOfSpeechStopFilterFactory extends TokenFilterFactory im
public static final String NAME = "japanesePartOfSpeechStop";
private final String stopTagFiles;
- private Set<String> stopTags;
+ private final Set<String> stopTags = ConcurrentHashMap.newKeySet();
/** Creates a new JapanesePartOfSpeechStopFilterFactory */
public JapanesePartOfSpeechStopFilterFactory(Map<String,String> args) {
@@ -65,10 +66,9 @@ public class JapanesePartOfSpeechStopFilterFactory extends TokenFilterFactory im
@Override
public void inform(ResourceLoader loader) throws IOException {
- stopTags = null;
+ stopTags.clear();
CharArraySet cas = getWordSet(loader, stopTagFiles, false);
if (cas != null) {
- stopTags = new HashSet<>();
for (Object element : cas) {
char chars[] = (char[]) element;
stopTags.add(new String(chars));
@@ -79,7 +79,7 @@ public class JapanesePartOfSpeechStopFilterFactory extends TokenFilterFactory im
@Override
public TokenStream create(TokenStream stream) {
// if stopTags is empty, it means the tag file was empty
- if (stopTags != null) {
+ if (stopTags.size() > 0) {
final TokenStream filter = new JapanesePartOfSpeechStopFilter(stream, stopTags);
return filter;
} else {
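
The Kuromoji change above replaces a nullable, lazily assigned HashSet with a pre-allocated concurrent set, so inform() can be re-run safely and the old null check becomes an emptiness check. A minimal sketch of the idiom, with illustrative names not taken from the patch:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class StopTagHolder {
        // final reference: safely published once the constructor completes
        private final Set<String> stopTags = ConcurrentHashMap.newKeySet();

        void reload(Iterable<String> tags) {
            stopTags.clear();              // reuse the set instead of nulling the field
            for (String tag : tags) {
                stopTags.add(tag);
            }
        }

        boolean hasTags() {
            return !stopTags.isEmpty();    // emptiness check replaces the old null check
        }
    }
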
diff --git a/solr/bin/solr b/solr/bin/solr
index 7055e1c..d2a01a1 100755
--- a/solr/bin/solr
+++ b/solr/bin/solr
@@ -189,6 +189,7 @@ if [ "$SOLR_SSL_ENABLED" == "true" ]; then
echo >&2 "HTTP/2 + SSL is not support in Java 8. "
echo >&2 "Configure Solr with HTTP/1.1 + SSL"
SOLR_JETTY_CONFIG+=("--module=https8")
+ SOLR_JETTY_CONFIG+=("--module=https8")
else
SOLR_JETTY_CONFIG+=("--module=https")
fi
@@ -839,49 +840,15 @@ function stop_solr() {
SOLR_PID="$4"
if [ "$SOLR_PID" != "" ]; then
- echo -e "Sending stop command to Solr running on port $SOLR_PORT ... waiting up to $SOLR_STOP_WAIT seconds to allow Jetty process $SOLR_PID to stop gracefully."
+ echo -e "Sending stop command to Solr running on port $SOLR_PORT ... "
"$JAVA" $SOLR_SSL_OPTS $AUTHC_OPTS -jar "$DIR/start.jar" "STOP.PORT=$THIS_STOP_PORT" "STOP.KEY=$STOP_KEY" --stop || true
- (loops=0
- while true
- do
- CHECK_PID=`ps auxww | awk '{print $2}' | grep -w $SOLR_PID | sort -r | tr -d ' '`
- if [ "$CHECK_PID" != "" ]; then
- slept=$((loops * 1))
- if [ $slept -lt $SOLR_STOP_WAIT ]; then
- sleep 1
- loops=$[$loops+1]
- else
- exit # subshell!
- fi
- else
- exit # subshell!
- fi
- done) &
- spinner $!
+
rm -f "$SOLR_PID_DIR/solr-$SOLR_PORT.pid"
else
echo -e "No Solr nodes found to stop."
exit 0
fi
- CHECK_PID=`ps auxww | awk '{print $2}' | grep -w $SOLR_PID | sort -r | tr -d ' '`
- if [ "$CHECK_PID" != "" ]; then
- if [ "$JSTACK" != "" ]; then
- echo -e "Solr process $SOLR_PID is still running; jstacking it now."
- $JSTACK $SOLR_PID
- fi
- echo -e "Solr process $SOLR_PID is still running; forcefully killing it now."
- kill -9 $SOLR_PID
- echo "Killed process $SOLR_PID"
- rm -f "$SOLR_PID_DIR/solr-$SOLR_PORT.pid"
- sleep 1
- fi
-
- CHECK_PID=`ps auxww | awk '{print $2}' | grep -w $SOLR_PID | sort -r | tr -d ' '`
- if [ "$CHECK_PID" != "" ]; then
- echo "ERROR: Failed to kill previous Solr Java process $SOLR_PID ... script fails."
- exit 1
- fi
} # end stop_solr
if [ $# -eq 1 ]; then
@@ -1918,11 +1885,6 @@ if [[ "$SCRIPT_CMD" == "start" ]]; then
# not found using the pid file ... but use ps to ensure not found
SOLR_PID=`ps auxww | grep start\.jar | grep -w "\-Djetty\.port=$SOLR_PORT" | grep -v grep | awk '{print $2}' | sort -r`
fi
-
- if [ "$SOLR_PID" != "" ]; then
- echo -e "\nPort $SOLR_PORT is already being used by another process (pid: $SOLR_PID)\nPlease choose a different port using the -p option.\n"
- exit 1
- fi
else
# either stop or restart
# see if Solr is already running
@@ -2127,7 +2089,7 @@ fi
# Pick default for Java thread stack size, and then add to SOLR_OPTS
if [ -z ${SOLR_JAVA_STACK_SIZE+x} ]; then
- SOLR_JAVA_STACK_SIZE='-Xss512k'
+ SOLR_JAVA_STACK_SIZE='-Xss256k'
fi
SOLR_OPTS+=($SOLR_JAVA_STACK_SIZE)
@@ -2240,7 +2202,7 @@ function start_solr() {
# users who don't care about useful error msgs can override in SOLR_OPTS with +OmitStackTraceInFastThrow
"${SOLR_HOST_ARG[@]}" "-Duser.timezone=$SOLR_TIMEZONE" "-XX:-OmitStackTraceInFastThrow" \
"-Djetty.home=$SOLR_SERVER_DIR" "-Dsolr.solr.home=$SOLR_HOME" "-Dsolr.data.home=$SOLR_DATA_HOME" "-Dsolr.install.dir=$SOLR_TIP" \
- "-Dorg.apache.xml.dtm.DTMManager=org.apache.xml.dtm.ref.DTMManagerDefault -Dsolr.default.confdir=$DEFAULT_CONFDIR" "${LOG4J_CONFIG[@]}" "${SOLR_OPTS[@]}" "${SECURITY_MANAGER_OPTS[@]}" "${SOLR_ADMIN_UI}")
+ "-Dorg.apache.xml.dtm.DTMManager=org.apache.xml.dtm.ref.DTMManagerDefault -Djava.net.preferIPv4Stack=true -Dsolr.default.confdir=$DEFAULT_CONFDIR" "${LOG4J_CONFIG[@]}" "${SOLR_OPTS[@]}" "${SECURITY_MANAGER_OPTS[@]}" "${SOLR_ADMIN_UI}")
if [ "$SOLR_MODE" == "solrcloud" ]; then
IN_CLOUD_MODE=" in SolrCloud mode"
@@ -2262,54 +2224,26 @@ function start_solr() {
;;
esac
+ # check if /proc/sys/kernel/random/entropy_avail exists then check output of cat /proc/sys/kernel/random/entropy_avail to see if less than 300
+ if [[ -f /proc/sys/kernel/random/entropy_avail ]] && (( `cat /proc/sys/kernel/random/entropy_avail` < 300)); then
+ echo "Warning: Available entropy is low. As a result, use of the UUIDField, SSL, or any other features that require"
+ echo "RNG might not work properly. To check for the amount of available entropy, use 'cat /proc/sys/kernel/random/entropy_avail'."
+ echo ""
+ fi
+
if [ "$run_in_foreground" == "true" ]; then
exec "$JAVA" "${SOLR_START_OPTS[@]}" $SOLR_ADDL_ARGS -XX:-UseBiasedLocking -jar start.jar "${SOLR_JETTY_CONFIG[@]}" $SOLR_JETTY_ADDL_CONFIG
else
# run Solr in the background
nohup "$JAVA" "${SOLR_START_OPTS[@]}" $SOLR_ADDL_ARGS -XX:-UseBiasedLocking -Dsolr.log.muteconsole \
"-XX:OnOutOfMemoryError=$SOLR_TIP/bin/oom_solr.sh $SOLR_PORT $SOLR_LOGS_DIR" \
- -jar start.jar "${SOLR_JETTY_CONFIG[@]}" $SOLR_JETTY_ADDL_CONFIG \
- 1>"$SOLR_LOGS_DIR/solr-$SOLR_PORT-console.log" 2>&1 & echo $! > "$SOLR_PID_DIR/solr-$SOLR_PORT.pid"
-
- # check if /proc/sys/kernel/random/entropy_avail exists then check output of cat /proc/sys/kernel/random/entropy_avail to see if less than 300
- if [[ -f /proc/sys/kernel/random/entropy_avail ]] && (( `cat /proc/sys/kernel/random/entropy_avail` < 300)); then
- echo "Warning: Available entropy is low. As a result, use of the UUIDField, SSL, or any other features that require"
- echo "RNG might not work properly. To check for the amount of available entropy, use 'cat /proc/sys/kernel/random/entropy_avail'."
- echo ""
- fi
- # no lsof on cygwin though
- if lsof -v 2>&1 | grep -q revision; then
- echo -n "Waiting up to $SOLR_STOP_WAIT seconds to see Solr running on port $SOLR_PORT"
- # Launch in a subshell to show the spinner
- (loops=0
- while true
- do
- running=$(lsof -t -PniTCP:$SOLR_PORT -sTCP:LISTEN)
- if [ -z "$running" ]; then
- slept=$((loops / 2))
- if [ $slept -lt $SOLR_STOP_WAIT ]; then
- sleep 0.5
- loops=$[$loops+1]
- else
- echo -e "Still not seeing Solr listening on $SOLR_PORT after $SOLR_STOP_WAIT seconds!"
- tail -30 "$SOLR_LOGS_DIR/solr.log"
- exit # subshell!
- fi
- else
- SOLR_PID=`ps auxww | grep start\.jar | grep -w "\-Djetty\.port=$SOLR_PORT" | grep -v grep | awk '{print $2}' | sort -r`
- echo -e "\nStarted Solr server on port $SOLR_PORT (pid=$SOLR_PID). Happy searching!\n"
- exit # subshell!
- fi
- done) &
- spinner $!
- else
- echo -e "NOTE: Please install lsof as this script needs it to determine if Solr is listening on port $SOLR_PORT."
- sleep 10
- SOLR_PID=`ps auxww | grep start\.jar | grep -w "\-Djetty\.port=$SOLR_PORT" | grep -v grep | awk '{print $2}' | sort -r`
- echo -e "\nStarted Solr server on port $SOLR_PORT (pid=$SOLR_PID). Happy searching!\n"
- return;
+ -jar start.jar "${SOLR_JETTY_CONFIG[@]}" $SOLR_JETTY_ADDL_CONFIG 1>"$SOLR_LOGS_DIR/solr-$SOLR_PORT-console.log" 2>&1 &
+ SOLR_PID=$!
+ echo $SOLR_PID > "$SOLR_PID_DIR/solr-$SOLR_PORT.pid"
+ echo -e "\nStarted Solr server on port $SOLR_PORT (pid=$SOLR_PID). Happy searching!\n"
+
fi
- fi
+
}
start_solr "$FG" "$ADDITIONAL_CMD_OPTS" "$ADDITIONAL_JETTY_CONFIG"
diff --git a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java b/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
index 571b540..bc1529d 100644
--- a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
+++ b/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/SolrCloudScraperTest.java
@@ -95,6 +95,7 @@ public class SolrCloudScraperTest extends PrometheusExporterTestBase {
super.tearDown();
IOUtils.closeQuietly(solrCloudScraper);
if (null != executor) {
+ executor.shutdown();
executor.shutdownNow();
executor = null;
}
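
Calling shutdown() before shutdownNow() first stops new submissions and lets queued work drain, then cancels whatever is still running. The patch calls the two back to back; a common JDK-only variant waits briefly in between (the helper below is hypothetical, not from the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class ExecutorCloser {
        static void close(ExecutorService executor) throws InterruptedException {
            executor.shutdown();                               // stop accepting work, drain the queue
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();                        // interrupt whatever is still running
            }
        }
    }
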
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e1d1d6d..96c7fb0 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -569,7 +569,7 @@ public class JettySolrRunner implements Closeable {
}
if (getCoreContainer() != null && System.getProperty("zkHost") != null && wait) {
- SolrZkClient zkClient = getCoreContainer().getZkController().getZkStateReader().getZkClient();
+ SolrZkClient zkClient = getCoreContainer().getZkController().getZkClient();
CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new ClusterReadyWatcher(latch, zkClient);
@@ -594,12 +594,12 @@ public class JettySolrRunner implements Closeable {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
}
-
- log.info("waitForNode: {}", getNodeName());
-
- ZkStateReader reader = getCoreContainer().getZkController().getZkStateReader();
-
- reader.waitForLiveNodes(30, TimeUnit.SECONDS, (n) -> n != null && getNodeName() != null && n.contains(getNodeName()));
+// if we need this, use the client, not the reader
+// log.info("waitForNode: {}", getNodeName());
+//
+// ZkStateReader reader = getCoreContainer().getZkController().getZkStateReader();
+//
+// reader.waitForLiveNodes(30, TimeUnit.SECONDS, (n) -> n != null && getNodeName() != null && n.contains(getNodeName()));
}
} finally {
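
The surviving wait path in JettySolrRunner parks on a CountDownLatch that a ZooKeeper Watcher releases, rather than polling live nodes through the state reader. A rough, self-contained sketch of that latch-plus-watcher shape (class and method names are illustrative):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.zookeeper.Watcher;

    class ClusterReadyWait {
        static boolean awaitReady(long seconds) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            Watcher watcher = event -> latch.countDown();     // the watch callback wakes the waiter
            // ... register 'watcher' with the ZooKeeper client for the node of interest ...
            return latch.await(seconds, TimeUnit.SECONDS);    // bounded wait, no polling loop
        }
    }
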
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 8c79436..ef0a8f2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -41,7 +41,6 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -233,8 +232,6 @@ public class Overseer implements SolrCloseable {
private volatile ZkStateWriter zkStateWriter;
- private final ZkStateReader reader;
-
private final UpdateShardHandler updateShardHandler;
private final String adminPath;
@@ -254,9 +251,7 @@ public class Overseer implements SolrCloseable {
public volatile LBHttp2SolrClient overseerLbClient;
// overseer not responsible for closing reader
- public Overseer(UpdateShardHandler updateShardHandler, String adminPath,
- final ZkStateReader reader, ZkController zkController, CloudConfig config) {
- this.reader = reader;
+ public Overseer(UpdateShardHandler updateShardHandler, String adminPath, ZkController zkController, CloudConfig config) {
this.updateShardHandler = updateShardHandler;
this.adminPath = adminPath;
this.zkController = zkController;
@@ -292,7 +287,7 @@ public class Overseer implements SolrCloseable {
// stateManagmentExecutor = ParWork.getParExecutorService("stateManagmentExecutor",
// 1, 1, 3000, new SynchronousQueue());
taskExecutor = ParWork.getParExecutorService("overseerTaskExecutor",
- 10, 32, 1000, new SynchronousQueue());
+ 3, 32, 1000, new SynchronousQueue());
// try {
// if (context != null) context.close();
@@ -300,7 +295,7 @@ public class Overseer implements SolrCloseable {
// log.error("", e);
// }
if (overseerOnlyClient == null && !closeAndDone && !initedHttpClient) {
- overseerOnlyClient = new Http2SolrClient.Builder().idleTimeout(500000).markInternalRequest().build();
+ overseerOnlyClient = new Http2SolrClient.Builder().idleTimeout(60000).connectionTimeout(5000).markInternalRequest().build();
overseerOnlyClient.enableCloseLock();
this.overseerLbClient = new LBHttp2SolrClient(overseerOnlyClient);
initedHttpClient = true;
@@ -342,7 +337,7 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- this.zkStateWriter = new ZkStateWriter(reader, stats);
+ this.zkStateWriter = new ZkStateWriter( zkController.getZkStateReader(), stats);
//systemCollectionCompatCheck(new StringBiConsumer());
queueWatcher = new WorkQueueWatcher(getCoreContainer());
@@ -512,7 +507,6 @@ public class Overseer implements SolrCloseable {
collectionQueueWatcher.close();
}
- this.zkStateWriter = null;
if (!cd) {
boolean retry;
synchronized (this) {
@@ -552,11 +546,13 @@ public class Overseer implements SolrCloseable {
}
taskExecutor.shutdownNow();
- ExecutorUtil.shutdownAndAwaitTermination(taskExecutor);
+ // ExecutorUtil.shutdownAndAwaitTermination(taskExecutor);
}
}
+ this.zkStateWriter = null;
+
if (log.isDebugEnabled()) {
log.debug("doClose - end");
}
@@ -598,7 +594,7 @@ public class Overseer implements SolrCloseable {
* @return a {@link ZkDistributedQueue} object
*/
ZkDistributedQueue getStateUpdateQueue(Stats zkStats) {
- return new ZkDistributedQueue(reader.getZkClient(), "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE, new ConnectionManager.IsClosed(){
+ return new ZkDistributedQueue(zkController.getZkClient(), "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE, new ConnectionManager.IsClosed(){
public boolean isClosed() {
return Overseer.this.isClosed() || zkController.getCoreContainer().isShutDown(); // nocommit use
}
@@ -711,7 +707,7 @@ public class Overseer implements SolrCloseable {
}
public ZkStateReader getZkStateReader() {
- return reader;
+ return zkController.getZkStateReader();
}
public ZkStateWriter getZkStateWriter() {
@@ -880,9 +876,9 @@ public class Overseer implements SolrCloseable {
super(cc, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
configMessageHandler = new OverseerConfigSetMessageHandler(cc);
- failureMap = Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient());
- runningMap = Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient());
- completedMap = Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient());
+ failureMap = Overseer.getFailureMap(cc.getZkController().getZkClient());
+ runningMap = Overseer.getRunningMap(cc.getZkController().getZkClient());
+ completedMap = Overseer.getCompletedMap(cc.getZkController().getZkClient());
}
@Override
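
The taskExecutor change above shrinks the core pool from 10 to 3 while keeping a SynchronousQueue; with that queue a submission either reuses an idle thread or spawns a new one up to the max, it never sits in a backlog. ParWork.getParExecutorService is specific to this branch, so the following is only a rough JDK equivalent, assuming conventional ThreadPoolExecutor semantics:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class OverseerPools {
        // 3 core threads, up to 32, 1s keep-alive; the SynchronousQueue means a
        // submission reuses an idle thread or spawns a new one, it never queues.
        static ThreadPoolExecutor newTaskExecutor() {
            return new ThreadPoolExecutor(3, 32, 1000, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        }
    }
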
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index 891a0c7..f7737cd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -33,9 +33,9 @@ import java.io.IOException;
public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor {
public OverseerCollectionConfigSetProcessor(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer) throws KeeperException {
- this(cc, myId, overseerLbClient, adminPath, stats, overseer, overseer.getCollectionQueue(cc.getZkController().getZkStateReader().getZkClient(), stats),
- Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient()), Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient()),
- Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient()));
+ this(cc, myId, overseerLbClient, adminPath, stats, overseer, overseer.getCollectionQueue(cc.getZkController().getZkClient(), stats),
+ Overseer.getRunningMap(cc.getZkController().getZkClient()), Overseer.getCompletedMap(cc.getZkController().getZkClient()),
+ Overseer.getFailureMap(cc.getZkController().getZkClient()));
}
protected OverseerCollectionConfigSetProcessor(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer, OverseerTaskQueue workQueue,
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 468b102..5fbfe77 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -166,9 +166,9 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
try {
Stat stat = zkClient.exists(path, this, true);
if (stat != null && stat.getDataLength() > 0) {
- this.event = new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path);
lock.lock();
try {
+ this.event = new WatchedEvent(Event.EventType.NodeDataChanged, Event.KeeperState.SyncConnected, path);
eventReceived.signalAll();
} finally {
lock.unlock();
@@ -180,15 +180,16 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
}
}
- public void await(long timeoutMs) throws InterruptedException {
+ public void await(long timeoutMs) {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
lock.lock();
try {
- if (this.event != null) {
- return;
- }
while (!timeout.hasTimedOut() && event == null && !closed) {
- eventReceived.await(500, TimeUnit.MILLISECONDS);
+ try {
+ eventReceived.await(500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+
+ }
}
if (timeout.hasTimedOut()) {
@@ -207,7 +208,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
public void close() throws IOException {
this.closed = true;
try {
- zkClient.getSolrZooKeeper().removeWatches(path, this, Watcher.WatcherType.Any, true);
+ zkClient.getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
}
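
Two patterns in the OverseerTaskQueue hunks are worth noting: the WatchedEvent is now assigned inside the lock before signalAll(), so a woken waiter is guaranteed to see it, and await() now swallows InterruptedException, relying on the timeout/closed loop condition instead. A compact sketch of the signal-under-lock half, with illustrative names:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class EventBox {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition eventReceived = lock.newCondition();
        private Object event;                    // guarded by 'lock'

        void publish(Object e) {
            lock.lock();
            try {
                event = e;                       // write the shared state under the lock...
                eventReceived.signalAll();       // ...before waking waiters, so they must see it
            } finally {
                lock.unlock();
            }
        }
    }
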
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 9337712..f8db3eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -78,8 +78,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class RecoveryStrategy implements Runnable, Closeable {
private volatile CountDownLatch latch;
- private volatile ReplicationHandler replicationHandler;
- private volatile Http2SolrClient recoveryOnlyClient;
+ private final ReplicationHandler replicationHandler;
+ private final Http2SolrClient recoveryOnlyClient;
public static class Builder implements NamedListInitializedPlugin {
private NamedList args;
@@ -136,6 +136,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
this.cc = cc;
this.coreName = cd.getName();
+ try (SolrCore core = cc.getCore(coreName)) {
+ if (core == null) {
+ log.warn("SolrCore is null, won't do recovery");
+ throw new AlreadyClosedException();
+ }
+ recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
+ SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+ replicationHandler = (ReplicationHandler) handler;
+
+ }
+
this.recoveryListener = recoveryListener;
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
@@ -182,34 +193,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
final public void close() {
close = true;
- try (ParWork closer = new ParWork(this, true, true)) {
- closer.collect("prevSendPreRecoveryHttpUriRequestAbort", () -> {
- try {
- if (prevSendPreRecoveryHttpUriRequest != null) {
- prevSendPreRecoveryHttpUriRequest.cancel();
- }
- prevSendPreRecoveryHttpUriRequest = null;
- } catch (NullPointerException e) {
- // expected
- }
- });
-
- if (replicationHandler != null) {
- ReplicationHandler finalReplicationHandler = replicationHandler;
- closer.collect("abortFetch", () -> {
- if (finalReplicationHandler != null) finalReplicationHandler.abortFetch();
- replicationHandler = null;
- });
- }
- if (latch != null) {
- closer.collect("latch", () -> {
- try {
- latch.countDown();
- } catch (NullPointerException e) {
- // expected
- }
- });
- }
+ //
+ //
+ // try {
+ // if (prevSendPreRecoveryHttpUriRequest != null) {
+ // prevSendPreRecoveryHttpUriRequest.cancel();
+ // }
+ // prevSendPreRecoveryHttpUriRequest = null;
+ // } catch (NullPointerException e) {
+ // // expected
+ // }
+ //
+ //
+ ReplicationHandler finalReplicationHandler = replicationHandler;
+ if (finalReplicationHandler != null) {
+
+ finalReplicationHandler.abortFetch();
+ }
+ if (latch != null) {
+
+ latch.countDown();
}
@@ -239,21 +242,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
return leaderprops.getCoreUrl();
}
- final private void replicate(Replica leaderprops)
+ final private IndexFetcher.IndexFetchResult replicate(Replica leaderprops)
throws SolrServerException, IOException {
- final String leaderUrl = getReplicateLeaderUrl(leaderprops, zkStateReader);
-
log.info("Attempting to replicate from [{}].", leaderprops);
+ final String leaderUrl = getReplicateLeaderUrl(leaderprops, zkStateReader);
+
// send commit
- commitOnLeader(leaderUrl);
- if (replicationHandler == null) {
- log.error("Could not find replication handler for recovery");
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
- }
+ commitOnLeader(leaderUrl);
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
@@ -265,20 +263,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("do replication fetch [{}].", solrParams);
- IndexFetcher.IndexFetchResult result = replicationHandler.doFetch(solrParams, false);
-
- if (result.getMessage().equals(IndexFetcher.IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE)) {
- log.info("Interrupted, stopping recovery");
+ IndexFetcher.IndexFetchResult result = replicationHandler.doFetch(solrParams, retries.get() > 5);
- }
-
- if (result.getSuccessful()) {
- log.info("replication fetch reported as success");
- success= true;
- } else {
- log.error("replication fetch reported as failed: {} {} {}", result.getMessage(), result, result.getException());
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replication fetch reported as failed");
- }
+ return result;
// solrcloud_debug
// if (log.isDebugEnabled()) {
@@ -319,13 +306,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
ureq.setBasePath(leaderUrl);
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, "terminal");
- //ureq.getParams().set("dist", false);
+ // ureq.getParams().set("dist", false);
// ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if
// "onlyLeaderIndexes"?
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
log.info("send commit to leader {} {}", leaderUrl, ureq.getParams());
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false).process(recoveryOnlyClient);
+ log.info("done send commit to leader {} {}", leaderUrl);
}
@Override
@@ -390,19 +378,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
// it will close channels
// though
try {
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- throw new AlreadyClosedException();
- }
- recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- replicationHandler = (ReplicationHandler) handler;
- }
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
Replica leaderprops = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId(), 15000);
+ cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leaderprops.getName(), Replica.getCoreUrl(baseUrl, coreName));
log.info("");
@@ -410,19 +389,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
log.info("Stopping background replicate from leader process");
zkController.stopReplicationFromLeader(coreName);
- replicate(leaderprops);
+ IndexFetcher.IndexFetchResult result = replicate(leaderprops);
+
+ if (result.getSuccessful()) {
+ log.info("replication fetch reported as success");
+ } else {
+ log.error("replication fetch reported as failed: {} {} {}", result.getMessage(), result, result.getException());
+ successfulRecovery = false;
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replication fetch reported as failed");
+ }
log.info("Replication Recovery was successful.");
successfulRecovery = true;
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "Error while trying to recover", e);
- return;
+ log.error("Error while trying to recover", e);
+ successfulRecovery = false;
}
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
+ log.error("Error while trying to recover. core=" + coreName, e);
+ successfulRecovery = false;
} finally {
if (successfulRecovery) {
log.info("Restarting background replicate from leader process");
@@ -431,7 +417,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
log.error("Could not publish as ACTIVE after succesful recovery", e);
successfulRecovery = false;
}
@@ -450,29 +435,27 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.error("Recovery failed - trying again... ({})", retries);
-
if (retries.incrementAndGet() >= maxRetries) {
- SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
+ close = true;
+ log.error("Recovery failed - max retries exceeded (" + retries + ").");
try {
recoveryFailed(zkController, baseUrl, this.coreDescriptor);
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- return;
+
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "Could not publish that recovery failed", e);
+ log.error("Could not publish that recovery failed", e);
}
- break;
}
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "An error has occurred during recovery", e);
- }
- if (!successfulRecovery) {
- waitForRetry();
+ log.error("An error has occurred during recovery", e);
}
}
+ if (!successfulRecovery) {
+ waitForRetry();
+ } else {
+ break;
+ }
}
// We skip core.seedVersionBuckets(); We don't have a transaction log
log.info("Finished recovery process, successful=[{}]", successfulRecovery);
@@ -492,14 +475,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
log.warn("SolrCore is null, won't do recovery");
+ close = true;
throw new AlreadyClosedException();
}
- recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- replicationHandler = (ReplicationHandler) handler;
+
ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover.");
+ close = true;
recoveryFailed(zkController, baseUrl, this.coreDescriptor);
return;
}
@@ -512,8 +495,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "Corrupt tlog - ignoring.", e);
+ log.error("Corrupt tlog - ignoring.", e);
recentVersions = new ArrayList<>(0);
}
@@ -545,8 +527,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- SolrException.log(log, "Error getting recent versions.", e);
+ log.error("Error getting recent versions.", e);
recentVersions = Collections.emptyList();
}
}
@@ -586,13 +567,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
- try {
- if (prevSendPreRecoveryHttpUriRequest != null) {
- prevSendPreRecoveryHttpUriRequest.cancel();
- }
- } catch (NullPointerException e) {
- // okay
- }
+// try {
+// if (prevSendPreRecoveryHttpUriRequest != null) {
+// prevSendPreRecoveryHttpUriRequest.cancel();
+// }
+// } catch (NullPointerException e) {
+// // okay
+// }
// sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), slice);
@@ -601,12 +582,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
// are sure to have finished (see SOLR-7141 for
// discussion around current value)
// TODO since SOLR-11216, we probably won't need this
- try {
- Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
+// try {
+// Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
+// } catch (InterruptedException e) {
+// ParWork.propagateInterrupt(e);
+// throw new SolrException(ErrorCode.SERVER_ERROR, e);
+// }
// first thing we just try to sync
if (firstTime) {
@@ -617,7 +598,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
log.warn("SolrCore is null, won't do recovery");
- throw new AlreadyClosedException();
+ close = true;
+ successfulRecovery = false;
}
// System.out.println("Attempting to PeerSync from " + leaderUrl
@@ -653,7 +635,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
- replicate(leader);
+ IndexFetcher.IndexFetchResult result = replicate(leader);
+
+ if (result.getSuccessful()) {
+ log.info("replication fetch reported as success");
+ } else {
+ log.error("replication fetch reported as failed: {} {} {}", result.getMessage(), result, result.getException());
+ successfulRecovery = false;
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replication fetch reported as failed");
+ }
replay();
@@ -661,8 +651,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
successfulRecovery = true;
} catch (InterruptedException | AlreadyClosedException e) {
log.info("Interrupted or already closed, bailing on recovery");
- throw new AlreadyClosedException();
+ close = true;
+ successfulRecovery = false;
} catch (Exception e) {
+ successfulRecovery = false;
log.error("Error while trying to recover", e);
}
}
@@ -676,39 +668,45 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (replicaType == Replica.Type.TLOG) {
zkController.startReplicationFromLeader(coreName, true);
}
- publishedActive = true;
+
+ // if replay was skipped (possibly to due pulling a full index from the leader),
+ // then we still need to update version bucket seeds after recovery
+ if (successfulRecovery && replayFuture == null) {
+ log.info("Updating version bucket highest from index after successful recovery.");
+ try (SolrCore core = cc.getCore(coreName)) {
+ if (core == null) {
+ log.warn("SolrCore is null, won't do recovery");
+ successfulRecovery = false;
+ } else {
+ core.seedVersionBuckets();
+ }
+ }
+ }
+
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
- } catch (AlreadyClosedException e) {
+ publishedActive = true;
+ } catch (AlreadyClosedException e) {
+ log.error("Already closed");
+ successfulRecovery = false;
} catch (Exception e) {
- log.error("Could not publish as ACTIVE after succesful recovery", e);
+ log.error("Could not publish as ACTIVE after successful recovery", e);
+ successfulRecovery = false;
// core.getSolrCoreState().doRecovery(core);
}
} else {
- log.info("Recovery was not sucessful, will not register as ACTIVE {}", coreName);
+ log.info("Recovery was not successful, will not register as ACTIVE {}", coreName);
}
if (successfulRecovery) {
recoveryListener.recovered();
}
- // if replay was skipped (possibly to due pulling a full index from the leader),
- // then we still need to update version bucket seeds after recovery
- if (successfulRecovery && replayFuture == null) {
- log.info("Updating version bucket highest from index after successful recovery.");
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- throw new AlreadyClosedException();
- }
- core.seedVersionBuckets();
- }
- }
}
- if (!successfulRecovery && !isClosed()) {
+ if (!successfulRecovery) {
// lets pause for a moment and we need to try again...
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
@@ -717,23 +715,24 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (retries.incrementAndGet() >= maxRetries) {
SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
+ close = true;
try {
recoveryFailed(zkController, baseUrl, this.coreDescriptor);
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- return;
+
} catch (Exception e) {
- SolrException.log(log, "Could not publish that recovery failed", e);
+ log.error("Could not publish that recovery failed", e);
}
- break;
}
} catch (Exception e) {
- SolrException.log(log, "An error has occurred during recovery", e);
+ log.error("An error has occurred during recovery", e);
}
}
if (!successfulRecovery) {
waitForRetry();
+ } else {
+ break;
}
}
@@ -746,35 +745,35 @@ public class RecoveryStrategy implements Runnable, Closeable {
private final void waitForRetry() {
try {
-
+ if (close) return;
long wait = startingRecoveryDelayMilliSeconds;
if (retries.get() >= 0 && retries.get() < 10) {
- wait = 0;
- } else if (retries.get() >= 10 && retries.get() < 20) {
+ wait = 20;
+ } else if (retries.get() >= 10 && retries.get() < 30) {
wait = 1500;
- } else if (retries.get() > 0) {
- wait = TimeUnit.SECONDS.toMillis(60);
+ } else {
+ wait = 10000;
}
log.info("Wait [{}] ms before trying to recover again (attempt={})", wait, retries);
- if (wait > 1000) {
- TimeOut timeout = new TimeOut(wait, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- while (!timeout.hasTimedOut()) {
- if (isClosed()) {
- log.info("RecoveryStrategy has been closed");
- throw new AlreadyClosedException();
- }
+ TimeOut timeout = new TimeOut(wait, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ if (isClosed()) {
+ log.info("RecoveryStrategy has been closed");
+ return;
+ }
+ if (wait > 1000) {
Thread.sleep(1000);
+ } else {
+ Thread.sleep(wait);
}
- } else {
- Thread.sleep(wait);
+
}
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e, true);
- throw new AlreadyClosedException();
+
}
}
@@ -789,13 +788,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
log.warn("SolrCore is null, won't do recovery");
+ close = true;
throw new AlreadyClosedException();
}
if (replicaType == Replica.Type.TLOG) {
// roll over all updates during buffering to new tlog, make RTG available
- SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
- core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
- req.close();
+ try (SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams())) {
+ core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
+ }
}
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
@@ -809,8 +809,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
report = future.get(10, TimeUnit.MINUTES); // nocommit - how long? make configurable too
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new InterruptedException();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
@@ -821,7 +820,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
// the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
- core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog != null) {
+ ulog.openRealtimeSearcher();
+ }
}
// solrcloud_debug
// cloudDebugLog(core, "replayed");
@@ -889,6 +891,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Timeout waiting for prep recovery cmd on leader");
}
} catch (InterruptedException e) {
+ close = true;
ParWork.propagateInterrupt(e);
} finally {
prevSendPreRecoveryHttpUriRequest = null;
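
Among the RecoveryStrategy changes, waitForRetry() now uses tiered delays, roughly 20 ms for the first ten attempts, 1.5 s up to attempt 30, then 10 s, and re-checks the close flag about once a second so shutdown is not held hostage by the sleep. A minimal sketch of that close-aware backoff (the BooleanSupplier hook is an assumption standing in for isClosed()):

    import java.util.function.BooleanSupplier;

    class RetryBackoff {
        static void waitForRetry(int attempt, BooleanSupplier isClosed) throws InterruptedException {
            long waitMs = attempt < 10 ? 20 : attempt < 30 ? 1500 : 10000;   // tiered delays
            long deadline = System.nanoTime() + waitMs * 1_000_000L;
            while (System.nanoTime() < deadline) {
                if (isClosed.getAsBoolean()) return;      // bail out promptly if we are closing
                Thread.sleep(Math.min(waitMs, 1000));     // re-check the close flag each second
            }
        }
    }
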
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index 4b123d4..54d762d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -23,15 +23,18 @@ import org.apache.solr.core.CoreDescriptor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
/**
* Used to manage all ZkShardTerms of a collection
*/
class ZkCollectionTerms implements AutoCloseable {
private final String collection;
- private final Map<String, ZkShardTerms> terms;
+ private final Map<String,ZkShardTerms> terms;
+
+ private final ReentrantLock collectionToTermsLock = new ReentrantLock(true);
+
private final SolrZkClient zkClient;
- private volatile boolean closed;
ZkCollectionTerms(String collection, SolrZkClient client) {
this.collection = collection;
@@ -40,19 +43,26 @@ class ZkCollectionTerms implements AutoCloseable {
assert ObjectReleaseTracker.track(this);
}
-
ZkShardTerms getShard(String shardId) {
- synchronized (terms) {
+ collectionToTermsLock.lock();
+ try {
if (!terms.containsKey(shardId)) {
terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
}
return terms.get(shardId);
+ } finally {
+ collectionToTermsLock.unlock();
}
}
public ZkShardTerms getShardOrNull(String shardId) {
- if (!terms.containsKey(shardId)) return null;
- return terms.get(shardId);
+ collectionToTermsLock.lock();
+ try {
+ if (!terms.containsKey(shardId)) return null;
+ return terms.get(shardId);
+ } finally {
+ collectionToTermsLock.unlock();
+ }
}
public void register(String shardId, String coreNodeName) {
@@ -60,31 +70,42 @@ class ZkCollectionTerms implements AutoCloseable {
}
public void remove(String shardId, CoreDescriptor coreDescriptor) {
- ZkShardTerms zterms = getShardOrNull(shardId);
- if (zterms != null) {
- if (zterms.removeTerm(coreDescriptor)) {
- terms.remove(shardId).close();
+ collectionToTermsLock.lock();
+ try {
+ ZkShardTerms zterms = getShardOrNull(shardId);
+ if (zterms != null) {
+ if (zterms.removeTerm(coreDescriptor)) {
+ terms.remove(shardId).close();
+ }
}
+ } finally {
+ collectionToTermsLock.unlock();
}
}
public void close() {
- synchronized (terms) {
- this.closed = true;
-
+ collectionToTermsLock.lock();
+ try {
terms.values().forEach(ZkShardTerms::close);
terms.clear();
+ } finally {
+ collectionToTermsLock.unlock();
}
assert ObjectReleaseTracker.release(this);
}
public boolean cleanUp() {
- for (ZkShardTerms zkShardTerms : terms.values()) {
- if (zkShardTerms.getTerms().size() > 0) {
- return false;
+ collectionToTermsLock.lock();
+ try {
+ for (ZkShardTerms zkShardTerms : terms.values()) {
+ if (zkShardTerms.getTerms().size() > 0) {
+ return false;
+ }
}
+ return true;
+ } finally {
+ collectionToTermsLock.unlock();
}
- return true;
}
}
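
ZkCollectionTerms drops synchronized(terms) in favor of a fair ReentrantLock that every accessor (getShard, getShardOrNull, remove, close, cleanUp) goes through, giving FIFO hand-off between registering and closing threads. The containsKey/put pair in getShard amounts to a computeIfAbsent under the lock, sketched here with simplified types:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Function;

    class TermsRegistry<V> {
        private final Map<String, V> terms = new HashMap<>();
        private final ReentrantLock lock = new ReentrantLock(true);   // fair: FIFO hand-off

        V getOrCreate(String shardId, Function<String, V> factory) {
            lock.lock();
            try {
                return terms.computeIfAbsent(shardId, factory);       // create at most once
            } finally {
                lock.unlock();
            }
        }
    }
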
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 35d416d..375bea0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -147,7 +147,7 @@ public class ZkController implements Closeable, Runnable {
public final int WAIT_FOR_STATE = Integer.getInteger("solr.waitForState", 10);
private final int zkClientConnectTimeout;
- private final Supplier<List<CoreDescriptor>> descriptorsSupplier;
+
private final ZkACLProvider zkACLProvider;
private CloseTracker closeTracker;
@@ -263,10 +263,10 @@ public class ZkController implements Closeable, Runnable {
private volatile LeaderElector overseerElector;
- private final Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>(132, 0.75f, 50);
- private final Map<String, ZkCollectionTerms> collectionToTerms = new ConcurrentHashMap<>(132, 0.75f, 50);
+ private final Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>(16, 0.75f, 3);
+ private final Map<String, ZkCollectionTerms> collectionToTerms = new ConcurrentHashMap<>(16, 0.75f, 3);
- private final ReentrantLock collectionToTermsLock = new ReentrantLock(false);
+ private final ReentrantLock collectionToTermsLock = new ReentrantLock(true);
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -285,7 +285,7 @@ public class ZkController implements Closeable, Runnable {
private final Object initLock = new Object();
- private final ConcurrentHashMap<String, Throwable> replicasMetTragicEvent = new ConcurrentHashMap<>(132, 0.75f, 12);
+ private final ConcurrentHashMap<String, Throwable> replicasMetTragicEvent = new ConcurrentHashMap<>(16, 0.75f, 1);
@Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
@@ -352,24 +352,22 @@ public class ZkController implements Closeable, Runnable {
}
- public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final Supplier<List<CoreDescriptor>> descriptorsSupplier) throws InterruptedException, IOException, TimeoutException {
- this(cc, new SolrZkClient(), cloudConfig, descriptorsSupplier);
+ public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig) throws InterruptedException, IOException, TimeoutException {
+ this(cc, new SolrZkClient(), cloudConfig);
this.closeZkClient = true;
}
/**
* @param cc Core container associated with this controller. cannot be null.
* @param cloudConfig configuration for this controller. TODO: possibly redundant with CoreContainer
- * @param descriptorsSupplier a supplier of the current core descriptors. used to know which cores to re-register on reconnect
*/
- public ZkController(final CoreContainer cc, SolrZkClient zkClient, CloudConfig cloudConfig, final Supplier<List<CoreDescriptor>> descriptorsSupplier)
+ public ZkController(final CoreContainer cc, SolrZkClient zkClient, CloudConfig cloudConfig)
throws InterruptedException, TimeoutException, IOException {
assert (closeTracker = new CloseTracker()) != null;
if (cc == null) log.error("null corecontainer");
if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
try {
this.cc = cc;
- this.descriptorsSupplier = descriptorsSupplier;
this.cloudConfig = cloudConfig;
this.zkClientConnectTimeout = zkClient.getZkClientTimeout();
this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
@@ -417,6 +415,22 @@ public class ZkController implements Closeable, Runnable {
started = true;
+ this.overseer = new Overseer(cc.getUpdateShardHandler(), CommonParams.CORES_HANDLER_PATH, this, cloudConfig);
+ try {
+ this.overseerRunningMap = Overseer.getRunningMap(zkClient);
+
+ this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
+ this.overseerFailureMap = Overseer.getFailureMap(zkClient);
+ this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ this.overseerJobQueue = overseer.getStateUpdateQueue();
+ this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
+ this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
+ statePublisher = new StatePublisher(overseerJobQueue);
+ statePublisher.start();
+
boolean isRegistered = SolrLifcycleListener.isRegistered(this);
if (!isRegistered) {
SolrLifcycleListener.registerShutdown(this);
@@ -429,13 +443,6 @@ public class ZkController implements Closeable, Runnable {
zkClient.getConnectionManager().setZkCredentialsToAddAutomatically(new DefaultZkCredentialsProvider());
}
addOnReconnectListener(getConfigDirListener());
- zkClient.getConnectionManager().setBeforeReconnect(new BeforeReconnect() {
-
- @Override
- public synchronized void command() {
- clearZkCollectionTerms();
- }
- });
zkClient.setAclProvider(zkACLProvider);
zkClient.getConnectionManager().setOnReconnect(new OnReconnect() {
@@ -450,8 +457,11 @@ public class ZkController implements Closeable, Runnable {
ParWork.getRootSharedExecutor().submit(() -> {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
try {
+
+ removeEphemeralLiveNode();
+
// recreate our watchers first so that they exist even on any problems below
- zkStateReader.createClusterStateWatchersAndUpdate();
+ zkStateReader.createClusterStateWatchersAndUpdate();
// this is troublesome - we dont want to kill anything the old
// leader accepted
@@ -468,7 +478,7 @@ public class ZkController implements Closeable, Runnable {
overseerElector.retryElection(false);
- List<CoreDescriptor> descriptors = descriptorsSupplier.get();
+ List<CoreDescriptor> descriptors = getCoreContainer().getCoreDescriptors();
// re register all descriptors
if (descriptors != null) {
@@ -528,7 +538,8 @@ public class ZkController implements Closeable, Runnable {
@Override
public boolean isClosed() {
return cc.isShutDown();
- }});
+ }
+ });
zkClient.setDisconnectListener(() -> {
try (ParWork worker = new ParWork("disconnected", true, true)) {
worker.collect(ZkController.this.overseerElector);
@@ -1160,14 +1171,7 @@ public class ZkController implements Closeable, Runnable {
zkStateReader.createClusterStateWatchersAndUpdate();
- this.overseer = new Overseer(cc.getUpdateShardHandler(), CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
- this.overseerRunningMap = Overseer.getRunningMap(zkClient);
- this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
- this.overseerFailureMap = Overseer.getFailureMap(zkClient);
- this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
- this.overseerJobQueue = overseer.getStateUpdateQueue();
- this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
- this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
+
this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(), getNodeName(), zkStateReader);
overseerElector = new LeaderElector(this, new ContextKey("overseer", "overseer"));
//try (ParWork worker = new ParWork(this, false, true)) {
@@ -1196,8 +1200,6 @@ public class ZkController implements Closeable, Runnable {
// }
// });
//}
- statePublisher = new StatePublisher(overseerJobQueue);
- statePublisher.start();
// nocommit
//publishDownStates();
@@ -1289,7 +1291,7 @@ public class ZkController implements Closeable, Runnable {
zkClient.getSolrZooKeeper().create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL);
zkClient.setData(ZkStateReader.LIVE_NODES_ZKNODE, (byte[]) null, true);
} catch (KeeperException.NodeExistsException e) {
- log.warn("Found our ephemeral live node already exists. This must be a quick restart after a hard shutdown ... {}", nodePath);
+ log.warn("Found our ephemeral live node already exists. This must be a quick restart after a hard shutdown? ... {}", nodePath);
// TODO nocommit wait for expiration properly and try again?
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
@@ -1307,6 +1309,8 @@ public class ZkController implements Closeable, Runnable {
zkClient.setData(ZkStateReader.LIVE_NODES_ZKNODE, (byte[]) null, true);
} catch (NoNodeException e) {
// okay
+ } catch (Exception e) {
+ log.warn("Could not remove ephemeral live node {}", nodePath, e);
}
}
@@ -1348,20 +1352,23 @@ public class ZkController implements Closeable, Runnable {
}
MDCLoggingContext.setCoreDescriptor(cc, desc);
String coreName = core.getName();
- LeaderElector leaderElector = leaderElectors.get(coreName);
- if (core.isClosing() || cc.isShutDown() || (leaderElector != null && leaderElector.isClosed())) {
+
+ if (core.isClosing() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
boolean success = false;
try {
- // pre register has published our down state
-
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
final String shardId = cloudDesc.getShardId();
+
+ log.info("Register terms for replica {}", coreName);
+ createCollectionTerms(collection);
+ ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
// the watcher is added to a set so multiple calls of this method will left only one watcher
getZkStateReader().registerCore(cloudDesc.getCollectionName());
@@ -1370,7 +1377,7 @@ public class ZkController implements Closeable, Runnable {
AtomicReference<Replica> replicaRef = new AtomicReference<>();
try {
log.info("Waiting to see our entry in state.json {}", desc.getName());
- zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 5000), TimeUnit.MILLISECONDS, (l, c) -> { // nocommit timeout
+ zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 60000), TimeUnit.MILLISECONDS, (l, c) -> { // nocommit timeout
if (c == null) {
return false;
}
@@ -1394,18 +1401,13 @@ public class ZkController implements Closeable, Runnable {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate \n" + zkStateReader.getClusterState().getCollectionOrNull(collection));
}
}
- ZkShardTerms shardTerms = null;
if (replica.getType() != Type.PULL) {
- log.info("Register terms for replica {}", coreName);
- createCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
- shardTerms = getShardTermsOrNull(collection, cloudDesc.getShardId());
+ getCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
}
-
-
log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
synchronized (leaderElectors) {
- leaderElector = leaderElectors.get(replica.getName());
+ LeaderElector leaderElector = leaderElectors.get(replica.getName());
if (leaderElector == null) {
ContextKey contextKey = new ContextKey(collection, coreName);
leaderElector = new LeaderElector(this, contextKey);
@@ -1822,7 +1824,11 @@ public class ZkController implements Closeable, Runnable {
}
public ZkShardTerms getShardTerms(String collection, String shardId) {
- return getCollectionTerms(collection).getShard(shardId);
+ ZkCollectionTerms ct = getCollectionTerms(collection);
+ if (ct == null) {
+ throw new AlreadyClosedException();
+ }
+ return ct.getShard(shardId);
}
public ZkShardTerms getShardTermsOrNull(String collection, String shardId) {
@@ -1842,7 +1848,7 @@ public class ZkController implements Closeable, Runnable {
}
}
- private ZkCollectionTerms getCollectionTerms(String collection) {
+ public ZkCollectionTerms getCollectionTerms(String collection) {
collectionToTermsLock.lock();
try {
return collectionToTerms.get(collection);
@@ -1851,14 +1857,10 @@ public class ZkController implements Closeable, Runnable {
}
}
- private ZkCollectionTerms createCollectionTerms(String collection) {
+ public ZkCollectionTerms createCollectionTerms(String collection) {
collectionToTermsLock.lock();
try {
- ZkCollectionTerms ct = collectionToTerms.get(collection);
- if (ct != null) {
- return ct;
- }
- ct = new ZkCollectionTerms(collection, zkClient);
+ ZkCollectionTerms ct = new ZkCollectionTerms(collection, zkClient);
IOUtils.closeQuietly(collectionToTerms.put(collection, ct));
return ct;
} finally {
@@ -1870,7 +1872,6 @@ public class ZkController implements Closeable, Runnable {
collectionToTermsLock.lock();
try {
collectionToTerms.values().forEach(ZkCollectionTerms::close);
- collectionToTerms.clear();
} finally {
collectionToTermsLock.unlock();
}
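
The createCollectionTerms change above now always installs a fresh ZkCollectionTerms and closes whatever entry it displaces, instead of returning a cached instance. A minimal sketch of that close-the-displaced-value registry pattern, assuming a generic Closeable value; all names here are illustrative, not Solr API:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    class ClosingRegistry<V extends Closeable> {
      private final Map<String, V> entries = new HashMap<>();
      private final ReentrantLock lock = new ReentrantLock();

      // Replace the entry under a lock and close whatever it displaced,
      // so a stale instance can never leak watchers after being swapped out.
      V replace(String key, V fresh) {
        lock.lock();
        try {
          V displaced = entries.put(key, fresh);
          closeQuietly(displaced);
          return fresh;
        } finally {
          lock.unlock();
        }
      }

      private static void closeQuietly(Closeable c) {
        if (c == null) return;
        try { c.close(); } catch (IOException ignored) {}
      }
    }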
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 872d639..40eba00 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -326,7 +326,7 @@ public class ZkShardTerms implements AutoCloseable{
log.info("Failed to save terms, version is not a match, retrying version={}", newTerms.getVersion());
refreshTerms();
} catch (KeeperException.NoNodeException e) {
- throw e;
+ return true;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error while saving shard term for collection: " + collection, e);
@@ -348,7 +348,7 @@ public class ZkShardTerms implements AutoCloseable{
} catch (KeeperException.NoNodeException e) {
log.warn("No node found for shard terms", e);
// we have likely been deleted
- throw new AlreadyClosedException(e);
+ return;
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
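
The saveTerms/refreshTerms hunks settle on optimistic concurrency: write with an expected version, refresh and retry when another writer wins, and treat NoNodeException as the terms having been deleted underneath us rather than as an error. A minimal sketch of that loop against a raw org.apache.zookeeper.ZooKeeper handle; the helper name is illustrative:

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    class OptimisticZkWrite {
      // Read data+version, write back with that version, and retry from a
      // fresh read on a version conflict. NoNode means the node was deleted
      // underneath us, which the patch now treats as success.
      static boolean casUpdate(ZooKeeper zk, String path, byte[] newData)
          throws KeeperException, InterruptedException {
        while (true) {
          Stat stat = new Stat();
          try {
            zk.getData(path, false, stat);                // fills in the version
            zk.setData(path, newData, stat.getVersion());
            return true;                                  // version matched
          } catch (KeeperException.BadVersionException e) {
            // lost the race; loop and re-read
          } catch (KeeperException.NoNodeException e) {
            return true;                                  // nothing left to save
          }
        }
      }
    }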
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index a2f008b..8c6185d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -105,6 +105,13 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
collection = extCollection;
}
+ log.info("Check if collection exists in zookeeper {}", collection);
+
+ if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
+ }
+
+
checkNotColocatedWith(zkStateReader, collection);
final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true);
@@ -115,13 +122,6 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
SolrZkClient zkClient = zkStateReader.getZkClient();
SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
- log.info("Check if collection exists in zookeeper {}", collection);
-
- if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
- }
-
-
log.info("Collection exists, remove it, {}", collection);
// remove collection-level metrics history
if (deleteHistory) {
@@ -137,6 +137,8 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
params.set(CoreAdminParams.DELETE_DATA_DIR, true);
params.set(CoreAdminParams.DELETE_METRICS_HISTORY, deleteHistory);
+ params.set("idleTimeout", 8000);
+
String asyncId = message.getStr(ASYNC);
ZkNodeProps internalMsg = message.plus(NAME, collection);
@@ -160,7 +162,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
try {
ocmh.overseer.getCoreContainer().getZkController().removeCollectionTerms(collection);
zkStateReader.getZkClient().clean(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
- ocmh.overseer.getCoreContainer().getZkController().removeCollectionTerms(collection);
+
} catch (Exception e) {
log.error("Exception while trying to remove collection zknode", e);
}
@@ -192,36 +194,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
// TODO: wait for delete collection?
// zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
- CountDownLatch latch = new CountDownLatch(1);
- Stat stat = zkStateReader.getZkClient().exists(ZkStateReader.getCollectionPath(collection), new Watcher() {
- @Override
- public void process(WatchedEvent event) {
-
- if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
- latch.countDown();
- } else {
- try {
- Stat stat2 = zkStateReader.getZkClient().exists(ZkStateReader.getCollectionPath(collection), this, true);
- if (stat2 != null) {
- latch.countDown();
- }
- } catch (KeeperException e) {
- log.error("", e);
- } catch (InterruptedException e) {
- log.error("", e);
- }
- }
-
- }
- }, true);
- if (stat == null) {
- latch.countDown();
- }
-
- boolean success = latch.await(10, TimeUnit.SECONDS);
- if (!success) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for collection to be removed");
- }
+
} catch (Exception e) {
log.error("Exception waiting for results of delete collection cmd", e);
}
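
For reference, the block deleted above implemented the usual watch-and-await idiom for znode removal. A minimal, corrected sketch of that idiom, assuming a raw ZooKeeper handle; names are illustrative:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    class AwaitDeletion {
      // Set a one-shot watch via exists(); a null Stat means the node is
      // already gone, otherwise block until NodeDeleted fires or the wait
      // times out.
      static boolean await(ZooKeeper zk, String path, long maxWaitSec)
          throws KeeperException, InterruptedException {
        CountDownLatch deleted = new CountDownLatch(1);
        Stat stat = zk.exists(path, event -> {
          if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            deleted.countDown();
          }
        });
        if (stat == null) return true;
        return deleted.await(maxWaitSec, TimeUnit.SECONDS);
      }
    }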
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 54375f2..55cd23d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -1109,13 +1109,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
List<Replica> notLiveReplicas = new ArrayList<>();
for (Replica replica : slice.getReplicas()) {
if ((stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
- if (zkStateReader.isNodeLive(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
+ if (zkStateReader.isNodeLive(replica.getNodeName())) {
// For thread safety, only simple clone the ModifiableSolrParams
ModifiableSolrParams cloneParams = new ModifiableSolrParams();
cloneParams.add(params);
cloneParams.set(CoreAdminParams.CORE, replica.getName());
- sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler);
+ sendShardRequest(replica.getNodeName(), cloneParams, shardHandler);
} else {
notLiveReplicas.add(replica);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index e00e455..57ca7bf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -20,7 +20,6 @@ import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index af43a83..415c5ca 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -532,6 +532,8 @@ public class ZkStateWriter {
trackVersions.remove(collection);
reader.getZkClient().delete(ZkStateReader.getCollectionSCNPath(collection), -1);
reader.getZkClient().delete(ZkStateReader.getCollectionStateUpdatesPath(collection), -1);
+ } catch (KeeperException.NoNodeException e) {
+
} catch (InterruptedException e) {
log.error("", e);
} catch (KeeperException e) {
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 41f120e..28c8028 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -617,7 +617,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
}
- protected synchronized void removeDirectory(CacheValue cacheValue) throws IOException {
+ protected void removeDirectory(CacheValue cacheValue) throws IOException {
// this page intentionally left blank
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 540b0d4..56a38db 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1769,15 +1769,15 @@ public final class SolrCore implements SolrInfoBean, Closeable {
coreContainer.getZkController().removeShardLeaderElector(name);
}
- int noops = searcherExecutor.getPoolSize() - searcherExecutor.getActiveCount();
- for (int i = 0; i < noops + 1; i++) {
- try {
- searcherExecutor.submit(() -> {
- });
- } catch (RejectedExecutionException e) {
- break;
- }
- }
+// int noops = searcherExecutor.getPoolSize() - searcherExecutor.getActiveCount();
+// for (int i = 0; i < noops + 1; i++) {
+// try {
+// searcherExecutor.submit(() -> {
+// });
+// } catch (RejectedExecutionException e) {
+// break;
+// }
+// }
searcherExecutor.shutdown();
@@ -3310,20 +3310,27 @@ public final class SolrCore implements SolrInfoBean, Closeable {
log.info("Removing SolrCore dataDir on unload {}", cd.getInstanceDir().resolve(cd.getDataDir()));
Path dataDir = cd.getInstanceDir().resolve(cd.getDataDir());
try {
- if (Files.exists(dataDir)) {
- Files.walk(dataDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
- }
+ while (Files.exists(dataDir)) {
+ try {
+ Files.walk(dataDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ } catch (NoSuchFileException e) {
- } catch (Exception e) {
- log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir, e);
- }
+ }
+ }
+ } catch (IOException e) {
+ log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir, e);
+ }
}
if (deleteInstanceDir) {
try {
- if (Files.exists(cd.getInstanceDir())) {
- Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ while (Files.exists(cd.getInstanceDir())) {
+ try {
+ Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ } catch (NoSuchFileException e) {
+
+ }
}
- } catch (Exception e) {
+ } catch (IOException e) {
log.error("Failed to delete instance dir for unloaded core: {} dir: {}", cd.getName(), cd.getInstanceDir(), e);
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 6f85747..638e16e 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -403,7 +403,7 @@ class SolrCores implements Closeable {
while (isCoreLoading(core)) {
synchronized (loadingSignal) {
try {
- loadingSignal.wait(1000);
+ loadingSignal.wait(250);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
return;
@@ -417,7 +417,7 @@ class SolrCores implements Closeable {
}
public boolean isCoreLoading(String name) {
- if (container.startedLoadingCores() && currentlyLoadingCores.contains(name)) {
+ if (currentlyLoadingCores.contains(name)) {
return true;
}
return false;
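
The waitForLoadingCore loop (see the wait(250) hunk above) parks on loadingSignal in short slices, so a missed notification costs at most one slice rather than the whole timeout. A minimal sketch of the idiom with hypothetical names:

    class LoadingGate {
      private final Object loadingSignal = new Object();
      private volatile boolean loading = true;

      // Poll the condition and park in short slices, mirroring the patch.
      void awaitLoaded(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (loading && System.nanoTime() < deadline) {
          synchronized (loadingSignal) {
            loadingSignal.wait(250);
          }
        }
      }

      void markLoaded() {
        loading = false;
        synchronized (loadingSignal) {
          loadingSignal.notifyAll();
        }
      }
    }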
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 5e91cd0..08916cb 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -152,7 +152,7 @@ public class ZkContainer implements Closeable {
if (log.isDebugEnabled()) {
log.debug("create zkController");
}
- zkController = new ZkController(cc, zkClient, config, descriptorsSupplier);
+ zkController = new ZkController(cc, zkClient, config);
if (log.isDebugEnabled()) log.debug("done zkController create");
} catch (InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/handler/CheckSumFailException.java b/solr/core/src/java/org/apache/solr/handler/CheckSumFailException.java
new file mode 100644
index 0000000..a75c6fd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/CheckSumFailException.java
@@ -0,0 +1,4 @@
+package org.apache.solr.handler;
+
+public class CheckSumFailException extends RuntimeException {
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 1cb5851..c45f1a4 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -90,6 +90,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.SuppressForbidden;
@@ -121,7 +122,7 @@ import static org.apache.solr.handler.ReplicationHandler.*;
* @since solr 1.4
*/
public class IndexFetcher {
- private static final int _100K = 100000;
+ private static final int _10K = 10000;
public static final String INDEX_PROPERTIES = "index.properties";
@@ -168,7 +169,7 @@ public class IndexFetcher {
private Integer soTimeout;
- private boolean skipCommitOnMasterVersionZero = true;
+ private boolean skipCommitOnMasterVersionZero = false;
private boolean clearLocalIndexFirst = false;
@@ -332,8 +333,6 @@ public class IndexFetcher {
files = (List<Map<String,Object>>) response.get(CONF_FILES);
if (files != null) confFilesToDownload = Collections.synchronizedList(files);
-
-
} catch (SolrServerException e) {
throw new IOException(e);
}
@@ -353,7 +352,7 @@ public class IndexFetcher {
* @throws IOException if an exception occurs
*/
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
-
+ stop = false;
this.clearLocalIndexFirst = false;
boolean cleanupDone = false;
boolean successfulInstall = false;
@@ -518,7 +517,6 @@ public class IndexFetcher {
tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
-
// index dir...
indexDirPath = solrCore.getIndexDir();
indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
@@ -570,16 +568,21 @@ public class IndexFetcher {
boolean reloadCore = false;
try {
- // we have to be careful and do this after we know isFullCopyNeeded won't be flipped
- if (!isFullCopyNeeded) {
- solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
- }
log.info("Starting download (fullCopy={}) to {}", isFullCopyNeeded, tmpIndexDir);
successfulInstall = false;
+ long bytesDownloaded;
+ try {
+ bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, indexDirPath, tmpIndexDirPath, latestGeneration);
+ } catch (CheckSumFailException e) {
+ isFullCopyNeeded = true;
+ bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, indexDirPath, tmpIndexDirPath, latestGeneration);
+ }
- long bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir,
- tmpIndexDir, indexDirPath, tmpIndexDirPath, latestGeneration);
+ // we have to be careful and do this after we know isFullCopyNeeded won't be flipped
+ if (!isFullCopyNeeded) {
+ solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
+ }
final long timeTakenSeconds = getReplicationTimeElapsed();
final Long bytesDownloadedPerSecond = (timeTakenSeconds != 0 ? Long.valueOf(bytesDownloaded / timeTakenSeconds) : null);
@@ -596,7 +599,6 @@ public class IndexFetcher {
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
-
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
@@ -617,20 +619,19 @@ public class IndexFetcher {
}
} else {
terminateAndWaitFsyncService();
- if (isFullCopyNeeded) {
+ if (isFullCopyNeeded && successfulInstall) {
successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
if (!successfulInstall) {
log.error("Modify index props failed");
}
if (successfulInstall) deleteTmpIdxDir = false;
- } else {
+ } else if (successfulInstall) {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
if (!successfulInstall) {
log.error("Move index files failed");
}
}
-
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);
@@ -639,7 +640,7 @@ public class IndexFetcher {
} finally {
solrCore.searchEnabled = true;
solrCore.indexEnabled = true;
- if (!isFullCopyNeeded) {
+ if (!isFullCopyNeeded && successfulInstall) {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
}
}
@@ -678,8 +679,6 @@ public class IndexFetcher {
reloadCore);
successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
}
-
- markReplicationStop();
return successfulInstall ? IndexFetchResult.INDEX_FETCH_SUCCESS : IndexFetchResult.INDEX_FETCH_FAILURE;
} catch (ReplicationHandlerException e) {
log.error("User aborted Replication");
@@ -712,8 +711,6 @@ public class IndexFetcher {
if (!successfulInstall) {
try {
logReplicationTimeAndConfFiles(null, successfulInstall);
- } catch (AlreadyClosedException e) {
-
} catch (Exception e) {
// this can happen on shutdown, a fetch may be running in a thread after DirectoryFactory is closed
log.warn("Could not log failed replication details", e);
@@ -726,7 +723,7 @@ public class IndexFetcher {
}
} finally {
- filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload;
+ filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
markReplicationStop();
dirFileFetcher = null;
localFileFetcher = null;
@@ -948,22 +945,7 @@ public class IndexFetcher {
}
private void reloadCore() {
- final CountDownLatch latch = new CountDownLatch(1);
- new Thread(() -> {
- try {
- solrCore.getCoreContainer().reload(solrCore.getName());
- } catch (Exception e) {
- log.error("Could not reload core ", e);
- } finally {
- latch.countDown();
- }
- }).start();
- try {
- latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting for core reload to finish", e);
- }
+ solrCore.getCoreContainer().reload(solrCore.getName());
}
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
@@ -1030,9 +1012,6 @@ public class IndexFetcher {
&& (tmpIndexDir instanceof FSDirectory ||
(tmpIndexDir instanceof FilterDirectory && FilterDirectory.unwrap(tmpIndexDir) instanceof FSDirectory));
- // nocommit
- doDifferentialCopy = false; // what about windows or link unsupported?
-
long totalSpaceRequired = 0;
synchronized (filesToDownload) {
for (Map<String,Object> file : filesToDownload) {
@@ -1057,8 +1036,12 @@ public class IndexFetcher {
for (Map<String,Object> file : filesToDownload) {
String filename = (String) file.get(NAME);
long size = (Long) file.get(SIZE);
- CompareResult compareResult = compareFile(indexDir, filename, size, (Long) file.get(CHECKSUM));
- boolean alwaysDownload = filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult);
+ Long serverChecksum = (Long) file.get(CHECKSUM);
+ CompareResult compareResult = compareFile(indexDir, filename, size, serverChecksum);
+ boolean alwaysDownload = false;
+ if (serverChecksum == null) {
+ alwaysDownload = filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult);
+ }
boolean finalDoDifferentialCopy = doDifferentialCopy;
// parWork.collect("IndexFetcher", () -> {
@@ -1077,19 +1060,24 @@ public class IndexFetcher {
// TODO: only for local
//Files.createLink(new File(tmpIndexDirPath, filename).toPath(), localFile.toPath());
bytesDownloaded.add(localFile.length());
- moveAFile(tmpIndexDir, tmpIndexDir, filename);
+ moveAFile(indexDir, tmpIndexDir, filename);
} else {
try {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file, (String) file.get(NAME), FILE, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
bytesDownloaded.add(dirFileFetcher.getBytesDownloaded());
+ } catch(CheckSumFailException e) {
+ throw e;
} catch (Exception e) {
log.error("Problem downloading file {}", file, e);
} finally {
fileFetchRequests.remove(file.get(NAME));
}
}
+ if (stop) {
+ throw new AlreadyClosedException();
+ }
filesDownloaded.add(new HashMap<>(file));
} else {
if (log.isDebugEnabled()) {
@@ -1188,7 +1176,7 @@ public class IndexFetcher {
// without checksums to compare, we always download .si, .liv, segments_N,
// and any very small files
return !compareResult.checkSummed && (filename.endsWith(".si") || filename.endsWith(".liv")
- || filename.startsWith("segments_") || size < _100K);
+ || filename.startsWith("segments_") || size < _10K);
}
protected static class CompareResult {
@@ -1200,7 +1188,7 @@ public class IndexFetcher {
CompareResult compareResult = new CompareResult();
try {
try (final IndexInput indexInput = indexDir.openInput(filename, IOContext.READONCE)) {
- long indexFileLen = indexDir.fileLength(filename);
+ long indexFileLen = indexInput.length();
long indexFileChecksum = 0;
if (backupIndexFileChecksum != null) {
@@ -1661,6 +1649,8 @@ public class IndexFetcher {
bytesDownloaded = 0;
try {
fetch();
+ } catch(CheckSumFailException e) {
+ throw e;
} catch(Exception e) {
SolrException.log(IndexFetcher.log, "Error fetching file", e);
throw e;
@@ -1669,8 +1659,9 @@ public class IndexFetcher {
private void fetch() throws Exception {
try {
- final InputStream is = getStream();
- while (true) {
+
+ while (!aborted) {
+ final FastInputStream is = getStream();
int result;
try {
//fetch packets one by one in a single request
@@ -1701,18 +1692,17 @@ public class IndexFetcher {
}
}
- private int fetchPackets(InputStream fis) throws Exception {
+ private int fetchPackets(FastInputStream fis) throws Exception {
byte[] intbytes = new byte[4];
byte[] longbytes = new byte[8];
try {
while (true) {
if (stop) {
- stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;
- fis.read(intbytes, 0, intbytes.length);
+ fis.readFully(intbytes);
//read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
@@ -1726,21 +1716,20 @@ public class IndexFetcher {
}
if (checksum != null) {
//read the checksum
- fis.read(longbytes, 0, longbytes.length);
+ fis.readFully(longbytes);
checkSumServer = readLong(longbytes);
}
//then read the packet of bytes
- fis.read(buf, 0, packetSize);
-
+ fis.readFully(buf, 0, packetSize);
//compare the checksum as sent from the master
if (includeChecksum) {
checksum.reset();
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
- log.error("Checksum not matched between client and server for file: {}", fileName);
+ log.error("Checksum not matched between client and server for file: {} {} {}", fileName, checkSumClient, checkSumServer);
//if checksum is wrong it is a problem return (there doesn't seem to be a retry in this case.)
- return 1;
+ throw new CheckSumFailException();
}
}
//if everything is fine, write down the packet to the file
@@ -1753,6 +1742,8 @@ public class IndexFetcher {
if (bytesDownloaded >= size)
return 0;
}
+ } catch (CheckSumFailException e) {
+ throw e;
} catch (ReplicationHandlerException e) {
log.error("Exception fetching files", e);
throw e;
@@ -1821,7 +1812,7 @@ public class IndexFetcher {
/**
* Open a new stream using HttpClient
*/
- private InputStream getStream() throws IOException {
+ private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1835,16 +1826,16 @@ public class IndexFetcher {
// params.set(COMPRESSION, "true");
// }
//use checksum
-
- params.set(CHECKSUM, true);
-
+ if (this.includeChecksum) {
+ params.set(CHECKSUM, true);
+ }
//wt=filestream this is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
// If there is a failure there is a retry; the offset=<sizedownloaded> ensures that
// the server starts from the offset
-// if (bytesDownloaded > 0) {
-// params.set(OFFSET, Long.toString(bytesDownloaded));
-// }
+ if (bytesDownloaded > 0) {
+ params.set(OFFSET, Long.toString(bytesDownloaded));
+ }
@SuppressWarnings({"rawtypes"})
@@ -1875,7 +1866,7 @@ public class IndexFetcher {
fileFetchRequests.put(fileName, resp);
if (!stop) {
- latch.await(5, TimeUnit.SECONDS );
+ latch.await(15, TimeUnit.SECONDS);
}
is = ais.get();
if (is == null) {
@@ -1884,7 +1875,7 @@ public class IndexFetcher {
// if (useInternalCompression) {
// is = new InflaterInputStream(is);
// }
- return is;
+ return new FastInputStream(is);
} catch (Exception e) {
//close stream on error
try {
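
The fetcher hunks above switch to readFully() so a short read can never be mistaken for a complete packet, and turn a checksum mismatch into a thrown CheckSumFailException so the caller can fall back to a full copy. A rough sketch of that per-packet protocol; the wire layout and names here are illustrative, not the actual replication format:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.CRC32;

    class PacketReader {
      // Read one length-prefixed, checksummed packet; fail loudly on mismatch.
      static byte[] readPacket(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        int size = dis.readInt();           // packet length
        long expected = dis.readLong();     // server-side CRC32
        byte[] buf = new byte[size];
        dis.readFully(buf);                 // never a partial packet
        CRC32 crc = new CRC32();
        crc.update(buf, 0, size);
        if (crc.getValue() != expected) {
          // stands in for CheckSumFailException in this standalone sketch
          throw new RuntimeException(
              "checksum mismatch: " + crc.getValue() + " != " + expected);
        }
        return buf;
      }
    }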
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 2e04320..a154061 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -654,6 +654,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
for (SegmentCommitInfo commitInfo : infos) {
for (String file : commitInfo.files()) {
+ if (file.equals("write.lock")) continue;
Map<String, Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, file);
fileMeta.put(SIZE, dir.fileLength(file));
@@ -705,8 +706,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
rsp.add(CMD_GET_FILE_LIST, result);
- if (confFileNameAlias.size() < 1 || core.getCoreContainer().isZooKeeperAware())
+ if (confFileNameAlias.size() < 1 || core.getCoreContainer().isZooKeeperAware()) {
+ rsp.add(STATUS, OK_STATUS);
return;
+ }
log.debug("Adding config files to list: {}", includeConfFiles);
//if configuration files need to be included get their details
rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 26350ff..7bff7f8 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -246,7 +246,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
protected void copyFromClusterProp(Map<String, Object> props, String prop) throws IOException {
if (props.get(prop) != null) return;//if it's already specified , return
- Object defVal = new ClusterProperties(coreContainer.getZkController().getZkStateReader().getZkClient())
+ Object defVal = new ClusterProperties(coreContainer.getZkController().getZkClient())
.getClusterProperty(ImmutableList.of(CollectionAdminParams.DEFAULTS, CollectionAdminParams.COLLECTION, prop), null);
if (defVal != null) props.put(prop, String.valueOf(defVal));
}
@@ -474,7 +474,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
private static void createSysConfigSet(CoreContainer coreContainer) throws KeeperException, InterruptedException {
- SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
+ SolrZkClient zk = coreContainer.getZkController().getZkClient();
zk.mkdir(ZkStateReader.CONFIGS_ZKNODE + "/" + CollectionAdminParams.SYSTEM_COLL);
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
index 9ac377e..6419594 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ConfigSetsHandler.java
@@ -293,7 +293,7 @@ public class ConfigSetsHandler extends RequestHandlerBase implements PermissionN
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, ConfigSetsHandler h) throws Exception {
NamedList<Object> results = new NamedList<>();
- SolrZkClient zk = h.coreContainer.getZkController().getZkStateReader().getZkClient();
+ SolrZkClient zk = h.coreContainer.getZkController().getZkClient();
ZkConfigManager zkConfigManager = new ZkConfigManager(zk);
List<String> configSetsList = zkConfigManager.listConfigs();
results.add("configSets", configSetsList);
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index f91d9e3..ed3d6db 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -257,6 +257,9 @@ public class HttpSolrCall {
path = path.substring(idx2);
}
+ cores.waitForLoadingCore(origCorename, 15000);
+ // the core may have just finished loading
+
// Try to resolve a Solr core name
core = cores.getCore(origCorename);
@@ -266,8 +269,6 @@ public class HttpSolrCall {
path = path.substring(idx);
if (log.isDebugEnabled()) log.debug("Path is parsed as {}", path);
} else {
- cores.waitForLoadingCore(origCorename, 1000);
- // the core may have just finished loading
core = cores.getCore(origCorename);
if (core != null) {
path = path.substring(idx);
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index 0a15345..957ae2a 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -341,7 +341,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
protected synchronized CoreContainer createCoreContainer(Path solrHome, Properties extraProperties) throws IOException {
String zkHost = System.getProperty("zkHost");
if (!StringUtils.isEmpty(zkHost)) {
- int zkClientTimeout = Integer.getInteger("zkClientTimeout", 30000); // nocommit - must come from zk settings, we should parse more here and set this up vs waiting for zkController
+ int zkClientTimeout = Integer.getInteger("zkClientTimeout", 45000); // nocommit - must come from zk settings, we should parse more here and set this up vs waiting for zkController
if (zkClient != null) {
throw new IllegalStateException();
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index abee254..1e8adb3 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* Used for distributing commands from a shard leader to its replicas.
*/
public class SolrCmdDistributor implements Closeable {
- private static final int MAX_RETRIES_ON_FORWARD = 1;
+ private static final int MAX_RETRIES_ON_FORWARD = 2;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ConnectionManager.IsClosed isClosed;
private final ZkStateReader zkStateReader;
@@ -65,19 +65,19 @@ public class SolrCmdDistributor implements Closeable {
private final Http2SolrClient solrClient;
private volatile boolean closed;
- private Set<Cancellable> cancels = ConcurrentHashMap.newKeySet(32);
+ private final Set<Cancellable> cancels = ConcurrentHashMap.newKeySet(32);
public SolrCmdDistributor(ZkStateReader zkStateReader, UpdateShardHandler updateShardHandler) {
assert ObjectReleaseTracker.track(this);
this.zkStateReader = zkStateReader;
- this.solrClient = new Http2SolrClient.Builder().withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().idleTimeout((int) TimeUnit.SECONDS.toMillis(30)).build();
+ this.solrClient = new Http2SolrClient.Builder().withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().build();
isClosed = null;
}
public SolrCmdDistributor(ZkStateReader zkStateReader, UpdateShardHandler updateShardHandler, ConnectionManager.IsClosed isClosed) {
assert ObjectReleaseTracker.track(this);
this.zkStateReader = zkStateReader;
- this.solrClient = new Http2SolrClient.Builder().withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().idleTimeout((int) TimeUnit.SECONDS.toMillis(30)).build();
+ this.solrClient = new Http2SolrClient.Builder().withHttpClient(updateShardHandler.getTheSharedHttpClient()).markInternalRequest().build();
this.isClosed = isClosed;
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 5df00fe..49734be 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -108,8 +108,7 @@ public class UpdateShardHandler implements SolrInfoBean {
.connectionTimeout(cfg.getDistributedConnectionTimeout())
.idleTimeout(cfg.getDistributedSocketTimeout());
}
- updateOnlyClient = updateOnlyClientBuilder.markInternalRequest()
- .maxRequestsQueuedPerDestination(12000).strictEventOrdering(false).build();
+ updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().strictEventOrdering(false).build();
updateOnlyClient.enableCloseLock();
// updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
Set<String> queryParams = new HashSet<>(2);
@@ -120,7 +119,7 @@ public class UpdateShardHandler implements SolrInfoBean {
Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder();
- recoveryOnlyClientBuilder = recoveryOnlyClientBuilder.connectionTimeout(5000).idleTimeout(30000);
+ recoveryOnlyClientBuilder = recoveryOnlyClientBuilder.connectionTimeout(5000).idleTimeout(60000);
recoveryOnlyClient = recoveryOnlyClientBuilder.markInternalRequest().build();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 4d16cb2..28b1111 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -685,7 +685,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
clusterState = zkController.getClusterState();
- DocCollection coll = clusterState.getCollectionOrNull(collection, true);
+ DocCollection coll = clusterState.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
if (slice == null) {
@@ -1311,6 +1311,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down.");
}
+ clusterState.getCollection(collection);
+
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
// for log reply or peer sync, we don't need to be connected to ZK
return;
diff --git a/solr/core/src/java/org/apache/solr/util/ExportTool.java b/solr/core/src/java/org/apache/solr/util/ExportTool.java
index 1b6349d..82c0c34 100644
--- a/solr/core/src/java/org/apache/solr/util/ExportTool.java
+++ b/solr/core/src/java/org/apache/solr/util/ExportTool.java
@@ -442,9 +442,11 @@ public class ExportTool extends SolrCLI.ToolBase {
sink.end();
if (producerThreadpool != null) {
+ producerThreadpool.shutdown();
producerThreadpool.shutdownNow();
}
if (consumerThreadpool != null) {
+ consumerThreadpool.shutdown();
consumerThreadpool.shutdownNow();
}
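
Calling shutdown() before shutdownNow() lets queued work drain before anything is interrupted. The usual two-phase form adds a bounded wait between the two calls; the wait below is an assumption, the patch invokes them back to back:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class PoolShutdown {
      // shutdown() stops intake and lets queued tasks finish; shutdownNow()
      // only interrupts whatever is still running after the wait expires.
      static void drainThenKill(ExecutorService pool) throws InterruptedException {
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      }
    }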
diff --git a/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 9cba3fb..6050c2b 100644
--- a/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/solr/core/src/test/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -181,7 +181,7 @@ public class BlockPoolSlice {
shutdownHook = new Runnable() {
@Override
public void run() {
- addReplicaThreadPool.shutdownNow();
+ addReplicaThreadPool.shutdown();
}
};
ShutdownHookManager.get().addShutdownHook(shutdownHook,
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 6b88fbc..a79088f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -33,7 +33,6 @@ import org.apache.solr.core.CloudConfig;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.update.UpdateShardHandlerConfig;
import org.apache.zookeeper.KeeperException;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -254,8 +253,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
"overseer"));
UpdateShardHandler updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
// TODO: close Overseer
- Overseer overseer = new Overseer(updateShardHandler, "/admin/cores",
- reader, null, new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build());
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", null, new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build());
overseer.close();
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, overseer);
overseerElector.setup(ec);
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index 3b47064..1cf2d3b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -540,7 +540,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
killThread.interrupt();
scheduleThread.join();
- scheduler.shutdownNow();
+ scheduler.shutdown();
connLossThread.join();
killThread.join();
diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
index c33ed01..8aa916c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MockSimpleZkController.java
@@ -29,7 +29,7 @@ public class MockSimpleZkController extends ZkController {
public MockSimpleZkController(CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig,
Supplier<List<CoreDescriptor>> registerOnReconnect) throws InterruptedException, TimeoutException, IOException {
- super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig, registerOnReconnect);
+ super(cc, zkServerAddress, zkClientConnectTimeout, cloudConfig);
}
@Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 3fb3981..436b3b8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -716,7 +716,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
httpShardHandlerFactorys.add(httpShardHandlerFactory);
- Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", reader, zkController,
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", zkController,
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(server.getZkAddress().replaceAll("/", "_"), zkClient, overseer);
@@ -1240,7 +1240,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
httpShardHandlerFactorys.add(httpShardHandlerFactory);
- Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", reader, zkController,
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", zkController,
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(server.getZkAddress().replaceAll("/", "_"), zkClient, overseer);
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
index 7bd61fd..c871665 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionZkExpiry.java
@@ -18,7 +18,6 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
@@ -53,7 +52,7 @@ public class TestLeaderElectionZkExpiry extends SolrTestCaseJ4 {
.setLeaderConflictResolveWait(5000)
.setLeaderVoteWait(5000)
.build();
- final ZkController zkController = new ZkController(cc, server.getZkClient(), cloudConfig, () -> Collections.emptyList());
+ final ZkController zkController = new ZkController(cc, server.getZkClient(), cloudConfig);
try {
Thread killer = new Thread() {
@Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index ae91ac3..1bc0ffe 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -191,7 +191,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
CreateMode.PERSISTENT, true);
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
- ZkController zkController = new ZkController(cc, zkClient, cloudConfig, () -> null);
+ ZkController zkController = new ZkController(cc, zkClient, cloudConfig);
zkController.start();
try {
String configName = zkController.getZkStateReader().readConfigName(COLLECTION_NAME);
@@ -221,7 +221,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
try {
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
- zkController = new ZkController(cc, server.getZkClient(), cloudConfig, () -> null);
+ zkController = new ZkController(cc, server.getZkClient(), cloudConfig);
} catch (IllegalArgumentException e) {
fail("ZkController did not normalize host name correctly");
} finally {
@@ -277,7 +277,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
try {
CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
- zkController = new ZkController(cc, server.getZkClient(), cloudConfig, () -> null);
+ zkController = new ZkController(cc, server.getZkClient(), cloudConfig);
zkControllerRef.set(zkController);
zkController.getZkClient().makePath(ZkStateReader.getCollectionPathRoot(collectionName), new byte[0], CreateMode.PERSISTENT, true);
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 5d33437..b1eb087 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -159,7 +159,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
locked = lock.tryLock(versionBucketLockTimeoutMs, TimeUnit.MILLISECONDS);
if (locked) {
- Thread.sleep(100);
+ Thread.sleep(150);
return function.apply();
} else {
diff --git a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
index c11f170..0897b0c 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/RoutedAliasUpdateProcessorTest.java
@@ -275,22 +275,14 @@ public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
// Send in separate threads. Choose random collection & solrClient
ExecutorService exec = null;
try (CloudSolrClient solrClient = SolrTestCaseJ4.getCloudSolrClient(cluster)) {
- try {
- exec = getTestExecutor();
- List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
- for (SolrInputDocument solrInputDocument : solrInputDocuments) {
- String col = collections.get(random().nextInt(collections.size()));
- futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
- }
- for (Future<UpdateResponse> future : futures) {
- assertUpdateResponse(future.get());
- }
- // at this point there shouldn't be any tasks running
- assertEquals(0, exec.shutdownNow().size());
- } finally {
- if (exec != null) {
- exec.shutdownNow();
- }
+ exec = getTestExecutor();
+ List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
+ for (SolrInputDocument solrInputDocument : solrInputDocuments) {
+ String col = collections.get(random().nextInt(collections.size()));
+ futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
+ }
+ for (Future<UpdateResponse> future : futures) {
+ assertUpdateResponse(future.get());
}
}
} else {
diff --git a/solr/server/contexts/solr-jetty-context.xml b/solr/server/contexts/solr-jetty-context.xml
index 34c108a..3d06ad7 100644
--- a/solr/server/contexts/solr-jetty-context.xml
+++ b/solr/server/contexts/solr-jetty-context.xml
@@ -1,7 +1,6 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_0.dtd">
<Configure class="org.eclipse.jetty.quickstart.QuickStartWebApp">
- <Set name="autoPreconfigure">true</Set>
<Set name="contextPath"><Property name="hostContext" default="/solr"/></Set>
<Set name="war"><Property name="jetty.base"/>/solr-webapp/webapp</Set>
<Set name="defaultsDescriptor"><Property name="jetty.base"/>/etc/webdefault.xml</Set>
diff --git a/solr/server/etc/jetty-http.xml b/solr/server/etc/jetty-http.xml
index 9c7b646..a38208b 100644
--- a/solr/server/etc/jetty-http.xml
+++ b/solr/server/etc/jetty-http.xml
@@ -36,7 +36,7 @@
<Arg name="config"><Ref refid="httpConfig" /></Arg>
<Set name="maxConcurrentStreams">1024</Set>
<Set name="inputBufferSize">8192</Set>
- <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="600000"/></Set>
+ <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
<Set name="rateControlFactory">
<New class="org.eclipse.jetty.http2.parser.WindowRateControl$Factory">
<Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="5000"/></Arg>
@@ -48,7 +48,8 @@
</Arg>
<Set name="host"><Property name="solr.jetty.host" default="127.0.0.1"/></Set>
<Set name="port"><Property name="jetty.port" default="8983" /></Set>
- <Set name="idleTimeout"><Property name="solr.jetty.http.idleTimeout" default="600000"/></Set>
+ <Set name="reuseAddress">true</Set>
+ <Set name="idleTimeout"><Property name="solr.jetty.http.idleTimeout" default="240000"/></Set>
<Set name="acceptorPriorityDelta"><Property name="solr.jetty.http.acceptorPriorityDelta" default="0"/></Set>
<Set name="acceptQueueSize"><Property name="solr.jetty.http.acceptQueueSize" default="4096"/></Set>
<Call name="addLifeCycleListener">
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index 803bb23..331cb3d 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -56,10 +56,10 @@
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
<Set name="maxConcurrentStreams">1024</Set>
<Set name="inputBufferSize">8192</Set>
- <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="600000"/></Set>
+ <Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
<Set name="rateControlFactory">
<New class="org.eclipse.jetty.http2.parser.WindowRateControl$Factory">
- <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="1000"/></Arg>
+ <Arg type="int"><Property name="jetty.http2.rateControl.maxEventsPerSecond" default="5000"/></Arg>
</New>
</Set>
</New>
@@ -73,7 +73,8 @@
</Arg>
<Set name="host"><Property name="solr.jetty.host" default="127.0.0.1"/></Set>
<Set name="port"><Property name="solr.jetty.https.port" default="8983" /></Set>
- <Set name="idleTimeout"><Property name="solr.jetty.https.timeout" default="600000"/></Set>
+ <Set name="reuseAddress">true</Set>
+ <Set name="idleTimeout"><Property name="solr.jetty.https.timeout" default="240000"/></Set>
<Set name="acceptorPriorityDelta"><Property name="solr.jetty.ssl.acceptorPriorityDelta" default="0"/></Set>
<Set name="acceptQueueSize"><Property name="solr.jetty.https.acceptQueueSize" default="4096"/></Set>
<Call name="addLifeCycleListener">
diff --git a/solr/server/modules/quickstart.mod b/solr/server/modules/quickstart.mod
new file mode 100644
index 0000000..b0a2390
--- /dev/null
+++ b/solr/server/modules/quickstart.mod
@@ -0,0 +1,9 @@
+#
+# Jetty Quickstart module
+#
+
+[depend]
+server
+
+[lib]
+lib/jetty-quickstart-${jetty.version}.jar
\ No newline at end of file
diff --git a/solr/server/solr/configsets/_default/conf/solrconfig.xml b/solr/server/solr/configsets/_default/conf/solrconfig.xml
index e1855f8..3ad2282 100644
--- a/solr/server/solr/configsets/_default/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/_default/conf/solrconfig.xml
@@ -204,7 +204,7 @@
More details on the nuances of each LockFactory...
http://wiki.apache.org/lucene-java/AvailableLockFactories
-->
- <lockType>${solr.lock.type:native}</lockType>
+ <lockType>${solr.lock.type:none}</lockType>
<!-- Commit Deletion Policy
Custom deletion policies can be specified here. The class must
diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml
index 7fce0e8..50e1dc4 100644
--- a/solr/server/solr/solr.xml
+++ b/solr/server/solr/solr.xml
@@ -39,9 +39,9 @@
<bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool>
- <int name="zkClientTimeout">${zkClientTimeout:30000}</int>
- <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int>
- <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
+ <int name="zkClientTimeout">${zkClientTimeout:45000}</int>
+ <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:60000}</int>
+ <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:5000}</int>
<str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
<str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
@@ -49,8 +49,8 @@
<shardHandlerFactory name="shardHandlerFactory"
class="HttpShardHandlerFactory">
- <int name="socketTimeout">${socketTimeout:600000}</int>
- <int name="connTimeout">${connTimeout:60000}</int>
+ <int name="socketTimeout">${socketTimeout:60000}</int>
+ <int name="connTimeout">${connTimeout:5000}</int>
<str name="shardsWhitelist">${solr.shardsWhitelist:}</str>
</shardHandlerFactory>
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 425a9bd..c9cbd02 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -217,7 +217,7 @@ public class Http2SolrClient extends SolrClient {
ssl = true;
}
// nocommit - look at config again as well
- int minThreads = Integer.getInteger("solr.minHttp2ClientThreads", 12);
+ int minThreads = Integer.getInteger("solr.minHttp2ClientThreads", 6);
httpClientExecutor = new SolrQueuedThreadPool("http2Client", builder.maxThreadPoolSize, minThreads,
this.headers != null && this.headers.containsKey(QoSParams.REQUEST_SOURCE) && this.headers.get(QoSParams.REQUEST_SOURCE).equals(QoSParams.INTERNAL) ? 3000 : 5000,
null, -1, null);
@@ -434,9 +434,15 @@ public class Http2SolrClient extends SolrClient {
private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener<NamedList<Object>> asyncListener) {
+ Integer idleTimeout = solrRequest.getParams().getInt("idleTimeout");
+
+
Request req;
try {
req = makeRequest(solrRequest, collection);
+ if (idleTimeout != null) {
+ req.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
+ }
} catch (Exception e) {
asyncListener.onFailure(e);
return FAILED_MAKING_REQUEST_CANCELLABLE;
@@ -1094,12 +1100,12 @@ public class Http2SolrClient extends SolrClient {
public static class Builder {
public int maxThreadPoolSize = Integer.getInteger("solr.maxHttp2ClientThreads", 512);
- public int maxRequestsQueuedPerDestination = 512;
+ public int maxRequestsQueuedPerDestination = 1600;
private Http2SolrClient http2SolrClient;
private SSLConfig sslConfig = defaultSSLConfig;
private Integer idleTimeout = Integer.getInteger("solr.http2solrclient.default.idletimeout", 120000);
private Integer connectionTimeout;
- private Integer maxConnectionsPerHost = 6;
+ private Integer maxConnectionsPerHost = 16;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
protected Map<String,String> headers = new ConcurrentHashMap<>();
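
asyncRequest now honors an optional per-request "idleTimeout" parameter before falling back to the client-wide default. A minimal sketch of that resolution, with a plain Map standing in for SolrParams and the setter standing in for Jetty's Request.idleTimeout(long, TimeUnit) used in the patch:

    import java.util.Map;

    class IdleTimeout {
      static final long DEFAULT_MS =
          Integer.getInteger("solr.http2solrclient.default.idletimeout", 120000);

      // A per-request override beats the builder default when present.
      static long resolveMs(Map<String, String> params) {
        String v = params.get("idleTimeout");
        return v != null ? Long.parseLong(v) : DEFAULT_MS;
      }
    }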
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index 420ae52..9129eca 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -80,7 +80,7 @@ public class HttpClientUtil {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int DEFAULT_CONNECT_TIMEOUT = 5000;
- public static final int DEFAULT_SO_TIMEOUT = (int) TimeUnit.MINUTES.toMillis(5);
+ public static final int DEFAULT_SO_TIMEOUT = (int) TimeUnit.MINUTES.toMillis(1);
public static final int DEFAULT_MAXCONNECTIONSPERHOST = 100000;
public static final int DEFAULT_MAXCONNECTIONS = 100000;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index c746af5..cc3ccd6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -165,6 +165,7 @@ public class LBHttp2SolrClient extends LBSolrClient {
boolean isZombie, RetryListener listener) {
rsp.server = baseUrl;
req.getRequest().setBasePath(baseUrl);
+
return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new AsyncListener<>() {
@Override
public void onSuccess(NamedList<Object> result) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 8826ead..964ffa9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -79,7 +79,7 @@ public class ParWork implements Closeable {
synchronized (ParWork.class) {
if (EXEC == null) {
EXEC = (ParWorkExecutor) getParExecutorService("RootExec",
- Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 250), Integer.MAX_VALUE, 5000,
+ Integer.getInteger("solr.rootSharedThreadPoolCoreSize", 15), Integer.MAX_VALUE, 3000,
new SynchronousQueue());
((ParWorkExecutor)EXEC).enableCloseLock();
}
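The pool size is read with Integer.getInteger, so the previous sizing can be restored without a code change, e.g. on the JVM command line:

    -Dsolr.rootSharedThreadPoolCoreSize=250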
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 272a996..c0a554b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -243,17 +243,14 @@ public class ConnectionManager implements Watcher, Closeable {
log.warn("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
client.zkConnManagerCallbackExecutor.execute(() -> {
+ disconnected();
reconnect();
});
} else if (state == KeeperState.Disconnected) {
log.info("zkClient has disconnected");
- client.zkConnManagerCallbackExecutor.execute(() -> {
- disconnected();
- });
- } else if (state == KeeperState.Closed) {
- log.info("zkClient state == closed");
- //disconnected();
- //connectionStrategy.disconnected();
+// client.zkConnManagerCallbackExecutor.execute(() -> {
+//
+// });
} else if (state == KeeperState.AuthFailed) {
log.warn("zkClient received AuthFailed");
}
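Net effect of this hunk: only an Expired session triggers teardown-plus-reconnect, a plain Disconnected is logged and left to the ZooKeeper client's own reconnect, and the Closed branch is gone. The resulting shape (method names from the surrounding code):

    if (state == KeeperState.Expired) {
      // dead session: tear down and build a fresh ZooKeeper on the callback executor
      client.zkConnManagerCallbackExecutor.execute(() -> {
        disconnected();
        reconnect();
      });
    } else if (state == KeeperState.Disconnected) {
      log.info("zkClient has disconnected"); // transient; no teardown
    } else if (state == KeeperState.AuthFailed) {
      log.warn("zkClient received AuthFailed");
    }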
@@ -302,7 +299,7 @@ public class ConnectionManager implements Watcher, Closeable {
try {
updatezk();
try {
- waitForConnected(5000);
+ waitForConnected(30000);
if (onReconnect != null) {
try {
onReconnect.command();
@@ -346,7 +343,8 @@ public class ConnectionManager implements Watcher, Closeable {
}
public boolean isConnected() {
- return connected;
+ SolrZooKeeper fkeeper = keeper;
+ return fkeeper != null && fkeeper.getState().isConnected();
}
public void close() {
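The local fkeeper copy reads the shared keeper field once, so a concurrent reset to null between the null check and the dereference cannot NPE. The general idiom:

    SolrZooKeeper fkeeper = keeper;  // single read of the mutable field
    if (fkeeper != null && fkeeper.getState().isConnected()) {
      // safe: fkeeper cannot become null between the check and the use
    }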
@@ -355,9 +353,10 @@ public class ConnectionManager implements Watcher, Closeable {
client.zkCallbackExecutor.shutdown();
client.zkConnManagerCallbackExecutor.shutdown();
- if (keeper != null) {
- keeper.register(new NullWatcher());
- keeper.close();
+ SolrZooKeeper fkeeper = keeper;
+ if (fkeeper != null) {
+ fkeeper.register(new NullWatcher());
+ fkeeper.close();
}
ExecutorUtil.awaitTermination(client.zkCallbackExecutor);
@@ -373,14 +372,20 @@ public class ConnectionManager implements Watcher, Closeable {
public void waitForConnected(long waitForConnection)
throws TimeoutException, InterruptedException {
if (log.isDebugEnabled()) log.debug("Waiting for client to connect to ZooKeeper");
- if (isConnected()) return;
+ SolrZooKeeper fkeeper = keeper;
+ if (fkeeper != null && fkeeper.getState().isConnected()) return;
TimeOut timeout = new TimeOut(waitForConnection, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut() && !isClosed()) {
- if (isConnected()) return;
+ fkeeper = keeper;
+ if (fkeeper != null && fkeeper.getState().isConnected()) return;
boolean success = connectedLatch.await(50, TimeUnit.MILLISECONDS);
- if (success || isConnected()) return;
+ if (success) return;
+ fkeeper = keeper;
+ if (fkeeper != null && fkeeper.getState().isConnected()) return;
+ }
+ if (isClosed()) {
+ throw new AlreadyClosedException();
}
-
if (timeout.hasTimedOut()) {
throw new TimeoutException("Timeout waiting to connect to ZooKeeper "
+ zkServerAddress + " " + waitForConnection + "ms");
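On the caller side this wait is typically bounded and surfaced as a hard failure (sketch; the ConnectionManager reference and the error handling are illustrative):

    try {
      connectionManager.waitForConnected(30000); // ms
    } catch (TimeoutException e) {
      // fail fast instead of hanging while ZooKeeper is unreachable
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
    }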
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 67fff8d..057915a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -157,7 +157,7 @@ public class SolrZkClient implements Closeable {
this.zkACLProvider = zkACLProvider;
}
- zkCmdExecutor = new ZkCmdExecutor(this,5, new IsClosed() {
+ zkCmdExecutor = new ZkCmdExecutor(this,30, new IsClosed() {
@Override
public boolean isClosed() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index 99e2d9d..bcfb3a9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -28,7 +28,7 @@ public class ZkCmdExecutor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrZkClient solrZkClient;
- private long retryDelay = 50L;
+ private long retryDelay = 500L;
private int retryCount;
private IsClosed isClosed;
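retryDelay is the pause between ZooKeeper retries on connection loss; the loop is roughly this shape (illustrative, not the exact ZkCmdExecutor code; 'operation' is hypothetical):

    for (int attempt = 0; attempt < retryCount; attempt++) {
      try {
        return operation.execute();
      } catch (KeeperException.ConnectionLossException e) {
        if (isClosed != null && isClosed.isClosed()) throw e;
        Thread.sleep(retryDelay); // now 500 ms between attempts rather than 50 ms
      }
    }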
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
index 9634ce9..a96c865 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
@@ -244,7 +244,7 @@ public class ZkMaintenanceUtils {
}
}
} catch (KeeperException.NoNodeException r) {
- return;
+ // ignore: the node was already removed
}
});
}
@@ -260,7 +260,7 @@ public class ZkMaintenanceUtils {
}
}
} catch (KeeperException.NoNodeException r) {
- return;
+ // ignore: the node was already removed
}
});
}
@@ -283,11 +283,7 @@ public class ZkMaintenanceUtils {
for (String subpath : paths) {
if (!subpath.equals("/")) {
- try {
- zkClient.delete(subpath, -1);
- } catch (KeeperException.NotEmptyException | KeeperException.NoNodeException e) {
- // expected
- }
+ clean(zkClient, subpath);
}
}
}
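With the per-node delete replaced by recursion into clean() (note the subpath fix above), the method is expected to remove children depth-first before the node itself, roughly (sketch; assumes the two-argument delete shown in the removed lines):

    static void clean(SolrZkClient zkClient, String path) throws KeeperException, InterruptedException {
      for (String child : zkClient.getChildren(path, null, true)) {
        clean(zkClient, path.endsWith("/") ? path + child : path + "/" + child);
      }
      try {
        zkClient.delete(path, -1);
      } catch (KeeperException.NotEmptyException | KeeperException.NoNodeException e) {
        // expected under concurrent deletes/creates
      }
    }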
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 8eb4134..6c35c33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -91,7 +91,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
}
public SolrQueuedThreadPool(String name) {
- this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 250), Integer.getInteger("solr.containerThreadsIdleTimeout", 5000), -1, null, -1, null,
+ this(name, Integer.MAX_VALUE, Integer.getInteger("solr.minContainerThreads", 20), Integer.getInteger("solr.containerThreadsIdleTimeout", 5000), -1, null, -1, null,
new SolrNamedThreadFactory(name));
}
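As with the ParWork pool above, this is only a default; the old sizing remains one system property away:

    -Dsolr.minContainerThreads=250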
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 4e2a831..ae5b79c 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -883,7 +883,7 @@ public class SolrTestCase extends LuceneTestCase {
if (testExecutor != null) {
return testExecutor;
}
- testExecutor = (ParWorkExecutor) ParWork.getParExecutorService("testExecutor", 10, 100, 500, new BlockingArrayQueue(30, 16));
+ testExecutor = (ParWorkExecutor) ParWork.getParExecutorService("testExecutor", 3, 100, 500, new BlockingArrayQueue(30, 16));
((ParWorkExecutor) testExecutor).enableCloseLock();
return testExecutor;
}
diff --git a/solr/webapp/web/WEB-INF/quickstart-web.xml b/solr/webapp/web/WEB-INF/quickstart-web.xml
deleted file mode 100644
index e69de29..0000000