You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:13 UTC
[04/16] lucene-solr:master: SOLR-12801: Make massive improvements to
the tests.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
new file mode 100644
index 0000000..e4487cf
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kindly borrowed the idea and base implementation from the ActiveMQ project;
+ * useful for blocking traffic on a specified port.
+ */
+public class SocketProxy {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final int ACCEPT_TIMEOUT_MILLIS = 100;
+
+ // should be as large as the HttpShardHandlerFactory socket timeout ... or larger?
+ public static final int PUMP_SOCKET_TIMEOUT_MS = 100 * 1000;
+
+ private URI proxyUrl;
+ private URI target;
+
+ private Acceptor acceptor;
+ private ServerSocket serverSocket;
+
+ private CountDownLatch closed = new CountDownLatch(1);
+
+ public List<Bridge> connections = new LinkedList<Bridge>();
+
+ private final int listenPort;
+
+ private int receiveBufferSize = -1;
+
+ private boolean pauseAtStart = false;
+
+ private int acceptBacklog = 50;
+
+ private boolean usesSSL;
+
+ /**
+ * Creates a non-SSL proxy bound to a system-chosen port (port 0 is passed
+ * to the main constructor).
+ */
+ public SocketProxy() throws Exception {
+ this(0, false);
+ }
+
+ /**
+ * Creates a proxy bound to a system-chosen port (port 0 is passed to the
+ * main constructor).
+ *
+ * @param useSSL whether sockets are obtained from the default SSL factories
+ */
+ public SocketProxy( boolean useSSL) throws Exception {
+ this(0, useSSL);
+ }
+
+ /**
+  * Creates the listening server socket and binds it immediately. No traffic
+  * is proxied until {@link #open(URI)} starts the acceptor.
+  *
+  * @param port local port to listen on; 0 lets the system pick a free port
+  * @param useSSL whether server and target sockets are obtained from the
+  *        default SSL socket factories
+  * @throws Exception if the server socket cannot be created or bound
+  */
+ public SocketProxy(int port, boolean useSSL) throws Exception {
+   this.usesSSL = useSSL;
+   serverSocket = createServerSocket(useSSL);
+   serverSocket.setReuseAddress(true);
+   // receiveBufferSize still holds its initial value of -1 while the
+   // constructor runs, so there is nothing to apply to the socket here; a
+   // value set later via setReceiveBufferSize(int) is applied by reopen().
+   serverSocket.bind(new InetSocketAddress(port), acceptBacklog);
+   // Record the port that was actually bound, which differs from 'port'
+   // when 0 was requested.
+   this.listenPort = serverSocket.getLocalPort();
+ }
+
+ /**
+ * Sets the target, derives the proxy URL from it (same URI components, but
+ * with the server socket's local port) and starts the acceptor.
+ *
+ * @param uri the URI that accepted connections are forwarded to
+ */
+ public void open(URI uri) throws Exception {
+ target = uri;
+ proxyUrl = urlFromSocket(target, serverSocket);
+ doOpen();
+ }
+
+ /**
+  * Returns a short description containing the listen port and the target URI.
+  */
+ @Override
+ public String toString() {
+   return "SocketProxy: port="+listenPort+"; target="+target;
+ }
+
+ /**
+ * Sets the receive buffer size used for sockets configured after this call:
+ * the server socket created by {@link #reopen()}, accepted sockets and the
+ * sockets opened towards the target. Values that are not positive are ignored
+ * by those call sites. The server socket created by the constructor is not
+ * touched here.
+ */
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ /**
+ * Replaces the target URI. An acceptor that is already running keeps the
+ * URI it was constructed with; the new value is handed to the next acceptor
+ * created by doOpen(). The proxy URL is not recomputed here.
+ */
+ public void setTarget(URI tcpBrokerUri) {
+ target = tcpBrokerUri;
+ }
+
+ /**
+  * Builds a new acceptor for the current server socket and target, pauses it
+  * up front when pauseAtStart is set, starts it on a dedicated thread and
+  * then installs a fresh "closed" latch.
+  */
+ private void doOpen() throws Exception {
+   acceptor = new Acceptor(serverSocket, target);
+   if (pauseAtStart) {
+     acceptor.pause();
+   }
+   String threadName = "SocketProxy-Acceptor-" + serverSocket.getLocalPort();
+   Thread acceptorThread = new Thread(null, acceptor, threadName);
+   acceptorThread.start();
+   closed = new CountDownLatch(1);
+ }
+
+ /**
+ * Returns the local port the server socket was bound to in the constructor.
+ */
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ /**
+  * Returns a new, unbound server socket: from the default SSL server socket
+  * factory when useSSL is true, a plain ServerSocket otherwise.
+  */
+ private ServerSocket createServerSocket(boolean useSSL) throws Exception {
+   return useSSL
+       ? SSLServerSocketFactory.getDefault().createServerSocket()
+       : new ServerSocket();
+ }
+
+ /**
+  * Returns a new, unconnected socket: from the default SSL socket factory
+  * when useSSL is true, a plain Socket otherwise.
+  */
+ private Socket createSocket(boolean useSSL) throws Exception {
+   return useSSL
+       ? SSLSocketFactory.getDefault().createSocket()
+       : new Socket();
+ }
+
+ /**
+ * Returns the proxy URL computed by open(URI); null until open(URI) has
+ * been called.
+ */
+ public URI getUrl() {
+ return proxyUrl;
+ }
+
+ /*
+  * close all proxy connections and acceptor
+  *
+  * Works on a snapshot of the connection list because Bridge.close() removes
+  * the bridge from that list. Safe to call before open(URI): in that case
+  * there is no acceptor yet and only the "closed" latch is released.
+  */
+ public void close() {
+   List<Bridge> snapshot;
+   synchronized (this.connections) {
+     snapshot = new ArrayList<Bridge>(this.connections);
+   }
+   log.warn("Closing " + snapshot.size()+" connections to: "+getUrl()+", target: "+target);
+   for (Bridge con : snapshot) {
+     closeConnection(con);
+   }
+   // acceptor is only assigned by doOpen(); guard against close() before open().
+   if (acceptor != null) {
+     acceptor.close();
+   }
+   closed.countDown();
+ }
+
+ /*
+  * close all proxy receive connections, leaving acceptor open
+  *
+  * The connection list is copied under its lock and the copy is iterated
+  * outside of it.
+  */
+ public void halfClose() {
+   final Bridge[] snapshot;
+   synchronized (this.connections) {
+     snapshot = this.connections.toArray(new Bridge[0]);
+   }
+   log.info("halfClose, numConnections=" + snapshot.length);
+   for (Bridge bridge : snapshot) {
+     halfCloseConnection(bridge);
+   }
+ }
+
+ /**
+ * Waits on the "closed" latch for at most the given number of seconds.
+ *
+ * @param timeoutSeconds maximum time to wait, in seconds
+ * @return the result of the latch await: true if the latch reached zero,
+ *         false if the timeout elapsed first
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ public boolean waitUntilClosed(long timeoutSeconds)
+ throws InterruptedException {
+ return closed.await(timeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ /*
+  * called after a close to restart the acceptor on the same port
+  *
+  * Creates a new server socket, binds it to the port of the proxy URL and
+  * starts a new acceptor. Any failure, including calling this before
+  * open(URI), is caught and logged rather than propagated to the caller.
+  */
+ public void reopen() {
+   log.info("Re-opening connectivity to "+getUrl());
+   try {
+     if (proxyUrl == null) {
+       throw new IllegalStateException("Can not call reopen before open(URI uri).");
+     }
+     serverSocket = createServerSocket(usesSSL);
+     serverSocket.setReuseAddress(true);
+     if (receiveBufferSize > 0) {
+       serverSocket.setReceiveBufferSize(receiveBufferSize);
+     }
+     // Use the configured backlog, as the constructor does for the first bind.
+     serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()), acceptBacklog);
+     doOpen();
+   } catch (Exception e) {
+     // A failed reopen leaves the proxy without an acceptor, so make it visible.
+     log.warn("exception on reopen url:" + getUrl(), e);
+   }
+ }
+
+ /*
+  * pause accepting new connections and data transfer through existing proxy
+  * connections. All sockets remain open
+  *
+  * The acceptor and every bridge are paused while holding the connection
+  * list lock.
+  */
+ public void pause() {
+   synchronized (connections) {
+     log.info("pause, numConnections=" + connections.size());
+     acceptor.pause();
+     connections.forEach(Bridge::pause);
+   }
+ }
+
+ /*
+  * continue after pause
+  *
+  * Bridges are resumed under the connection list lock; the acceptor is
+  * resumed afterwards, outside of it.
+  */
+ public void goOn() {
+   synchronized (connections) {
+     log.info("goOn, numConnections=" + connections.size());
+     connections.forEach(Bridge::goOn);
+   }
+   acceptor.goOn();
+ }
+
+ /**
+ * Closes the given bridge; any exception is logged at debug level and not
+ * propagated.
+ */
+ private void closeConnection(Bridge c) {
+ try {
+ c.close();
+ } catch (Exception e) {
+ log.debug("exception on close of: " + c, e);
+ }
+ }
+
+ /**
+ * Half-closes the given bridge; any exception is logged at debug level and
+ * not propagated.
+ */
+ private void halfCloseConnection(Bridge c) {
+ try {
+ c.halfClose();
+ } catch (Exception e) {
+ log.debug("exception on half close of: " + c, e);
+ }
+ }
+
+ /**
+ * Returns whether a newly created acceptor is paused before its thread starts.
+ */
+ public boolean isPauseAtStart() {
+ return pauseAtStart;
+ }
+
+ /**
+ * Sets whether the next acceptor created by doOpen() is paused before its
+ * thread starts.
+ */
+ public void setPauseAtStart(boolean pauseAtStart) {
+ this.pauseAtStart = pauseAtStart;
+ }
+
+ /**
+ * Returns the backlog value passed when binding the server socket.
+ */
+ public int getAcceptBacklog() {
+ return acceptBacklog;
+ }
+
+ /**
+ * Sets the backlog value. The constructor has already bound its server
+ * socket by the time this can be called, so the value does not affect that
+ * socket.
+ */
+ public void setAcceptBacklog(int acceptBacklog) {
+ this.acceptBacklog = acceptBacklog;
+ }
+
+ /**
+  * Builds a copy of the given URI in which only the port is replaced by the
+  * local port of the given server socket.
+  */
+ private URI urlFromSocket(URI uri, ServerSocket serverSocket)
+     throws Exception {
+   return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
+       serverSocket.getLocalPort(), uri.getPath(), uri.getQuery(), uri.getFragment());
+ }
+
+ public class Bridge {
+
+ private Socket receiveSocket;
+ private Socket sendSocket;
+ private Pump requestThread;
+ private Pump responseThread;
+
+ /**
+  * Connects a new socket to the target and starts the two pump threads that
+  * copy data between the accepted socket and the target socket.
+  *
+  * @param socket the socket accepted from the client side
+  * @param target URI whose host and port the outgoing socket connects to
+  * @throws Exception if the outgoing socket cannot be created, configured or
+  *         connected; the outgoing socket is closed before rethrowing
+  */
+ public Bridge(Socket socket, URI target) throws Exception {
+   receiveSocket = socket;
+   sendSocket = createSocket(usesSSL);
+   try {
+     if (receiveBufferSize > 0) {
+       sendSocket.setReceiveBufferSize(receiveBufferSize);
+     }
+     sendSocket.connect(new InetSocketAddress(target.getHost(), target
+         .getPort()));
+   } catch (Exception e) {
+     // Do not leak the socket created above when the target is unreachable.
+     try {
+       sendSocket.close();
+     } catch (IOException ignored) {
+       // best effort; the original failure is the one worth reporting
+     }
+     throw e;
+   }
+   linkWithThreads(receiveSocket, sendSocket);
+   log.info("proxy connection " + sendSocket + ", receiveBufferSize="
+       + sendSocket.getReceiveBufferSize());
+ }
+
+ /**
+ * Resumes both pumps, response pump first.
+ */
+ public void goOn() {
+ responseThread.goOn();
+ requestThread.goOn();
+ }
+
+ /**
+ * Pauses both pumps, request pump first.
+ */
+ public void pause() {
+ requestThread.pause();
+ responseThread.pause();
+ }
+
+ /**
+  * Removes this bridge from the proxy's connection list and closes both of
+  * its sockets.
+  *
+  * @throws Exception if closing either socket fails
+  */
+ public void close() throws Exception {
+   synchronized (connections) {
+     connections.remove(this);
+   }
+   try {
+     receiveSocket.close();
+   } finally {
+     // Close the target side even when closing the client side failed.
+     sendSocket.close();
+   }
+ }
+
+ /**
+ * Closes only the accepted (receive) socket; the socket to the target and
+ * the entry in the connection list are left untouched here.
+ */
+ public void halfClose() throws Exception {
+ receiveSocket.close();
+ }
+
+ /**
+ * Creates and starts the two pump threads: "Request" copies from source to
+ * dest, "Response" copies from dest to source.
+ */
+ private void linkWithThreads(Socket source, Socket dest) {
+ requestThread = new Pump("Request", source, dest);
+ requestThread.start();
+ responseThread = new Pump("Response", dest, source);
+ responseThread.start();
+ }
+
+ public class Pump extends Thread {
+
+ protected Socket src;
+ private Socket destination;
+ private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+ /**
+ * Creates a pump copying bytes from source to dest. The thread is named
+ * after the kind and the remote ports of both sockets, and starts out
+ * un-paused (the pause latch has a count of zero).
+ */
+ public Pump(String kind, Socket source, Socket dest) {
+ super("SocketProxy-"+kind+"-" + source.getPort() + ":"
+ + dest.getPort());
+ src = source;
+ destination = dest;
+ pause.set(new CountDownLatch(0));
+ }
+
+ /**
+ * Installs a new latch with a count of one; run() waits on the current
+ * latch after each read, before writing the data out.
+ */
+ public void pause() {
+ pause.set(new CountDownLatch(1));
+ }
+
+ /**
+ * Counts down the current pause latch, releasing a run() loop waiting on it.
+ */
+ public void goOn() {
+ pause.get().countDown();
+ }
+
+ /**
+  * Copies bytes from the source socket to the destination socket until end
+  * of stream, a read timeout or an error, honouring the pause latch between
+  * each read and the following write. Both streams are closed on exit.
+  */
+ @Override
+ public void run() {
+   byte[] buf = new byte[1024];
+
+   try {
+     src.setSoTimeout(PUMP_SOCKET_TIMEOUT_MS);
+   } catch (SocketException e) {
+     // Constant-first comparison: getMessage() may return null.
+     if ("Socket is closed".equals(e.getMessage())) {
+       log.warn("Failed to set socket timeout on "+src+" due to: "+e);
+       return;
+     }
+     log.error("Failed to set socket timeout on "+src+" due to: "+e);
+     throw new RuntimeException(e);
+   }
+
+   InputStream in = null;
+   OutputStream out = null;
+   try {
+     in = src.getInputStream();
+     out = destination.getOutputStream();
+     while (true) {
+       int len = -1;
+       try {
+         len = in.read(buf);
+       } catch (SocketTimeoutException ste) {
+         // len stays -1, so a read timeout ends the pump exactly like EOF.
+         log.warn(ste+" when reading from "+src);
+       }
+
+       if (len == -1) {
+         log.debug("read eof from:" + src);
+         break;
+       }
+       // Blocks here while the pump is paused; data already read is held in buf.
+       pause.get().await();
+       if (len > 0) {
+         out.write(buf, 0, len);
+       }
+     }
+   } catch (Exception e) {
+     log.debug("read/write failed, reason: " + e.getLocalizedMessage());
+     try {
+       if (!receiveSocket.isClosed()) {
+         // for halfClose, on read/write failure if we close the
+         // remote end will see a close at the same time.
+         close();
+       }
+     } catch (Exception ignore) {
+       // best effort: the bridge is being torn down anyway
+     }
+     if (e instanceof InterruptedException) {
+       // Keep the interrupt visible to whoever inspects this thread next.
+       Thread.currentThread().interrupt();
+     }
+   } finally {
+     if (in != null) {
+       try {
+         in.close();
+       } catch (Exception exc) {
+         log.debug(exc+" when closing InputStream on socket: "+src);
+       }
+     }
+     if (out != null) {
+       try {
+         out.close();
+       } catch (Exception exc) {
+         log.debug(exc+" when closing OutputStream on socket: "+destination);
+       }
+     }
+   }
+ }
+ }
+ }
+
+ public class Acceptor implements Runnable {
+
+ private ServerSocket socket;
+ private URI target;
+ private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+ /**
+  * Creates an un-paused acceptor for the given server socket and target, and
+  * sets the accept timeout on the socket so that run() can re-check the
+  * socket state and pause latch between accept attempts.
+  *
+  * @param serverSocket the bound server socket to accept connections on
+  * @param uri the target handed to every Bridge created by this acceptor
+  */
+ public Acceptor(ServerSocket serverSocket, URI uri) {
+   socket = serverSocket;
+   target = uri;
+   pause.set(new CountDownLatch(0));
+   try {
+     socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+   } catch (SocketException e) {
+     // Report through the class logger instead of printing to stderr.
+     log.warn("Failed to set accept timeout on " + socket, e);
+   }
+ }
+
+ /**
+ * Installs a new latch with a count of one; run() waits on the current
+ * latch before each accept and again right after a connection is accepted.
+ */
+ public void pause() {
+ pause.set(new CountDownLatch(1));
+ }
+
+ /**
+ * Counts down the current pause latch, releasing a run() loop waiting on it.
+ */
+ public void goOn() {
+ pause.get().countDown();
+ }
+
+ /**
+  * Accept loop: while the server socket is open, waits for the pause latch,
+  * accepts a connection and registers a new Bridge for it. Accept timeouts
+  * are expected and simply restart the loop; any other exception ends the
+  * loop.
+  */
+ @Override
+ public void run() {
+   try {
+     while (!socket.isClosed()) {
+       pause.get().await();
+       try {
+         Socket source = socket.accept();
+         boolean bridged = false;
+         try {
+           pause.get().await();
+           if (receiveBufferSize > 0) {
+             source.setReceiveBufferSize(receiveBufferSize);
+           }
+           log.info("accepted " + source + ", receiveBufferSize:"
+               + source.getReceiveBufferSize());
+           synchronized (connections) {
+             connections.add(new Bridge(source, target));
+           }
+           bridged = true;
+         } finally {
+           if (!bridged) {
+             // No Bridge took ownership of the accepted socket; close it
+             // here so it does not leak when setup fails.
+             try {
+               source.close();
+             } catch (IOException ignored) {
+               // best effort; the original failure propagates
+             }
+           }
+         }
+       } catch (SocketTimeoutException expected) {
+         // accept() timed out; loop around to re-check state
+       }
+     }
+   } catch (Exception e) {
+     log.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+   }
+ }
+
+ /**
+  * Closes the server socket, releases the proxy's "closed" latch and
+  * un-pauses the accept loop so that it can observe the closed socket.
+  */
+ public void close() {
+   try {
+     socket.close();
+   } catch (IOException ignored) {
+     // best effort: nothing useful can be done if the close itself fails
+   } finally {
+     // Always release waiters, even when socket.close() failed; otherwise a
+     // paused accept loop would stay blocked on its latch.
+     closed.countDown();
+     goOn();
+   }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 193555a..37cdba7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -112,7 +112,7 @@ public class CloudSolrClient extends SolrClient {
private HttpClient myClient;
private final boolean clientIsInternal;
//no of times collection state to be reloaded if stale state error is received
- private static final int MAX_STALE_RETRIES = 5;
+ private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
Random rand = new Random();
private final boolean updatesToLeaders;
@@ -212,9 +212,9 @@ public class CloudSolrClient extends SolrClient {
final DocCollection cached;
final long cachedAt;
//This is the time at which the collection is retried and got the same old version
- long retriedAt = -1;
+ volatile long retriedAt = -1;
//flag that suggests that this is potentially to be rechecked
- boolean maybeStale = false;
+ volatile boolean maybeStale = false;
ExpiringCachedDocCollection(DocCollection cached) {
this.cached = cached;
@@ -916,17 +916,17 @@ public class CloudSolrClient extends SolrClient {
int errorCode = (rootCause instanceof SolrException) ?
((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
- log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry? " + retryCount,
- inputCollections, rootCause.toString());
-
- boolean wasCommError =
- (rootCause instanceof ConnectException ||
- rootCause instanceof ConnectTimeoutException ||
- rootCause instanceof NoHttpResponseException ||
- rootCause instanceof SocketException);
+ boolean wasCommError =
+ (rootCause instanceof ConnectException ||
+ rootCause instanceof ConnectTimeoutException ||
+ rootCause instanceof NoHttpResponseException ||
+ rootCause instanceof SocketException);
+
+ log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
+ inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
if (wasCommError
- || (exc instanceof RouteException && (errorCode == 404 || errorCode == 503)) // 404 because the core does not exist 503 service unavailable
+ || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
//TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
) {
// it was a communication error. it is likely that
@@ -946,15 +946,18 @@ public class CloudSolrClient extends SolrClient {
// and we could not get any information from the server
//it is probably not worth trying again and again because
// the state would not have been updated
+ log.info("trying request again");
return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
}
+ } else {
+ log.info("request was not communication error it seems");
}
boolean stateWasStale = false;
if (retryCount < MAX_STALE_RETRIES &&
requestedCollections != null &&
!requestedCollections.isEmpty() &&
- SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+ (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
{
// cached state for one or more external collections was stale
// re-issue request using updated state
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index c97ef94..d415f21 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -347,7 +347,7 @@ public class HttpClientUtil {
HttpClientBuilder retBuilder = builder.setDefaultRequestConfig(requestConfig);
if (config.getBool(HttpClientUtil.PROP_USE_RETRY, true)) {
- retBuilder = retBuilder.setRetryHandler(new SolrHttpRequestRetryHandler(3));
+ retBuilder = retBuilder.setRetryHandler(new SolrHttpRequestRetryHandler(Integer.getInteger("solr.httpclient.retries", 3)));
} else {
retBuilder = retBuilder.setRetryHandler(NO_RETRY);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
index 6c2737d..b0322a7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
@@ -51,6 +51,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.slf4j.MDC;
@@ -115,11 +116,11 @@ public class LBHttpSolrClient extends SolrClient {
private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
- private ScheduledExecutorService aliveCheckExecutor;
+ private volatile ScheduledExecutorService aliveCheckExecutor;
private final HttpClient httpClient;
private final boolean clientIsInternal;
- private HttpSolrClient.Builder httpSolrClientBuilder;
+ private final HttpSolrClient.Builder httpSolrClientBuilder;
private final AtomicInteger counter = new AtomicInteger(-1);
private static final SolrQuery solrQuery = new SolrQuery("*:*");
@@ -129,7 +130,7 @@ public class LBHttpSolrClient extends SolrClient {
private Set<String> queryParams = new HashSet<>();
private Integer connectionTimeout;
- private Integer soTimeout;
+ private volatile Integer soTimeout;
static {
solrQuery.setRows(0);
@@ -612,9 +613,13 @@ public class LBHttpSolrClient extends SolrClient {
@Override
public void close() {
- if (aliveCheckExecutor != null) {
- aliveCheckExecutor.shutdownNow();
+ synchronized (this) {
+ if (aliveCheckExecutor != null) {
+ aliveCheckExecutor.shutdownNow();
+ ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor);
+ }
}
+
if(clientIsInternal) {
HttpClientUtil.close(httpClient);
}
@@ -863,16 +868,6 @@ public class LBHttpSolrClient extends SolrClient {
public RequestWriter getRequestWriter() {
return requestWriter;
}
-
- @Override
- protected void finalize() throws Throwable {
- try {
- if(this.aliveCheckExecutor!=null)
- this.aliveCheckExecutor.shutdownNow();
- } finally {
- super.finalize();
- }
- }
// defaults
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
index 74e981d..ce44a18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
@@ -24,8 +24,8 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
protected HttpClient httpClient;
protected ResponseParser responseParser;
- protected Integer connectionTimeoutMillis;
- protected Integer socketTimeoutMillis;
+ protected Integer connectionTimeoutMillis = 15000;
+ protected Integer socketTimeoutMillis = 120000;
/** The solution for the unchecked cast warning. */
public abstract B getThis();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 8a4b35c..e057c3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.http.NoHttpResponseException;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
@@ -42,6 +44,7 @@ import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@@ -192,9 +195,36 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("key", metricsKeyVsTag.keySet().toArray(new String[0]));
try {
- SimpleSolrResponse rsp = ctx.invoke(solrNode, CommonParams.METRICS_PATH, params);
+
+ SimpleSolrResponse rsp = null;
+ int cnt = 0;
+ while (cnt++ < 3) {
+ try {
+ rsp = ctx.invoke(solrNode, CommonParams.METRICS_PATH, params);
+ } catch (SolrException | SolrServerException | NoHttpResponseException e) {
+ boolean hasCauseNoHttpResponseException = false;
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof NoHttpResponseException) {
+ hasCauseNoHttpResponseException = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+ if (hasCauseNoHttpResponseException || e instanceof NoHttpResponseException) {
+ log.info("Error on getting remote info, trying again: " + e.getMessage());
+ Thread.sleep(500);
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+
+ SimpleSolrResponse frsp = rsp;
metricsKeyVsTag.forEach((key, tag) -> {
- Object v = Utils.getObjectByPath(rsp.nl, true, Arrays.asList("metrics", key));
+ Object v = Utils.getObjectByPath(frsp.nl, true, Arrays.asList("metrics", key));
if (tag instanceof Function) {
Pair<String, Object> p = (Pair<String, Object>) ((Function) tag).apply(v);
ctx.getTags().put(p.first(), p.second());
@@ -271,7 +301,36 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
params.add("prefix", StrUtils.join(prefixes, ','));
try {
- SimpleSolrResponse rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
+ SimpleSolrResponse rsp = null;
+ int retries = 5;
+ int cnt = 0;
+ while (cnt++ < retries) {
+ try {
+ rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
+ } catch (SolrException | SolrServerException | SocketException e) {
+ boolean hasCauseSocketException = false;
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof SocketException) {
+ hasCauseSocketException = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+ if (hasCauseSocketException || e instanceof SocketException) {
+ log.info("Error on getting remote info, trying again: " + e.getMessage());
+ Thread.sleep(500);
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ if (cnt == retries) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get remote info after many retries on NoHttpResponseException");
+ }
+
Map m = rsp.nl.asMap(4);
if (requestedTags.contains(FREEDISK.tagName)) {
Object n = Utils.getObjectByPath(m, true, "metrics/solr.node/CONTAINER.fs.usableSpace");
@@ -298,7 +357,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
if (n != null) ctx.getTags().put(HEAPUSAGE, n.doubleValue() * 100.0d);
}
} catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting remote info", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index 968e514..53ff466 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -39,11 +40,14 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- ZkStateReader zkStateReader;
+ volatile ZkStateReader zkStateReader;
private boolean closeZkStateReader = true;
String zkHost;
- int zkConnectTimeout = 10000;
- int zkClientTimeout = 10000;
+ int zkConnectTimeout = 15000;
+ int zkClientTimeout = 45000;
+
+
+ private volatile boolean isClosed = false;
public ZkClientClusterStateProvider(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
@@ -73,6 +77,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public Set<String> getLiveNodes() {
+ if (isClosed) throw new AlreadyClosedException();
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
return clusterState.getLiveNodes();
@@ -175,6 +180,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public void close() throws IOException {
+ isClosed = true;
if (zkStateReader != null && closeZkStateReader) {
synchronized (this) {
if (zkStateReader != null)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
index 613ba25..77bd84c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
@@ -57,7 +58,8 @@ public class ZkDistribStateManager implements DistribStateManager {
try {
return zkClient.exists(path, true);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -68,7 +70,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -86,7 +89,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException(path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -97,7 +101,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.NodeExistsException e) {
throw new AlreadyExistsException(path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -108,7 +113,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.NodeExistsException e) {
throw new AlreadyExistsException(path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -121,7 +127,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.NodeExistsException e) {
throw new AlreadyExistsException(path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -136,7 +143,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.BadVersionException e) {
throw new BadVersionException(version, path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -149,7 +157,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.BadVersionException e) {
throw new BadVersionException(version, path);
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
@@ -164,7 +173,8 @@ public class ZkDistribStateManager implements DistribStateManager {
} catch (KeeperException.BadVersionException e) {
throw new BadVersionException(-1, ops.toString());
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new AlreadyClosedException();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index a45c5de..a813f30 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -59,7 +59,7 @@ public class SolrClientCache implements Serializable {
} else {
final List<String> hosts = new ArrayList<String>();
hosts.add(zkHost);
- CloudSolrClient.Builder builder = new CloudSolrClient.Builder(hosts, Optional.empty());
+ CloudSolrClient.Builder builder = new CloudSolrClient.Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000);
if (httpClient != null) {
builder = builder.withHttpClient(httpClient);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 126df81..ee4cb5d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -484,7 +484,7 @@ public class FacetStream extends TupleStream implements Expressible {
} else {
final List<String> hosts = new ArrayList<>();
hosts.add(zkHost);
- cloudSolrClient = new Builder(hosts, Optional.empty()).build();
+ cloudSolrClient = new Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build();
}
FieldComparator[] adjustedSorts = adjustSorts(buckets, bucketSorts);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index 01aa047..052fc30 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -178,7 +178,7 @@ public class RandomStream extends TupleStream implements Expressible {
} else {
final List<String> hosts = new ArrayList<>();
hosts.add(zkHost);
- cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).build();
+ cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build();
}
ModifiableSolrParams params = getParams(this.props);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java b/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
new file mode 100644
index 0000000..bdb5429
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
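+/**
+ * Unchecked exception (an {@link IllegalStateException}) signalling that the component being used has already been closed.
+ */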
+public class AlreadyClosedException extends IllegalStateException {
+
+ public AlreadyClosedException() {
+ super();
+ }
+
+ public AlreadyClosedException(String msg) {
+ super(msg);
+ }
+
+ public AlreadyClosedException(Throwable th) {
+ super(th);
+ }
+
+ public AlreadyClosedException(String msg, Throwable th) {
+ super(msg, th);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 98ddb47..3a55988 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -73,16 +73,23 @@ public class ConnectionManager implements Watcher {
|| ( stateType == StateType.TRACKING_TIME && (System.nanoTime() - lastDisconnectTime > TimeUnit.NANOSECONDS.convert(timeToExpire, TimeUnit.MILLISECONDS)));
}
}
+
+ public static abstract class IsClosed {
+ public abstract boolean isClosed();
+ }
private volatile LikelyExpiredState likelyExpiredState = LikelyExpiredState.EXPIRED;
- public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) {
+ private IsClosed isClosedCheck;
+
+ public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect, IsClosed isClosed) {
this.name = name;
this.client = client;
this.connectionStrategy = strat;
this.zkServerAddress = zkServerAddress;
this.onReconnect = onConnect;
this.beforeReconnect = beforeReconnect;
+ this.isClosedCheck = isClosed;
}
private synchronized void connected() {
@@ -108,7 +115,7 @@ public class ConnectionManager implements Watcher {
log.debug("Watcher {} name: {} got event {} path: {} type: {}", this, name, event, event.getPath(), event.getType());
}
- if (isClosed) {
+ if (isClosed()) {
log.debug("Client->ZooKeeper status change trigger but we are already closed");
return;
}
@@ -120,6 +127,9 @@ public class ConnectionManager implements Watcher {
connected();
connectionStrategy.connected();
} else if (state == Expired) {
+ if (isClosed()) {
+ return;
+ }
// we don't call disconnected here, because we know we are expired
connected = false;
likelyExpiredState = LikelyExpiredState.EXPIRED;
@@ -177,7 +187,7 @@ public class ConnectionManager implements Watcher {
waitSleep(1000);
}
- } while (!isClosed);
+ } while (!isClosed());
log.info("zkClient Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
log.warn("zkClient has disconnected");
@@ -188,8 +198,12 @@ public class ConnectionManager implements Watcher {
}
}
+ public synchronized boolean isConnectedAndNotClosed() {
+ return !isClosed() && connected;
+ }
+
public synchronized boolean isConnected() {
- return !isClosed && connected;
+ return connected;
}
// we use a volatile rather than sync
@@ -199,8 +213,12 @@ public class ConnectionManager implements Watcher {
this.likelyExpiredState = LikelyExpiredState.EXPIRED;
}
+ private boolean isClosed() {
+ return isClosed || isClosedCheck.isClosed();
+ }
+
public boolean isLikelyExpired() {
- return isClosed || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
+ return isClosed() || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
}
public synchronized void waitSleep(long waitFor) {
@@ -217,7 +235,7 @@ public class ConnectionManager implements Watcher {
long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForConnection, TimeUnit.MILLISECONDS);
long left = 1;
while (!connected && left > 0) {
- if (isClosed) {
+ if (isClosed()) {
break;
}
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
index e16ca68..2ed88e2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
@@ -57,6 +58,8 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
.update(zk);
success = true;
log.info("Reconnected to ZooKeeper");
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
SolrException.log(log, "Reconnect to ZooKeeper failed", e);
log.warn("Reconnect to ZooKeeper failed");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index adf0211..e896272 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -92,9 +92,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
this.nodeNameLeaderReplicas = new HashMap<>();
this.nodeNameReplicas = new HashMap<>();
this.replicationFactor = (Integer) verifyProp(props, REPLICATION_FACTOR);
- this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS);
- this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS);
- this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS);
+ this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
+ this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
+ this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
this.policy = (String) props.get(Policy.POLICY);
@@ -136,10 +136,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
leaderReplicas.add(replica);
}
}
-
+
public static Object verifyProp(Map<String, Object> props, String propName) {
+ return verifyProp(props, propName, null);
+ }
+
+ public static Object verifyProp(Map<String, Object> props, String propName, Object def) {
Object o = props.get(propName);
- if (o == null) return null;
+ if (o == null) return def;
switch (propName) {
case MAX_SHARDS_PER_NODE:
case REPLICATION_FACTOR:
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
index 1cf16e1..8d11b9a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
@@ -33,6 +33,8 @@ public interface LiveNodesListener {
*
* @param oldLiveNodes set of live nodes before the change
* @param newLiveNodes set of live nodes after the change
+ *
+ * @return true if the listener should be removed
*/
- void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+ boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
new file mode 100644
index 0000000..a29e1df
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Interface to determine whether the set of live nodes matches a required state
+ *
+ * @see ZkStateReader#waitForLiveNodes(long, TimeUnit, LiveNodesPredicate)
+ */
+public interface LiveNodesPredicate {
+
+ boolean matches(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
new file mode 100644
index 0000000..8de2cce
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+
+public interface LiveNodesWatcher {
+
+ boolean onStateChanged(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 2fb2718..d73282b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -163,7 +163,7 @@ public class Replica extends ZkNodeProps {
}
public boolean isActive(Set<String> liveNodes) {
- return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
+ return this.nodeName != null && liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
}
public Type getType() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1875073..a25fc45 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -16,12 +16,6 @@
*/
package org.apache.solr.common.cloud;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Source;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -38,15 +32,24 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.Op;
@@ -90,6 +93,8 @@ public class SolrZkClient implements Closeable {
private ZkACLProvider zkACLProvider;
private String zkServerAddress;
+ private IsClosed higherLevelIsClosed;
+
public int getZkClientTimeout() {
return zkClientTimeout;
}
@@ -118,18 +123,18 @@ public class SolrZkClient implements Closeable {
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
- this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null);
+ this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null, null);
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
- this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null);
+ this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null, null);
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
- ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) {
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider, IsClosed higherLevelIsClosed) {
this.zkServerAddress = zkServerAddress;
-
+ this.higherLevelIsClosed = higherLevelIsClosed;
if (strat == null) {
strat = new DefaultConnectionStrategy();
}
@@ -142,9 +147,21 @@ public class SolrZkClient implements Closeable {
this.zkClientTimeout = zkClientTimeout;
// we must retry at least as long as the session timeout
- zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+ zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout, new IsClosed() {
+
+ @Override
+ public boolean isClosed() {
+ return SolrZkClient.this.isClosed();
+ }
+ });
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
- + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
+ + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect, new IsClosed() {
+
+ @Override
+ public boolean isClosed() {
+ return SolrZkClient.this.isClosed();
+ }
+ });
try {
strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager),
@@ -513,50 +530,46 @@ public class SolrZkClient implements Closeable {
}
byte[] bytes = null;
final String currentPath = sbPath.toString();
- Object exists = exists(currentPath, watcher, retryOnConnLoss);
- if (exists == null || ((i == paths.length -1) && failOnExists)) {
- CreateMode mode = CreateMode.PERSISTENT;
- if (i == paths.length - 1) {
- mode = createMode;
- bytes = data;
- if (!retryOnConnLoss) retry = false;
- }
- try {
- if (retry) {
- final CreateMode finalMode = mode;
- final byte[] finalBytes = bytes;
- zkCmdExecutor.retryOperation(() -> {
- keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
- return null;
- });
- } else {
- keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
- }
- } catch (NodeExistsException e) {
-
- if (!failOnExists) {
- // TODO: version ? for now, don't worry about race
- setData(currentPath, data, -1, retryOnConnLoss);
- // set new watch
- exists(currentPath, watcher, retryOnConnLoss);
- return;
- }
- // ignore unless it's the last node in the path
- if (i == paths.length - 1) {
- throw e;
- }
+ CreateMode mode = CreateMode.PERSISTENT;
+ if (i == paths.length - 1) {
+ mode = createMode;
+ bytes = data;
+ if (!retryOnConnLoss) retry = false;
+ }
+ try {
+ if (retry) {
+ final CreateMode finalMode = mode;
+ final byte[] finalBytes = bytes;
+ zkCmdExecutor.retryOperation(() -> {
+ keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
+ return null;
+ });
+ } else {
+ keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
}
- if(i == paths.length -1) {
+ } catch (NoAuthException e) {
+ // in auth cases, we may not have permission for an earlier part of a path, which is fine
+ if (i == paths.length - 1 || !exists(currentPath, retryOnConnLoss)) {
+
+ throw e;
+ }
+ } catch (NodeExistsException e) {
+
+ if (!failOnExists && i == paths.length - 1) {
+ // TODO: version ? for now, don't worry about race
+ setData(currentPath, data, -1, retryOnConnLoss);
// set new watch
exists(currentPath, watcher, retryOnConnLoss);
+ return;
+ }
+
+ // ignore unless it's the last node in the path
+ if (i == paths.length - 1) {
+ throw e;
}
- } else if (i == paths.length - 1) {
- // TODO: version ? for now, don't worry about race
- setData(currentPath, data, -1, retryOnConnLoss);
- // set new watch
- exists(currentPath, watcher, retryOnConnLoss);
}
+
}
}
@@ -672,16 +685,16 @@ public class SolrZkClient implements Closeable {
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
try {
- closeKeeper(keeper);
+ closeCallbackExecutor();
} finally {
connManager.close();
- closeCallbackExecutor();
+ closeKeeper(keeper);
}
assert ObjectReleaseTracker.release(this);
}
public boolean isClosed() {
- return isClosed;
+ return isClosed || (higherLevelIsClosed != null && higherLevelIsClosed.isClosed());
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 268ba2d..a60a275 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -93,9 +93,6 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public synchronized void close() throws InterruptedException {
- for (Thread t : spawnedThreads) {
- if (t.isAlive()) t.interrupt();
- }
super.close();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index c27f767..aaba7ae 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -16,6 +16,8 @@
*/
package org.apache.solr.common.cloud;
+import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -25,6 +27,11 @@ public class ZkCmdExecutor {
private long retryDelay = 1500L; // 1 second would match timeout, so 500 ms over for padding
private int retryCount;
private double timeouts;
+ private IsClosed isClosed;
+
+ public ZkCmdExecutor(int timeoutms) {
+ this(timeoutms, null);
+ }
/**
* TODO: At this point, this should probably take a SolrZkClient in
@@ -34,9 +41,10 @@ public class ZkCmdExecutor {
* the client timeout for the ZooKeeper clients that will be used
* with this class.
*/
- public ZkCmdExecutor(int timeoutms) {
+ public ZkCmdExecutor(int timeoutms, IsClosed isClosed) {
timeouts = timeoutms / 1000.0;
this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f)) + 1;
+ this.isClosed = isClosed;
}
public long getRetryDelay() {
@@ -57,6 +65,9 @@ public class ZkCmdExecutor {
KeeperException exception = null;
for (int i = 0; i < retryCount; i++) {
try {
+ if (i > 0 && isClosed()) {
+ throw new AlreadyClosedException();
+ }
return (T) operation.execute();
} catch (KeeperException.ConnectionLossException e) {
if (exception == null) {
@@ -74,6 +85,10 @@ public class ZkCmdExecutor {
throw exception;
}
+ private boolean isClosed() {
+ return isClosed != null && isClosed.isClosed();
+ }
+
public void ensureExists(String path, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
ensureExists(path, null, CreateMode.PERSISTENT, zkClient, 0);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6011f8a..ff53f51 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -45,16 +45,19 @@ import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.Callable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -142,7 +145,7 @@ public class ZkStateReader implements Closeable {
protected volatile ClusterState clusterState;
private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
- private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;
+ private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));;
public static final String LEADER_ELECT_ZKNODE = "leader_elect";
@@ -181,6 +184,8 @@ public class ZkStateReader implements Closeable {
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+
+ private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
/** Used to submit notifications to Collection Properties watchers in order **/
private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
@@ -229,8 +234,6 @@ public class ZkStateReader implements Closeable {
}
- private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
-
public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
LEGACY_CLOUD,
URL_SCHEME,
@@ -283,6 +286,8 @@ public class ZkStateReader implements Closeable {
private final boolean closeClient;
private volatile boolean closed = false;
+
+ private Set<CountDownLatch> waitLatches = ConcurrentHashMap.newKeySet();
public ZkStateReader(SolrZkClient zkClient) {
this(zkClient, null);
@@ -293,6 +298,7 @@ public class ZkStateReader implements Closeable {
this.configManager = new ZkConfigManager(zkClient);
this.closeClient = false;
this.securityNodeListener = securityNodeListener;
+ assert ObjectReleaseTracker.track(this);
}
@@ -318,6 +324,8 @@ public class ZkStateReader implements Closeable {
this.configManager = new ZkConfigManager(zkClient);
this.closeClient = true;
this.securityNodeListener = null;
+
+ assert ObjectReleaseTracker.track(this);
}
public ZkConfigManager getConfigManager() {
@@ -794,12 +802,20 @@ public class ZkStateReader implements Closeable {
log.debug("Updated live nodes from ZooKeeper... {} -> {}", oldLiveNodes, newLiveNodes);
}
if (!oldLiveNodes.equals(newLiveNodes)) { // fire listeners
- liveNodesListeners.forEach(listener ->
- listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes)));
+ liveNodesListeners.forEach(listener -> {
+ if (listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes))) {
+ removeLiveNodesListener(listener);
+ }
+ });
}
}
public void registerLiveNodesListener(LiveNodesListener listener) {
+ // fire it once with current live nodes
+ if (listener.onChange(new TreeSet<>(getClusterState().getLiveNodes()), new TreeSet<>(getClusterState().getLiveNodes()))) {
+ removeLiveNodesListener(listener);
+ }
+
liveNodesListeners.add(listener);
}
@@ -820,18 +836,30 @@ public class ZkStateReader implements Closeable {
public void close() {
this.closed = true;
- notifications.shutdown();
+ notifications.shutdownNow();
+
+ waitLatches.parallelStream().forEach(c -> { c.countDown(); });
+
+ ExecutorUtil.shutdownAndAwaitTermination(notifications);
ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
if (closeClient) {
zkClient.close();
}
+ assert ObjectReleaseTracker.release(this);
}
public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
return props.getCoreUrl();
}
-
+
+ public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
+ Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
+ if (replica != null && liveNodes.contains(replica.getNodeName())) {
+ return replica;
+ }
+ return null;
+ }
public Replica getLeader(String collection, String shard) {
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
@@ -854,16 +882,25 @@ public class ZkStateReader implements Closeable {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
- long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
- while (true) {
- Replica leader = getLeader(collection, shard);
- if (leader != null) return leader;
- if (System.nanoTime() >= timeoutAt || closed) break;
- Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
+
+ AtomicReference<Replica> leader = new AtomicReference<>();
+ try {
+ waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
+ if (c == null)
+ return false;
+ Replica l = getLeader(n, c, shard);
+ if (l != null) {
+ leader.set(l);
+ return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
+ + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
+ + " with live_nodes=" + clusterState.getLiveNodes());
}
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
- + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
- + " with live_nodes=" + clusterState.getLiveNodes());
+ return leader.get();
}
/**
@@ -1257,6 +1294,10 @@ public class ZkStateReader implements Closeable {
@Override
public void process(WatchedEvent event) {
+ if (ZkStateReader.this.closed) {
+ return;
+ }
+
// session events are not change events, and do not remove the watcher
if (EventType.None.equals(event.getType())) {
return;
@@ -1457,13 +1498,20 @@ public class ZkStateReader implements Closeable {
*/
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
-
+
+ if (closed) {
+ throw new AlreadyClosedException();
+ }
+
final CountDownLatch latch = new CountDownLatch(1);
-
+ waitLatches.add(latch);
+ AtomicReference<DocCollection> docCollection = new AtomicReference<>();
CollectionStateWatcher watcher = (n, c) -> {
+ docCollection.set(c);
boolean matches = predicate.matches(n, c);
if (matches)
latch.countDown();
+
return matches;
};
registerCollectionStateWatcher(collection, watcher);
@@ -1471,15 +1519,61 @@ public class ZkStateReader implements Closeable {
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
- throw new TimeoutException();
+ throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
}
finally {
removeCollectionStateWatcher(collection, watcher);
+ waitLatches.remove(latch);
}
}
/**
+ * Block until a LiveNodesPredicate returns true, or the wait times out
+ *
+ * Note that the predicate may be called again even after it has returned true, so
+ * implementors should avoid changing state within the predicate call itself.
+ *
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate the predicate to call on state changes
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
+ */
+ public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
+ throws InterruptedException, TimeoutException {
+
+ if (closed) {
+ throw new AlreadyClosedException();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ waitLatches.add(latch);
+
+
+ LiveNodesListener listener = (o, n) -> {
+ boolean matches = predicate.matches(o, n);
+ if (matches)
+ latch.countDown();
+ return matches;
+ };
+
+ registerLiveNodesListener(listener);
+
+ try {
+ // wait for the listener predicate to return true, or time out
+ if (!latch.await(wait, unit))
+ throw new TimeoutException("Timeout waiting for live nodes, currently they are: " + getClusterState().getLiveNodes());
+
+ }
+ finally {
+ removeLiveNodesListener(listener);
+ waitLatches.remove(latch);
+ }
+ }
+
+
+ /**
* Remove a watcher from a collection's watch list.
*
* This allows Zookeeper watches to be removed if there is no interest in the
@@ -1611,6 +1705,9 @@ public class ZkStateReader implements Closeable {
}
private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
+ if (this.closed) {
+ return;
+ }
try {
notifications.submit(new Notification(liveNodes, collection, collectionState));
}
@@ -1786,6 +1883,8 @@ public class ZkStateReader implements Closeable {
final byte[] data = zkClient.getData(ALIASES, this, stat, true);
// note: it'd be nice to avoid possibly needlessly parsing if we don't update aliases but not a big deal
setIfNewer(Aliases.fromJSON(data, stat.getVersion()));
+ } catch (NoNodeException e) {
+ // /aliases.json will not always exist
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
// note: aliases.json is required to be present
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
index b45e702..c87bb87 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
@@ -68,6 +68,7 @@ public class UsingSolrJRefGuideExamplesTest extends SolrCloudTestCase {
CollectionAdminResponse response = CollectionAdminRequest.createCollection("techproducts", "conf", 1, 1)
.process(cluster.getSolrClient());
+ cluster.waitForActiveCollection("techproducts", 1, 1);
}
@Before
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
index b1f1ee9..a4bd61a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
@@ -31,7 +31,7 @@ import org.junit.BeforeClass;
public class SolrExampleBinaryTest extends SolrExampleTests {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
index 5290347..538255b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
@@ -30,7 +30,7 @@ import org.junit.BeforeClass;
public class SolrExampleXMLTest extends SolrExampleTests {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
index 47faf78..55d83c3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
@@ -65,7 +65,7 @@ public class SolrSchemalessExampleTest extends SolrExampleTestsBase {
} catch (Exception ignore){}
}
}
- createJetty(tempSolrHome.getAbsolutePath());
+ createAndStartJetty(tempSolrHome.getAbsolutePath());
}
@Test
public void testArbitraryJsonIndexing() throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
index a47b1ef..3e6f03d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
@@ -41,7 +41,7 @@ public class TestBatchUpdate extends SolrJettyTestBase {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
static final int numdocs = 1000;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
index 84aff76..d739c0e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
@@ -134,69 +134,71 @@ public class TestLBHttpSolrClient extends SolrTestCaseJ4 {
for (int i = 0; i < solr.length; i++) {
s[i] = solr[i].getUrl();
}
- LBHttpSolrClient client = getLBHttpSolrClient(httpClient, s);
- client.setAliveCheckInterval(500);
- SolrQuery solrQuery = new SolrQuery("*:*");
- Set<String> names = new HashSet<>();
- QueryResponse resp = null;
- for (String value : s) {
- resp = client.query(solrQuery);
- assertEquals(10, resp.getResults().getNumFound());
- names.add(resp.getResults().get(0).getFieldValue("name").toString());
- }
- assertEquals(3, names.size());
+ try (LBHttpSolrClient client = getLBHttpSolrClient(httpClient, s)) {
+ client.setAliveCheckInterval(500);
+ SolrQuery solrQuery = new SolrQuery("*:*");
+ Set<String> names = new HashSet<>();
+ QueryResponse resp = null;
+ for (String value : s) {
+ resp = client.query(solrQuery);
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ }
+ assertEquals(3, names.size());
- // Kill a server and test again
- solr[1].jetty.stop();
- solr[1].jetty = null;
- names.clear();
- for (String value : s) {
- resp = client.query(solrQuery);
- assertEquals(10, resp.getResults().getNumFound());
- names.add(resp.getResults().get(0).getFieldValue("name").toString());
- }
- assertEquals(2, names.size());
- assertFalse(names.contains("solr1"));
-
- // Start the killed server once again
- solr[1].startJetty();
- // Wait for the alive check to complete
- Thread.sleep(1200);
- names.clear();
- for (String value : s) {
- resp = client.query(solrQuery);
- assertEquals(10, resp.getResults().getNumFound());
- names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ // Kill a server and test again
+ solr[1].jetty.stop();
+ solr[1].jetty = null;
+ names.clear();
+ for (String value : s) {
+ resp = client.query(solrQuery);
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ }
+ assertEquals(2, names.size());
+ assertFalse(names.contains("solr1"));
+
+ // Start the killed server once again
+ solr[1].startJetty();
+ // Wait for the alive check to complete
+ Thread.sleep(1200);
+ names.clear();
+ for (String value : s) {
+ resp = client.query(solrQuery);
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ }
+ assertEquals(3, names.size());
}
- assertEquals(3, names.size());
}
public void testTwoServers() throws Exception {
- LBHttpSolrClient client = getLBHttpSolrClient(httpClient, solr[0].getUrl(), solr[1].getUrl());
- client.setAliveCheckInterval(500);
- SolrQuery solrQuery = new SolrQuery("*:*");
- QueryResponse resp = null;
- solr[0].jetty.stop();
- solr[0].jetty = null;
- resp = client.query(solrQuery);
- String name = resp.getResults().get(0).getFieldValue("name").toString();
- Assert.assertEquals("solr/collection11", name);
- resp = client.query(solrQuery);
- name = resp.getResults().get(0).getFieldValue("name").toString();
- Assert.assertEquals("solr/collection11", name);
- solr[1].jetty.stop();
- solr[1].jetty = null;
- solr[0].startJetty();
- Thread.sleep(1200);
- try {
+ try (LBHttpSolrClient client = getLBHttpSolrClient(httpClient, solr[0].getUrl(), solr[1].getUrl())) {
+ client.setAliveCheckInterval(500);
+ SolrQuery solrQuery = new SolrQuery("*:*");
+ QueryResponse resp = null;
+ solr[0].jetty.stop();
+ solr[0].jetty = null;
resp = client.query(solrQuery);
- } catch(SolrServerException e) {
- // try again after a pause in case the error is lack of time to start server
- Thread.sleep(3000);
+ String name = resp.getResults().get(0).getFieldValue("name").toString();
+ Assert.assertEquals("solr/collection11", name);
resp = client.query(solrQuery);
+ name = resp.getResults().get(0).getFieldValue("name").toString();
+ Assert.assertEquals("solr/collection11", name);
+ solr[1].jetty.stop();
+ solr[1].jetty = null;
+ solr[0].startJetty();
+ Thread.sleep(1200);
+ try {
+ resp = client.query(solrQuery);
+ } catch (SolrServerException e) {
+ // try again after a pause in case the error is lack of time to start server
+ Thread.sleep(3000);
+ resp = client.query(solrQuery);
+ }
+ name = resp.getResults().get(0).getFieldValue("name").toString();
+ Assert.assertEquals("solr/collection10", name);
}
- name = resp.getResults().get(0).getFieldValue("name").toString();
- Assert.assertEquals("solr/collection10", name);
}
public void testReliability() throws Exception {
@@ -207,21 +209,22 @@ public class TestLBHttpSolrClient extends SolrTestCaseJ4 {
CloseableHttpClient myHttpClient = HttpClientUtil.createClient(null);
try {
- LBHttpSolrClient client = getLBHttpSolrClient(myHttpClient, 500, 500, s);
- client.setAliveCheckInterval(500);
-
- // Kill a server and test again
- solr[1].jetty.stop();
- solr[1].jetty = null;
-
- // query the servers
- for (String value : s)
- client.query(new SolrQuery("*:*"));
-
- // Start the killed server once again
- solr[1].startJetty();
- // Wait for the alive check to complete
- waitForServer(30, client, 3, solr[1].name);
+ try (LBHttpSolrClient client = getLBHttpSolrClient(myHttpClient, 500, 500, s)) {
+ client.setAliveCheckInterval(500);
+
+ // Kill a server and test again
+ solr[1].jetty.stop();
+ solr[1].jetty = null;
+
+ // query the servers
+ for (String value : s)
+ client.query(new SolrQuery("*:*"));
+
+ // Start the killed server once again
+ solr[1].startJetty();
+ // Wait for the alive check to complete
+ waitForServer(30, client, 3, solr[1].name);
+ }
} finally {
HttpClientUtil.close(myHttpClient);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
index a9c7fb1..0b36569 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
@@ -58,7 +58,7 @@ public class TestSolrJErrorHandling extends SolrJettyTestBase {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
index fc28449..ebe2693 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
@@ -28,6 +28,6 @@ import org.junit.BeforeClass;
public class LargeVolumeBinaryJettyTest extends LargeVolumeTestBase {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
index 02764fb..5c7f36a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
@@ -25,6 +25,6 @@ import org.junit.BeforeClass;
public class LargeVolumeJettyTest extends LargeVolumeTestBase {
@BeforeClass
public static void beforeTest() throws Exception {
- createJetty(legacyExampleCollection1SolrHome());
+ createAndStartJetty(legacyExampleCollection1SolrHome());
}
}