You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ro...@apache.org on 2014/12/31 15:05:50 UTC
svn commit: r1648697 [9/13] - in /lucene/dev/trunk/solr: ./
contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/
contrib/map-reduce/src/java/org/apache/solr/hadoop...
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java?rev=1648697&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java Wed Dec 31 14:05:48 2014
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.entity.mime.FormBodyPart;
+import org.apache.http.entity.mime.HttpMultipartMode;
+import org.apache.http.entity.mime.MultipartEntity;
+import org.apache.http.entity.mime.content.InputStreamBody;
+import org.apache.http.entity.mime.content.StringBody;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class HttpSolrClient extends SolrClient {
+
+  /** Canonical name of the UTF-8 charset. */
+  private static final String UTF_8 = StandardCharsets.UTF_8.name();
+
+  /** Request path used when the request writer yields no path beginning with "/". */
+  private static final String DEFAULT_PATH = "/select";
+
+  private static final long serialVersionUID = -946812319974801896L;
+
+  /**
+   * User-Agent String.
+   */
+  public static final String AGENT = "Solr[" + HttpSolrClient.class.getName() + "] 1.0";
+
+  /** Class logger; declared final so the shared reference cannot be reassigned. */
+  private static final Logger log = LoggerFactory.getLogger(HttpSolrClient.class);
+
+  /**
+   * The URL of the Solr server.
+   */
+  protected volatile String baseUrl;
+
+  /**
+   * Default value: null / empty.
+   * <p>
+   * Parameters that are added to every request regardless. This may be a place
+   * to add something like an authentication token.
+   */
+  protected ModifiableSolrParams invariantParams;
+
+  /**
+   * Default response parser is BinaryResponseParser
+   * <p>
+   * This parser represents the default Response Parser chosen to parse the
+   * response if the parser were not specified as part of the request.
+   *
+   * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
+   */
+  protected volatile ResponseParser parser;
+
+  /**
+   * The RequestWriter used to write all requests to Solr
+   *
+   * @see org.apache.solr.client.solrj.request.RequestWriter
+   */
+  protected volatile RequestWriter requestWriter = new RequestWriter();
+
+  /** HTTP client used to execute every request; either supplied by the caller or created internally. */
+  private final HttpClient httpClient;
+
+  /** Whether 301/302 responses are tolerated instead of being reported as errors. */
+  private volatile boolean followRedirects = false;
+
+  /** Configured retry count; see {@link #setMaxRetries(int)}. */
+  private volatile int maxRetries = 0;
+
+  /** Whether POST requests are sent as multipart form data. */
+  private volatile boolean useMultiPartPost;
+
+  /** True when {@link #httpClient} was created by this instance rather than passed in. */
+  private final boolean internalClient;
+
+  /** Names of parameters that are sent in the query string of POST/PUT requests. */
+  private volatile Set<String> queryParams = Collections.emptySet();
+
+ /**
+ * @param baseURL
+ * The URL of the Solr server. For example, "
+ * <code>http://localhost:8983/solr/</code>" if you are using the
+ * standard distribution Solr webapp on your local machine.
+ */
+ public HttpSolrClient(String baseURL) {
+ this(baseURL, null, new BinaryResponseParser());
+ }
+
+ public HttpSolrClient(String baseURL, HttpClient client) {
+ this(baseURL, client, new BinaryResponseParser());
+ }
+
+ public HttpSolrClient(String baseURL, HttpClient client, ResponseParser parser) {
+ this.baseUrl = baseURL;
+ if (baseUrl.endsWith("/")) {
+ baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
+ }
+ if (baseUrl.indexOf('?') >= 0) {
+ throw new RuntimeException(
+ "Invalid base url for solrj. The base URL must not contain parameters: "
+ + baseUrl);
+ }
+
+ if (client != null) {
+ httpClient = client;
+ internalClient = false;
+ } else {
+ internalClient = true;
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
+ params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
+ httpClient = HttpClientUtil.createClient(params);
+ }
+
+ this.parser = parser;
+ }
+
+  /**
+   * Returns the parameter names configured through
+   * {@link #setQueryParams(Set)}.
+   */
+  public Set<String> getQueryParams() {
+    return this.queryParams;
+  }
+
+  /**
+   * Expert Method
+   *
+   * @param names set of param keys to only send via the query string.
+   *          Note that the param will be sent as a query string if the key is part
+   *          of this Set or the SolrRequest's query params.
+   * @see org.apache.solr.client.solrj.SolrRequest#getQueryParams
+   */
+  public void setQueryParams(Set<String> names) {
+    queryParams = names;
+  }
+
+  /**
+   * Process the request, parsing the response with the request's own
+   * {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} or,
+   * when that is null, with {@link #getParser()}.
+   *
+   * @param request
+   *          The {@link org.apache.solr.client.solrj.SolrRequest} to process
+   * @return The {@link org.apache.solr.common.util.NamedList} result
+   * @throws IOException If there is a low-level I/O error.
+   *
+   * @see #request(org.apache.solr.client.solrj.SolrRequest,
+   *      org.apache.solr.client.solrj.ResponseParser)
+   */
+  @Override
+  public NamedList<Object> request(final SolrRequest request)
+      throws SolrServerException, IOException {
+    final ResponseParser requested = request.getResponseParser();
+    return request(request, requested != null ? requested : parser);
+  }
+
+  /**
+   * Builds the HTTP request for the given Solr request, executes it and
+   * returns the response as parsed by the given processor.
+   *
+   * @param request   the request to send
+   * @param processor parser for the response body
+   * @return the parsed response
+   */
+  public NamedList<Object> request(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
+    final HttpRequestBase httpRequest = createMethod(request);
+    return executeMethod(httpRequest, processor);
+  }
+
+ /**
+ * Holder returned by the <code>httpUriRequest</code> methods: the HTTP
+ * request that was built, together with the future for its result.
+ *
+ * @lucene.experimental
+ */
+ public static class HttpUriRequestResponse {
+ public HttpUriRequest httpUriRequest; // the HTTP request that was created for the Solr request
+ public Future<NamedList<Object>> future; // result of executing that request on a background thread
+ }
+
+  /**
+   * Starts the request asynchronously, parsing the response with the
+   * request's own response parser or, when that is null, with the default
+   * parser of this client.
+   *
+   * @lucene.experimental
+   */
+  public HttpUriRequestResponse httpUriRequest(final SolrRequest request)
+      throws SolrServerException, IOException {
+    final ResponseParser requested = request.getResponseParser();
+    return httpUriRequest(request, requested != null ? requested : parser);
+  }
+
+  /**
+   * Builds the HTTP request and submits its execution to a dedicated
+   * single-thread pool, which is shut down again as soon as the task has been
+   * submitted.
+   *
+   * @return holder carrying the HTTP request and the future for its result
+   * @lucene.experimental
+   */
+  public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
+    final HttpUriRequestResponse result = new HttpUriRequestResponse();
+    final HttpRequestBase method = createMethod(request);
+    final Callable<NamedList<Object>> task = new Callable<NamedList<Object>>() {
+      @Override
+      public NamedList<Object> call() throws Exception {
+        return executeMethod(method, processor);
+      }
+    };
+
+    final ExecutorService executor =
+        Executors.newFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest"));
+    try {
+      result.future = executor.submit(task);
+    } finally {
+      // an orderly shutdown still lets the submitted task run
+      executor.shutdown();
+    }
+    assert method != null;
+    result.httpUriRequest = method;
+    return result;
+  }
+
+ protected ModifiableSolrParams calculateQueryParams(Set<String> queryParamNames,
+ ModifiableSolrParams wparams) {
+ ModifiableSolrParams queryModParams = new ModifiableSolrParams();
+ if (queryParamNames != null) {
+ for (String param : queryParamNames) {
+ String[] value = wparams.getParams(param) ;
+ if (value != null) {
+ for (String v : value) {
+ queryModParams.add(param, v);
+ }
+ wparams.remove(param);
+ }
+ }
+ }
+ return queryModParams;
+ }
+
+  /**
+   * Builds the HTTP request that corresponds to the given Solr request.
+   * <ul>
+   * <li>GET: all parameters go into the query string; content streams are
+   * rejected.</li>
+   * <li>POST/PUT without streams, or as multipart: parameters named by
+   * {@link #getQueryParams()} or by the request's own query params go into
+   * the query string, all others into the form or multipart body.</li>
+   * <li>POST/PUT with a stream that is not sent as multipart: the first
+   * stream becomes the request body and all parameters go into the query
+   * string.</li>
+   * </ul>
+   * The request is built exactly once. Building it once per configured retry
+   * reopened the content streams and, because the query-string parameters had
+   * already been removed from the working parameter set, dropped them from
+   * the request that was finally returned.
+   *
+   * @param request the Solr request to translate
+   * @return the HTTP request, ready to be executed
+   * @throws SolrException       if a GET request carries content streams
+   * @throws SolrServerException if the HTTP method is not GET, POST or PUT, or
+   *                             if an I/O error occurs while building the request
+   * @throws IOException         if the request writer fails to provide the
+   *                             content streams
+   */
+  protected HttpRequestBase createMethod(final SolrRequest request) throws IOException, SolrServerException {
+    SolrParams params = request.getParams();
+    Collection<ContentStream> streams = requestWriter.getContentStreams(request);
+    String path = requestWriter.getPath(request);
+    if (path == null || !path.startsWith("/")) {
+      path = DEFAULT_PATH;
+    }
+
+    ResponseParser parser = request.getResponseParser();
+    if (parser == null) {
+      parser = this.parser;
+    }
+
+    // The parser 'wt=' and 'version=' params are used instead of the original
+    // params
+    ModifiableSolrParams wparams = new ModifiableSolrParams(params);
+    if (parser != null) {
+      wparams.set(CommonParams.WT, parser.getWriterType());
+      wparams.set(CommonParams.VERSION, parser.getVersion());
+    }
+    if (invariantParams != null) {
+      wparams.add(invariantParams);
+    }
+
+    final SolrRequest.METHOD httpMethod = request.getMethod();
+    try {
+      if (SolrRequest.METHOD.GET == httpMethod) {
+        if (streams != null) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!");
+        }
+        return new HttpGet(baseUrl + path + ClientUtils.toQueryString(wparams, false));
+      }
+
+      final boolean isPost = SolrRequest.METHOD.POST == httpMethod;
+      if (!isPost && SolrRequest.METHOD.PUT != httpMethod) {
+        throw new SolrServerException("Unsupported method: " + httpMethod);
+      }
+
+      final String url = baseUrl + path;
+
+      // A stream without a name cannot be sent as a multipart part
+      boolean hasNullStreamName = false;
+      if (streams != null) {
+        for (ContentStream cs : streams) {
+          if (cs.getName() == null) {
+            hasNullStreamName = true;
+            break;
+          }
+        }
+      }
+      final boolean isMultipart = ((this.useMultiPartPost && isPost)
+          || (streams != null && streams.size() > 1)) && !hasNullStreamName;
+
+      if (streams != null && !isMultipart) {
+        // The first stream is the request body, so the params go in the URL
+        final String pstr = ClientUtils.toQueryString(wparams, false);
+        final HttpEntityEnclosingRequestBase postOrPut =
+            isPost ? new HttpPost(url + pstr) : new HttpPut(url + pstr);
+
+        final Iterator<ContentStream> streamIter = streams.iterator();
+        final ContentStream content = streamIter.hasNext() ? streamIter.next() : null;
+        postOrPut.setEntity(new InputStreamEntity(content.getStream(), -1) {
+          @Override
+          public Header getContentType() {
+            return new BasicHeader("Content-Type", content.getContentType());
+          }
+
+          @Override
+          public boolean isRepeatable() {
+            return false;
+          }
+        });
+        return postOrPut;
+      }
+
+      // No stream at all, or multipart: send the configured names and the
+      // request's own query params as query string params
+      final ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams);
+      queryParams.add(calculateQueryParams(request.getQueryParams(), wparams));
+      final String fullQueryUrl = url + ClientUtils.toQueryString(queryParams, false);
+      final HttpEntityEnclosingRequestBase postOrPut =
+          isPost ? new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
+      if (!isMultipart) {
+        postOrPut.addHeader("Content-Type",
+            "application/x-www-form-urlencoded; charset=UTF-8");
+      }
+
+      // The remaining params become multipart parts or form fields
+      final List<FormBodyPart> parts = new LinkedList<>();
+      final LinkedList<NameValuePair> postOrPutParams = new LinkedList<>();
+      final Iterator<String> iter = wparams.getParameterNamesIterator();
+      while (iter.hasNext()) {
+        final String p = iter.next();
+        final String[] vals = wparams.getParams(p);
+        if (vals != null) {
+          for (String v : vals) {
+            if (isMultipart) {
+              parts.add(new FormBodyPart(p, new StringBody(v, StandardCharsets.UTF_8)));
+            } else {
+              postOrPutParams.add(new BasicNameValuePair(p, v));
+            }
+          }
+        }
+      }
+
+      if (isMultipart && streams != null) {
+        for (ContentStream content : streams) {
+          String contentType = content.getContentType();
+          if (contentType == null) {
+            contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
+          }
+          String name = content.getName();
+          if (name == null) {
+            name = "";
+          }
+          parts.add(new FormBodyPart(name,
+              new InputStreamBody(
+                  content.getStream(),
+                  contentType,
+                  content.getName())));
+        }
+      }
+
+      if (!parts.isEmpty()) {
+        final MultipartEntity entity = new MultipartEntity(HttpMultipartMode.STRICT);
+        for (FormBodyPart p : parts) {
+          entity.addPart(p);
+        }
+        postOrPut.setEntity(entity);
+      } else {
+        // not using multipart
+        postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8));
+      }
+      return postOrPut;
+    } catch (IOException ex) {
+      throw new SolrServerException("error reading streams", ex);
+    }
+  }
+
+ /**
+ * Executes the given HTTP request and turns the response into a NamedList.
+ * <p>
+ * When <code>processor</code> is null the raw response body is returned
+ * unparsed under the key "stream" and is left open for the caller;
+ * otherwise the body is parsed by the processor and closed before returning.
+ *
+ * @param method    the HTTP request to execute; a User-Agent header is added to it
+ * @param processor parser for the response body, or null to get the raw stream
+ * @return the parsed response, or a single "stream" entry when no processor is given
+ * @throws RemoteSolrException if the status or content type is unexpected, the
+ *         body cannot be parsed, or the parsed response carries a non-200 status
+ * @throws SolrServerException on an unfollowed redirect, a refused connection,
+ *         a socket timeout or any other I/O error
+ */
+ protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor) throws SolrServerException {
+ method.addHeader("User-Agent", AGENT);
+
+ InputStream respBody = null;
+ boolean shouldClose = true; // cleared only when the raw stream is handed to the caller
+ boolean success = false; // set just before each normal return; drives abort() in the finally block
+ try {
+ // Execute the method.
+ final HttpResponse response = httpClient.execute(method);
+ int httpStatus = response.getStatusLine().getStatusCode();
+
+ // Read the contents
+ respBody = response.getEntity().getContent();
+ Header ctHeader = response.getLastHeader("content-type");
+ String contentType;
+ if (ctHeader != null) {
+ contentType = ctHeader.getValue();
+ } else {
+ contentType = "";
+ }
+
+ // handle some http level checks before trying to parse the response
+ switch (httpStatus) {
+ case HttpStatus.SC_OK:
+ case HttpStatus.SC_BAD_REQUEST:
+ case HttpStatus.SC_CONFLICT: // 409
+ break;
+ case HttpStatus.SC_MOVED_PERMANENTLY:
+ case HttpStatus.SC_MOVED_TEMPORARILY:
+ if (!followRedirects) {
+ throw new SolrServerException("Server at " + getBaseURL()
+ + " sent back a redirect (" + httpStatus + ").");
+ }
+ break;
+ default:
+ // any other status is only fatal here when there is no processor to read an error body
+ if (processor == null) {
+ throw new RemoteSolrException(baseUrl, httpStatus, "non ok status: " + httpStatus
+ + ", message:" + response.getStatusLine().getReasonPhrase(),
+ null);
+ }
+ }
+ if (processor == null) {
+
+ // no processor specified, return raw stream
+ NamedList<Object> rsp = new NamedList<>();
+ rsp.add("stream", respBody);
+ // Only case where stream should not be closed
+ shouldClose = false;
+ success = true;
+ return rsp;
+ }
+
+ // compare the mime type the processor expects with the one the server sent
+ String procCt = processor.getContentType();
+ if (procCt != null) {
+ String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
+ String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
+ if (!procMimeType.equals(mimeType)) {
+ // unexpected mime type
+ String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
+ Header encodingHeader = response.getEntity().getContentEncoding();
+ String encoding;
+ if (encodingHeader != null) {
+ encoding = encodingHeader.getValue();
+ } else {
+ encoding = "UTF-8"; // try UTF-8
+ }
+ try {
+ // append the response body so the error shows what the server actually returned
+ msg = msg + " " + IOUtils.toString(respBody, encoding);
+ } catch (IOException e) {
+ throw new RemoteSolrException(baseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
+ }
+ throw new RemoteSolrException(baseUrl, httpStatus, msg, null);
+ }
+ }
+
+ NamedList<Object> rsp = null;
+ String charset = EntityUtils.getContentCharSet(response.getEntity());
+ try {
+ rsp = processor.processResponse(respBody, charset);
+ } catch (Exception e) {
+ throw new RemoteSolrException(baseUrl, httpStatus, e.getMessage(), e);
+ }
+ if (httpStatus != HttpStatus.SC_OK) {
+ // non-200 with a parsed body: report the "error" entry's "msg" (or "trace") and "metadata"
+ NamedList<String> metadata = null;
+ String reason = null;
+ try {
+ NamedList err = (NamedList) rsp.get("error");
+ if (err != null) {
+ reason = (String) err.get("msg");
+ if(reason == null) {
+ reason = (String) err.get("trace");
+ }
+ metadata = (NamedList<String>)err.get("metadata");
+ }
+ } catch (Exception ex) {} // best effort: fall back to the status line below
+ if (reason == null) {
+ StringBuilder msg = new StringBuilder();
+ msg.append(response.getStatusLine().getReasonPhrase());
+ msg.append("\n\n");
+ msg.append("request: " + method.getURI());
+ reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
+ }
+ RemoteSolrException rss = new RemoteSolrException(baseUrl, httpStatus, reason, null);
+ if (metadata != null) rss.setMetadata(metadata);
+ throw rss;
+ }
+ success = true;
+ return rsp;
+ } catch (ConnectException e) {
+ throw new SolrServerException("Server refused connection at: "
+ + getBaseURL(), e);
+ } catch (SocketTimeoutException e) {
+ throw new SolrServerException(
+ "Timeout occured while waiting response from server at: "
+ + getBaseURL(), e);
+ } catch (IOException e) {
+ throw new SolrServerException(
+ "IOException occured when talking to server at: " + getBaseURL(), e);
+ } finally {
+ // close the body unless it was handed to the caller; abort the request when the call failed
+ if (respBody != null && shouldClose) {
+ try {
+ respBody.close();
+ } catch (IOException e) {
+ log.error("", e);
+ } finally {
+ if (!success) {
+ method.abort();
+ }
+ }
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------
+ // -------------------------------------------------------------------
+
+  /**
+   * Retrieves the parameters that are added to every request regardless of
+   * what the request itself specifies.
+   *
+   * @see #invariantParams
+   */
+  public ModifiableSolrParams getInvariantParams() {
+    return this.invariantParams;
+  }
+
+  /**
+   * Returns the URL of the Solr server this client talks to.
+   */
+  public String getBaseURL() {
+    return this.baseUrl;
+  }
+
+  /**
+   * Replaces the URL of the Solr server. The value is stored exactly as
+   * given.
+   *
+   * @param baseURL the new server URL
+   */
+  public void setBaseURL(String baseURL) {
+    baseUrl = baseURL;
+  }
+
+  /**
+   * Returns the default response parser, used for requests that do not
+   * specify their own.
+   */
+  public ResponseParser getParser() {
+    return this.parser;
+  }
+
+  /**
+   * Sets the default response parser.
+   * <p>
+   * Note: This setter method is <b>not thread-safe</b>.
+   *
+   * @param processor
+   *          Default Response Parser chosen to parse the response if the parser
+   *          were not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    this.parser = processor;
+  }
+
+  /**
+   * Returns the HttpClient through which this instance executes its
+   * requests.
+   */
+  public HttpClient getHttpClient() {
+    return this.httpClient;
+  }
+
+  /**
+   * Sets the connection timeout on the underlying HTTP client
+   * (HttpConnectionParams.setConnectionTimeout).
+   *
+   * @param timeout
+   *          Timeout in milliseconds
+   */
+  public void setConnectionTimeout(int timeout) {
+    HttpClientUtil.setConnectionTimeout(this.httpClient, timeout);
+  }
+
+  /**
+   * Sets the socket read timeout (SoTimeout) on the underlying HTTP client.
+   * This is desirable for queries, but probably not for indexing.
+   *
+   * @param timeout
+   *          Timeout in milliseconds
+   */
+  public void setSoTimeout(int timeout) {
+    HttpClientUtil.setSoTimeout(this.httpClient, timeout);
+  }
+
+  /**
+   * Configures whether redirects are followed, both for this client's own
+   * status handling and on the underlying HTTP client.
+   * <p>
+   * This defaults to false under the assumption that if you are following a
+   * redirect to get to a Solr installation, something is misconfigured
+   * somewhere.
+   * </p>
+   *
+   * @param followRedirects true to follow redirects
+   */
+  public void setFollowRedirects(boolean followRedirects) {
+    final boolean follow = followRedirects;
+    this.followRedirects = follow;
+    HttpClientUtil.setFollowRedirects(this.httpClient, follow);
+  }
+
+  /**
+   * Allow server->client communication to be compressed. Currently gzip and
+   * deflate are supported. If the server supports compression the response will
+   * be compressed. This method is only allowed if the http client is of type
+   * DefaultHttpClient.
+   *
+   * @param allowCompression true to allow compressed responses
+   * @throws UnsupportedOperationException if the HTTP client is not a
+   *         DefaultHttpClient
+   */
+  public void setAllowCompression(boolean allowCompression) {
+    if (!(httpClient instanceof DefaultHttpClient)) {
+      throw new UnsupportedOperationException(
+          "HttpClient instance was not of type DefaultHttpClient");
+    }
+    final DefaultHttpClient defaultClient = (DefaultHttpClient) httpClient;
+    HttpClientUtil.setAllowCompression(defaultClient, allowCompression);
+  }
+
+  /**
+   * Sets the maximum number of retries to attempt in the event of transient
+   * errors. A warning is logged for values above 1; the value is stored
+   * either way.
+   * <p>
+   * Default: 0 (no) retries. No more than 1 recommended.
+   * </p>
+   *
+   * @param maxRetries
+   *          No more than 1 recommended
+   */
+  public void setMaxRetries(int maxRetries) {
+    final boolean aboveRecommended = maxRetries > 1;
+    if (aboveRecommended) {
+      final String warning = "HttpSolrServer: maximum Retries " + maxRetries
+          + " > 1. Maximum recommended retries is 1.";
+      log.warn(warning);
+    }
+    this.maxRetries = maxRetries;
+  }
+
+  /**
+   * Replaces the RequestWriter used to write all requests to Solr.
+   *
+   * @param writer the writer to use from now on
+   */
+  public void setRequestWriter(RequestWriter writer) {
+    requestWriter = writer;
+  }
+
+  /**
+   * Sends an update request whose documents are pulled from the given
+   * iterator.
+   *
+   * @param docIterator
+   *          the iterator which returns SolrInputDocument instances
+   *
+   * @return the response from the SolrServer
+   */
+  public UpdateResponse add(Iterator<SolrInputDocument> docIterator)
+      throws SolrServerException, IOException {
+    final UpdateRequest update = new UpdateRequest();
+    update.setDocIterator(docIterator);
+    final UpdateResponse response = update.process(this);
+    return response;
+  }
+
+  /**
+   * Sends an update request whose documents are produced by converting each
+   * bean from the given iterator with this client's binder. A null bean is
+   * passed on as a null document.
+   *
+   * @param beanIterator
+   *          the iterator which returns Beans
+   *
+   * @return the response from the SolrServer
+   */
+  public UpdateResponse addBeans(final Iterator<?> beanIterator)
+      throws SolrServerException, IOException {
+    // Adapter that converts the beans to documents as they are consumed
+    final Iterator<SolrInputDocument> documents = new Iterator<SolrInputDocument>() {
+
+      @Override
+      public boolean hasNext() {
+        return beanIterator.hasNext();
+      }
+
+      @Override
+      public SolrInputDocument next() {
+        final Object bean = beanIterator.next();
+        return bean == null ? null : getBinder().toSolrInputDocument(bean);
+      }
+
+      @Override
+      public void remove() {
+        beanIterator.remove();
+      }
+    };
+
+    final UpdateRequest update = new UpdateRequest();
+    update.setDocIterator(documents);
+    return update.process(this);
+  }
+
+  /**
+   * Shuts down the {@link ClientConnectionManager} of the HTTP client, but
+   * only when that client was created by this instance. A client passed in
+   * by the caller is left untouched.
+   */
+  @Override
+  public void shutdown() {
+    if (httpClient == null || !internalClient) {
+      return;
+    }
+    httpClient.getConnectionManager().shutdown();
+  }
+
+  /**
+   * Set the maximum number of connections that can be open to a single host at
+   * any given time. Not allowed when the http client was created outside of
+   * this instance.
+   *
+   * @param max maximum number of connections per host
+   * @throws UnsupportedOperationException if the HTTP client was supplied by
+   *         the caller
+   */
+  public void setDefaultMaxConnectionsPerHost(int max) {
+    if (!internalClient) {
+      throw new UnsupportedOperationException(
+          "Client was created outside of HttpSolrServer");
+    }
+    HttpClientUtil.setMaxConnectionsPerHost(httpClient, max);
+  }
+
+  /**
+   * Set the maximum number of connections that can be open at any given time.
+   * Not allowed when the http client was created outside of this instance.
+   *
+   * @param max maximum total number of connections
+   * @throws UnsupportedOperationException if the HTTP client was supplied by
+   *         the caller
+   */
+  public void setMaxTotalConnections(int max) {
+    if (!internalClient) {
+      throw new UnsupportedOperationException(
+          "Client was created outside of HttpSolrServer");
+    }
+    HttpClientUtil.setMaxConnections(httpClient, max);
+  }
+
+  /**
+   * Returns whether POST requests are sent as multipart form data.
+   */
+  public boolean isUseMultiPartPost() {
+    return this.useMultiPartPost;
+  }
+
+  /**
+   * Sets whether POST requests are sent as multipart form data.
+   *
+   * @param enabled true to send POST requests as multipart
+   */
+  public void setUseMultiPartPost(boolean enabled) {
+    useMultiPartPost = enabled;
+  }
+
+ /**
+ * Subclass of SolrException that allows us to capture an arbitrary HTTP
+ * status code that may have been returned by the remote server or a
+ * proxy along the way.
+ */
+ public static class RemoteSolrException extends SolrException {
+ /**
+ * @param remoteHost the host the error was received from
+ * @param code Arbitrary HTTP status code
+ * @param msg Exception Message
+ * @param th Throwable to wrap with this Exception
+ */
+ public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
+ super(code, "Error from server at " + remoteHost + ": " + msg, th);
+ }
+ }
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1648697&r1=1648696&r2=1648697&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Wed Dec 31 14:05:48 2014
@@ -14,810 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.solr.client.solrj.impl;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.NameValuePair;
-import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.entity.mime.FormBodyPart;
-import org.apache.http.entity.mime.HttpMultipartMode;
-import org.apache.http.entity.mime.MultipartEntity;
-import org.apache.http.entity.mime.content.InputStreamBody;
-import org.apache.http.entity.mime.content.StringBody;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ContentStream;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class HttpSolrServer extends SolrServer {
- private static final String UTF_8 = StandardCharsets.UTF_8.name();
- private static final String DEFAULT_PATH = "/select";
- private static final long serialVersionUID = -946812319974801896L;
-
- /**
- * User-Agent String.
- */
- public static final String AGENT = "Solr[" + HttpSolrServer.class.getName() + "] 1.0";
-
- private static Logger log = LoggerFactory.getLogger(HttpSolrServer.class);
-
- /**
- * The URL of the Solr server.
- */
- protected volatile String baseUrl;
-
- /**
- * Default value: null / empty.
- * <p/>
- * Parameters that are added to every request regardless. This may be a place
- * to add something like an authentication token.
- */
- protected ModifiableSolrParams invariantParams;
-
- /**
- * Default response parser is BinaryResponseParser
- * <p/>
- * This parser represents the default Response Parser chosen to parse the
- * response if the parser were not specified as part of the request.
- *
- * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
- */
- protected volatile ResponseParser parser;
-
- /**
- * The RequestWriter used to write all requests to Solr
- *
- * @see org.apache.solr.client.solrj.request.RequestWriter
- */
- protected volatile RequestWriter requestWriter = new RequestWriter();
-
- private final HttpClient httpClient;
-
- private volatile boolean followRedirects = false;
-
- private volatile int maxRetries = 0;
-
- private volatile boolean useMultiPartPost;
- private final boolean internalClient;
- private volatile Set<String> queryParams = Collections.emptySet();
+/**
+ * @deprecated Use {@link org.apache.solr.client.solrj.impl.HttpSolrClient}
+ */
+@Deprecated
+public class HttpSolrServer extends HttpSolrClient {
- /**
- * @param baseURL
- * The URL of the Solr server. For example, "
- * <code>http://localhost:8983/solr/</code>" if you are using the
- * standard distribution Solr webapp on your local machine.
- */
public HttpSolrServer(String baseURL) {
- this(baseURL, null, new BinaryResponseParser());
- }
-
- public HttpSolrServer(String baseURL, HttpClient client) {
- this(baseURL, client, new BinaryResponseParser());
- }
-
- public HttpSolrServer(String baseURL, HttpClient client, ResponseParser parser) {
- this.baseUrl = baseURL;
- if (baseUrl.endsWith("/")) {
- baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
- }
- if (baseUrl.indexOf('?') >= 0) {
- throw new RuntimeException(
- "Invalid base url for solrj. The base URL must not contain parameters: "
- + baseUrl);
- }
-
- if (client != null) {
- httpClient = client;
- internalClient = false;
- } else {
- internalClient = true;
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
- params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
- httpClient = HttpClientUtil.createClient(params);
- }
-
- this.parser = parser;
- }
-
- public Set<String> getQueryParams() {
- return queryParams;
- }
-
- /**
- * Expert Method
- * @param queryParams set of param keys to only send via the query string
- * Note that the param will be sent as a query string if the key is part
- * of this Set or the SolrRequest's query params.
- * @see org.apache.solr.client.solrj.SolrRequest#getQueryParams
- */
- public void setQueryParams(Set<String> queryParams) {
- this.queryParams = queryParams;
- }
-
- /**
- * Process the request. If
- * {@link org.apache.solr.client.solrj.SolrRequest#getResponseParser()} is
- * null, then use {@link #getParser()}
- *
- * @param request
- * The {@link org.apache.solr.client.solrj.SolrRequest} to process
- * @return The {@link org.apache.solr.common.util.NamedList} result
- * @throws IOException If there is a low-level I/O error.
- *
- * @see #request(org.apache.solr.client.solrj.SolrRequest,
- * org.apache.solr.client.solrj.ResponseParser)
- */
- @Override
- public NamedList<Object> request(final SolrRequest request)
- throws SolrServerException, IOException {
- ResponseParser responseParser = request.getResponseParser();
- if (responseParser == null) {
- responseParser = parser;
- }
- return request(request, responseParser);
- }
-
- public NamedList<Object> request(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
- return executeMethod(createMethod(request),processor);
- }
-
- /**
- * @lucene.experimental
- */
- public static class HttpUriRequestResponse {
- public HttpUriRequest httpUriRequest;
- public Future<NamedList<Object>> future;
- }
-
- /**
- * @lucene.experimental
- */
- public HttpUriRequestResponse httpUriRequest(final SolrRequest request)
- throws SolrServerException, IOException {
- ResponseParser responseParser = request.getResponseParser();
- if (responseParser == null) {
- responseParser = parser;
- }
- return httpUriRequest(request, responseParser);
- }
-
- /**
- * @lucene.experimental
- */
- public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
- HttpUriRequestResponse mrr = new HttpUriRequestResponse();
- final HttpRequestBase method = createMethod(request);
- ExecutorService pool = Executors.newFixedThreadPool(1, new SolrjNamedThreadFactory("httpUriRequest"));
- try {
- mrr.future = pool.submit(new Callable<NamedList<Object>>(){
-
- @Override
- public NamedList<Object> call() throws Exception {
- return executeMethod(method, processor);
- }});
-
- } finally {
- pool.shutdown();
- }
- assert method != null;
- mrr.httpUriRequest = method;
- return mrr;
- }
-
- protected ModifiableSolrParams calculateQueryParams(Set<String> queryParamNames,
- ModifiableSolrParams wparams) {
- ModifiableSolrParams queryModParams = new ModifiableSolrParams();
- if (queryParamNames != null) {
- for (String param : queryParamNames) {
- String[] value = wparams.getParams(param) ;
- if (value != null) {
- for (String v : value) {
- queryModParams.add(param, v);
- }
- wparams.remove(param);
- }
- }
- }
- return queryModParams;
- }
-
- protected HttpRequestBase createMethod(final SolrRequest request) throws IOException, SolrServerException {
- HttpRequestBase method = null;
- InputStream is = null;
- SolrParams params = request.getParams();
- Collection<ContentStream> streams = requestWriter.getContentStreams(request);
- String path = requestWriter.getPath(request);
- if (path == null || !path.startsWith("/")) {
- path = DEFAULT_PATH;
- }
-
- ResponseParser parser = request.getResponseParser();
- if (parser == null) {
- parser = this.parser;
- }
-
- // The parser 'wt=' and 'version=' params are used instead of the original
- // params
- ModifiableSolrParams wparams = new ModifiableSolrParams(params);
- if (parser != null) {
- wparams.set(CommonParams.WT, parser.getWriterType());
- wparams.set(CommonParams.VERSION, parser.getVersion());
- }
- if (invariantParams != null) {
- wparams.add(invariantParams);
- }
-
- int tries = maxRetries + 1;
- try {
- while( tries-- > 0 ) {
- // Note: since we aren't do intermittent time keeping
- // ourselves, the potential non-timeout latency could be as
- // much as tries-times (plus scheduling effects) the given
- // timeAllowed.
- try {
- if( SolrRequest.METHOD.GET == request.getMethod() ) {
- if( streams != null ) {
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
- }
- method = new HttpGet( baseUrl + path + ClientUtils.toQueryString( wparams, false ) );
- }
- else if( SolrRequest.METHOD.POST == request.getMethod() || SolrRequest.METHOD.PUT == request.getMethod() ) {
-
- String url = baseUrl + path;
- boolean hasNullStreamName = false;
- if (streams != null) {
- for (ContentStream cs : streams) {
- if (cs.getName() == null) {
- hasNullStreamName = true;
- break;
- }
- }
- }
- boolean isMultipart = ((this.useMultiPartPost && SolrRequest.METHOD.POST == request.getMethod())
- || ( streams != null && streams.size() > 1 )) && !hasNullStreamName;
-
- LinkedList<NameValuePair> postOrPutParams = new LinkedList<>();
- if (streams == null || isMultipart) {
- // send server list and request list as query string params
- ModifiableSolrParams queryParams = calculateQueryParams(this.queryParams, wparams);
- queryParams.add(calculateQueryParams(request.getQueryParams(), wparams));
- String fullQueryUrl = url + ClientUtils.toQueryString( queryParams, false );
- HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
- new HttpPost(fullQueryUrl) : new HttpPut(fullQueryUrl);
- if (!isMultipart) {
- postOrPut.addHeader("Content-Type",
- "application/x-www-form-urlencoded; charset=UTF-8");
- }
-
- List<FormBodyPart> parts = new LinkedList<>();
- Iterator<String> iter = wparams.getParameterNamesIterator();
- while (iter.hasNext()) {
- String p = iter.next();
- String[] vals = wparams.getParams(p);
- if (vals != null) {
- for (String v : vals) {
- if (isMultipart) {
- parts.add(new FormBodyPart(p, new StringBody(v, StandardCharsets.UTF_8)));
- } else {
- postOrPutParams.add(new BasicNameValuePair(p, v));
- }
- }
- }
- }
-
- if (isMultipart && streams != null) {
- for (ContentStream content : streams) {
- String contentType = content.getContentType();
- if(contentType==null) {
- contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
- }
- String name = content.getName();
- if(name==null) {
- name = "";
- }
- parts.add(new FormBodyPart(name,
- new InputStreamBody(
- content.getStream(),
- contentType,
- content.getName())));
- }
- }
-
- if (parts.size() > 0) {
- MultipartEntity entity = new MultipartEntity(HttpMultipartMode.STRICT);
- for(FormBodyPart p: parts) {
- entity.addPart(p);
- }
- postOrPut.setEntity(entity);
- } else {
- //not using multipart
- postOrPut.setEntity(new UrlEncodedFormEntity(postOrPutParams, StandardCharsets.UTF_8));
- }
-
- method = postOrPut;
- }
- // It is has one stream, it is the post body, put the params in the URL
- else {
- String pstr = ClientUtils.toQueryString(wparams, false);
- HttpEntityEnclosingRequestBase postOrPut = SolrRequest.METHOD.POST == request.getMethod() ?
- new HttpPost(url + pstr) : new HttpPut(url + pstr);
-
- // Single stream as body
- // Using a loop just to get the first one
- final ContentStream[] contentStream = new ContentStream[1];
- for (ContentStream content : streams) {
- contentStream[0] = content;
- break;
- }
- if (contentStream[0] instanceof RequestWriter.LazyContentStream) {
- postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), -1) {
- @Override
- public Header getContentType() {
- return new BasicHeader("Content-Type", contentStream[0].getContentType());
- }
-
- @Override
- public boolean isRepeatable() {
- return false;
- }
-
- });
- } else {
- postOrPut.setEntity(new InputStreamEntity(contentStream[0].getStream(), -1) {
- @Override
- public Header getContentType() {
- return new BasicHeader("Content-Type", contentStream[0].getContentType());
- }
-
- @Override
- public boolean isRepeatable() {
- return false;
- }
- });
- }
- method = postOrPut;
- }
- }
- else {
- throw new SolrServerException("Unsupported method: "+request.getMethod() );
- }
- }
- catch( NoHttpResponseException r ) {
- method = null;
- if(is != null) {
- is.close();
- }
- // If out of tries then just rethrow (as normal error).
- if (tries < 1) {
- throw r;
- }
- }
- }
- } catch (IOException ex) {
- throw new SolrServerException("error reading streams", ex);
- }
-
- return method;
- }
-
- protected NamedList<Object> executeMethod(HttpRequestBase method, final ResponseParser processor) throws SolrServerException {
- method.addHeader("User-Agent", AGENT);
-
- InputStream respBody = null;
- boolean shouldClose = true;
- boolean success = false;
- try {
- // Execute the method.
- final HttpResponse response = httpClient.execute(method);
- int httpStatus = response.getStatusLine().getStatusCode();
-
- // Read the contents
- respBody = response.getEntity().getContent();
- Header ctHeader = response.getLastHeader("content-type");
- String contentType;
- if (ctHeader != null) {
- contentType = ctHeader.getValue();
- } else {
- contentType = "";
- }
-
- // handle some http level checks before trying to parse the response
- switch (httpStatus) {
- case HttpStatus.SC_OK:
- case HttpStatus.SC_BAD_REQUEST:
- case HttpStatus.SC_CONFLICT: // 409
- break;
- case HttpStatus.SC_MOVED_PERMANENTLY:
- case HttpStatus.SC_MOVED_TEMPORARILY:
- if (!followRedirects) {
- throw new SolrServerException("Server at " + getBaseURL()
- + " sent back a redirect (" + httpStatus + ").");
- }
- break;
- default:
- if (processor == null) {
- throw new RemoteSolrException(baseUrl, httpStatus, "non ok status: " + httpStatus
- + ", message:" + response.getStatusLine().getReasonPhrase(),
- null);
- }
- }
- if (processor == null) {
-
- // no processor specified, return raw stream
- NamedList<Object> rsp = new NamedList<>();
- rsp.add("stream", respBody);
- // Only case where stream should not be closed
- shouldClose = false;
- success = true;
- return rsp;
- }
-
- String procCt = processor.getContentType();
- if (procCt != null) {
- String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
- String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
- if (!procMimeType.equals(mimeType)) {
- // unexpected mime type
- String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
- Header encodingHeader = response.getEntity().getContentEncoding();
- String encoding;
- if (encodingHeader != null) {
- encoding = encodingHeader.getValue();
- } else {
- encoding = "UTF-8"; // try UTF-8
- }
- try {
- msg = msg + " " + IOUtils.toString(respBody, encoding);
- } catch (IOException e) {
- throw new RemoteSolrException(baseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
- }
- RemoteSolrException e = new RemoteSolrException(baseUrl, httpStatus, msg, null);
- throw e;
- }
- }
-
-// if(true) {
-// ByteArrayOutputStream copy = new ByteArrayOutputStream();
-// IOUtils.copy(respBody, copy);
-// String val = new String(copy.toByteArray());
-// System.out.println(">RESPONSE>"+val+"<"+val.length());
-// respBody = new ByteArrayInputStream(copy.toByteArray());
-// }
-
- NamedList<Object> rsp = null;
- String charset = EntityUtils.getContentCharSet(response.getEntity());
- try {
- rsp = processor.processResponse(respBody, charset);
- } catch (Exception e) {
- throw new RemoteSolrException(baseUrl, httpStatus, e.getMessage(), e);
- }
- if (httpStatus != HttpStatus.SC_OK) {
- NamedList<String> metadata = null;
- String reason = null;
- try {
- NamedList err = (NamedList) rsp.get("error");
- if (err != null) {
- reason = (String) err.get("msg");
- if(reason == null) {
- reason = (String) err.get("trace");
- }
- metadata = (NamedList<String>)err.get("metadata");
- }
- } catch (Exception ex) {}
- if (reason == null) {
- StringBuilder msg = new StringBuilder();
- msg.append(response.getStatusLine().getReasonPhrase());
- msg.append("\n\n");
- msg.append("request: " + method.getURI());
- reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
- }
- RemoteSolrException rss = new RemoteSolrException(baseUrl, httpStatus, reason, null);
- if (metadata != null) rss.setMetadata(metadata);
- throw rss;
- }
- success = true;
- return rsp;
- } catch (ConnectException e) {
- throw new SolrServerException("Server refused connection at: "
- + getBaseURL(), e);
- } catch (SocketTimeoutException e) {
- throw new SolrServerException(
- "Timeout occured while waiting response from server at: "
- + getBaseURL(), e);
- } catch (IOException e) {
- throw new SolrServerException(
- "IOException occured when talking to server at: " + getBaseURL(), e);
- } finally {
- if (respBody != null && shouldClose) {
- try {
- respBody.close();
- } catch (IOException e) {
- log.error("", e);
- } finally {
- if (!success) {
- method.abort();
- }
- }
- }
- }
- }
-
- // -------------------------------------------------------------------
- // -------------------------------------------------------------------
-
- /**
- * Retrieve the default list of parameters are added to every request
- * regardless.
- *
- * @see #invariantParams
- */
- public ModifiableSolrParams getInvariantParams() {
- return invariantParams;
- }
-
- public String getBaseURL() {
- return baseUrl;
- }
-
- public void setBaseURL(String baseURL) {
- this.baseUrl = baseURL;
- }
-
- public ResponseParser getParser() {
- return parser;
- }
-
- /**
- * Note: This setter method is <b>not thread-safe</b>.
- *
- * @param processor
- * Default Response Parser chosen to parse the response if the parser
- * were not specified as part of the request.
- * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
- */
- public void setParser(ResponseParser processor) {
- parser = processor;
- }
-
- /**
- * Return the HttpClient this instance uses.
- */
- public HttpClient getHttpClient() {
- return httpClient;
- }
-
- /**
- * HttpConnectionParams.setConnectionTimeout
- *
- * @param timeout
- * Timeout in milliseconds
- **/
- public void setConnectionTimeout(int timeout) {
- HttpClientUtil.setConnectionTimeout(httpClient, timeout);
- }
-
- /**
- * Set SoTimeout (read timeout). This is desirable
- * for queries, but probably not for indexing.
- *
- * @param timeout
- * Timeout in milliseconds
- **/
- public void setSoTimeout(int timeout) {
- HttpClientUtil.setSoTimeout(httpClient, timeout);
- }
-
- /**
- * Configure whether the client should follow redirects or not.
- * <p>
- * This defaults to false under the assumption that if you are following a
- * redirect to get to a Solr installation, something is misconfigured
- * somewhere.
- * </p>
- */
- public void setFollowRedirects(boolean followRedirects) {
- this.followRedirects = followRedirects;
- HttpClientUtil.setFollowRedirects(httpClient, followRedirects);
- }
-
- /**
- * Allow server->client communication to be compressed. Currently gzip and
- * deflate are supported. If the server supports compression the response will
- * be compressed. This method is only allowed if the http client is of type
- * DefatulHttpClient.
- */
- public void setAllowCompression(boolean allowCompression) {
- if (httpClient instanceof DefaultHttpClient) {
- HttpClientUtil.setAllowCompression((DefaultHttpClient) httpClient, allowCompression);
- } else {
- throw new UnsupportedOperationException(
- "HttpClient instance was not of type DefaultHttpClient");
- }
- }
-
- /**
- * Set maximum number of retries to attempt in the event of transient errors.
- * <p>
- * Maximum number of retries to attempt in the event of transient errors.
- * Default: 0 (no) retries. No more than 1 recommended.
- * </p>
- * @param maxRetries
- * No more than 1 recommended
- */
- public void setMaxRetries(int maxRetries) {
- if (maxRetries > 1) {
- log.warn("HttpSolrServer: maximum Retries " + maxRetries
- + " > 1. Maximum recommended retries is 1.");
- }
- this.maxRetries = maxRetries;
- }
-
- public void setRequestWriter(RequestWriter requestWriter) {
- this.requestWriter = requestWriter;
- }
-
- /**
- * Adds the documents supplied by the given iterator.
- *
- * @param docIterator
- * the iterator which returns SolrInputDocument instances
- *
- * @return the response from the SolrServer
- */
- public UpdateResponse add(Iterator<SolrInputDocument> docIterator)
- throws SolrServerException, IOException {
- UpdateRequest req = new UpdateRequest();
- req.setDocIterator(docIterator);
- return req.process(this);
- }
-
- /**
- * Adds the beans supplied by the given iterator.
- *
- * @param beanIterator
- * the iterator which returns Beans
- *
- * @return the response from the SolrServer
- */
- public UpdateResponse addBeans(final Iterator<?> beanIterator)
- throws SolrServerException, IOException {
- UpdateRequest req = new UpdateRequest();
- req.setDocIterator(new Iterator<SolrInputDocument>() {
-
- @Override
- public boolean hasNext() {
- return beanIterator.hasNext();
- }
-
- @Override
- public SolrInputDocument next() {
- Object o = beanIterator.next();
- if (o == null) return null;
- return getBinder().toSolrInputDocument(o);
- }
-
- @Override
- public void remove() {
- beanIterator.remove();
- }
- });
- return req.process(this);
- }
-
- /**
- * Close the {@link ClientConnectionManager} from the internal client.
- */
- @Override
- public void shutdown() {
- if (httpClient != null && internalClient) {
- httpClient.getConnectionManager().shutdown();
- }
+ super(baseURL);
}
- /**
- * Set the maximum number of connections that can be open to a single host at
- * any given time. If http client was created outside the operation is not
- * allowed.
- */
- public void setDefaultMaxConnectionsPerHost(int max) {
- if (internalClient) {
- HttpClientUtil.setMaxConnectionsPerHost(httpClient, max);
- } else {
- throw new UnsupportedOperationException(
- "Client was created outside of HttpSolrServer");
- }
- }
-
- /**
- * Set the maximum number of connections that can be open at any given time.
- * If http client was created outside the operation is not allowed.
- */
- public void setMaxTotalConnections(int max) {
- if (internalClient) {
- HttpClientUtil.setMaxConnections(httpClient, max);
- } else {
- throw new UnsupportedOperationException(
- "Client was created outside of HttpSolrServer");
- }
- }
-
- public boolean isUseMultiPartPost() {
- return useMultiPartPost;
+ public HttpSolrServer(String baseURL, HttpClient client) {
+ super(baseURL, client);
}
- /**
- * Set the multipart connection properties
- */
- public void setUseMultiPartPost(boolean useMultiPartPost) {
- this.useMultiPartPost = useMultiPartPost;
+ public HttpSolrServer(String baseURL, HttpClient client, ResponseParser parser) {
+ super(baseURL, client, parser);
}
- /**
- * Subclass of SolrException that allows us to capture an arbitrary HTTP
- * status code that may have been returned by the remote server or a
- * proxy along the way.
- */
- public static class RemoteSolrException extends SolrException {
- /**
- * @param remoteHost the host the error was received from
- * @param code Arbitrary HTTP status code
- * @param msg Exception Message
- * @param th Throwable to wrap with this Exception
- */
- public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
- super(code, "Error from server at " + remoteHost + ": " + msg, th);
- }
- }
}
Added: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java?rev=1648697&view=auto
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java (added)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java Wed Dec 31 14:05:48 2014
@@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.impl;
+
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.SolrException;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.*;
+
+/**
+ * LBHttpSolrClient or "LoadBalanced HttpSolrClient" is a load balancing wrapper around
+ * {@link HttpSolrClient}. This is useful when you
+ * have multiple Solr servers and the requests need to be Load Balanced among them.
+ *
+ * Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
+ * correct master; no inter-node routing is done.
+ *
+ * In SolrCloud (leader/replica) scenarios, it is usually better to use
+ * {@link CloudSolrClient}, but this class may be used
+ * for updates because the server will forward them to the appropriate leader.
+ *
+ * <p/>
+ * It offers automatic failover when a server goes down and it detects when the server comes back up.
+ * <p/>
+ * Load balancing is done using a simple round-robin on the list of servers.
+ * <p/>
+ * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
+ * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
+ * This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
+ * and if not it fails.
+ * <blockquote><pre>
+ * SolrClient lbHttpSolrClient = new LBHttpSolrClient("http://host1:8080/solr/", "http://host2:8080/solr", "http://host3:8080/solr");
+ * //or if you wish to pass the HttpClient do as follows
+ * HttpClient httpClient = HttpClientUtil.createClient(new ModifiableSolrParams());
+ * SolrClient lbHttpSolrClient = new LBHttpSolrClient(httpClient, "http://host1:8080/solr/", "http://host2:8080/solr", "http://host3:8080/solr");
+ * </pre></blockquote>
+ * This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
+ * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
+ * <p/>
+ * <b>When to use this?</b><br/> This can be used as a software load balancer when you do not wish to set up an external
+ * load balancer. Alternatives to this code are to use
+ * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
+ * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
+ *
+ * @since solr 1.4
+ */
+public class LBHttpSolrClient extends SolrClient {
+  /**
+   * HTTP status codes (404, 403, 503, 500) for which a non-update request is
+   * considered retryable on another server.
+   */
+  private static Set<Integer> RETRY_CODES = new HashSet<>(4);
+
+  static {
+    for (int statusCode : new int[] {404, 403, 503, 500}) {
+      RETRY_CODES.add(statusCode);
+    }
+  }
+
+ // keys to the maps are currently of the form "http://localhost:8983/solr"
+ // which should be equivalent to HttpSolrClient.getBaseURL() (the value ServerWrapper.getKey() returns)
+ private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
+ // access to aliveServers should be synchronized on itself
+
+ protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>(); // servers that request(Req) skips; filled by addZombie
+
+ // changes to aliveServers are reflected in this array, no need to synchronize
+ private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
+
+
+ private ScheduledExecutorService aliveCheckExecutor; // NOTE(review): presumably set up by startAliveCheckExecutor(), which is outside this view - confirm
+
+ private final HttpClient httpClient; // shared by every HttpSolrClient built in makeSolrClient
+ private final boolean clientIsInternal; // true when the constructor created httpClient itself
+ private final AtomicInteger counter = new AtomicInteger(-1); // NOTE(review): presumably the round-robin cursor; its use is outside this view - confirm
+
+ private static final SolrQuery solrQuery = new SolrQuery("*:*"); // configured once in the static initializer
+ private volatile ResponseParser parser; // passed to each client in makeSolrClient
+ private volatile RequestWriter requestWriter; // applied to each new client when non-null
+
+ private Set<String> queryParams = new HashSet<>(); // param keys to send only via the query string
+
+ static { // one-time setup of the shared "*:*" query (NOTE(review): presumably the alive-check ping; its use is outside this view)
+ solrQuery.setRows(0); // no documents need to be returned
+ /**
+ * Default sort (if we don't supply a sort) is by score, and since
+ * we request 0 rows, sorting and scoring are unnecessary.
+ * SolrQuery.DOCID specifies a non-scoring sort independently of the schema.
+ * <code>_docid_ asc</code> sort is efficient,
+ * <code>_docid_ desc</code> sort is not, so choose the ascending DOCID sort.
+ */
+ solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
+ // not a top-level request: we are only interested in the server the query is sent to, i.e. it need not distribute our request to further servers
+ solrQuery.setDistrib(false);
+ }
+
+ protected static class ServerWrapper {
+
+ final HttpSolrClient client;
+
+ long lastUsed; // last time used for a real request
+ long lastChecked; // last time checked for liveness
+
+ // "standard" servers are used by default. They normally live in the alive list
+ // and move to the zombie list when unavailable. When they become available again,
+ // they move back to the alive list.
+ boolean standard = true;
+
+ int failedPings = 0;
+
+ public ServerWrapper(HttpSolrClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public String toString() {
+ return client.getBaseURL();
+ }
+
+ public String getKey() {
+ return client.getBaseURL();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getKey().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof ServerWrapper)) return false;
+ return this.getKey().equals(((ServerWrapper)obj).getKey());
+ }
+ }
+
+  /**
+   * A request together with the ordered list of servers it may be sent to.
+   */
+  public static class Req {
+    protected SolrRequest request;
+    protected List<String> servers;
+    protected int numDeadServersToTry;
+
+    /**
+     * @param request the request to send
+     * @param servers the candidate servers; the number of dead servers to try
+     *                is initialised to the size of this list
+     */
+    public Req(SolrRequest request, List<String> servers) {
+      this.servers = servers;
+      this.request = request;
+      this.numDeadServersToTry = servers.size();
+    }
+
+    /** @return the wrapped request */
+    public SolrRequest getRequest() {
+      return this.request;
+    }
+
+    /** @return the servers this request may be sent to */
+    public List<String> getServers() {
+      return this.servers;
+    }
+
+    /** @return the number of dead servers to try if there are no live servers left */
+    public int getNumDeadServersToTry() {
+      return this.numDeadServersToTry;
+    }
+
+    /**
+     * @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
+     *                            Defaults to the number of servers in this request.
+     */
+    public void setNumDeadServersToTry(int numDeadServersToTry) {
+      this.numDeadServersToTry = numDeadServersToTry;
+    }
+  }
+
+  /**
+   * Result holder for a load-balanced request: the parsed response plus the
+   * server recorded for it.
+   */
+  public static class Rsp {
+    protected String server;
+    protected NamedList<Object> rsp;
+
+    /** The server that returned the response */
+    public String getServer() {
+      return this.server;
+    }
+
+    /** The response from the server */
+    public NamedList<Object> getResponse() {
+      return this.rsp;
+    }
+  }
+
+  /**
+   * Creates a load-balancing client over the given servers. No HttpClient is
+   * supplied, so the delegate constructor creates one.
+   *
+   * @param solrServerUrls base URLs of the Solr servers to balance across
+   */
+  public LBHttpSolrClient(String... solrServerUrls) throws MalformedURLException {
+    this((HttpClient) null, solrServerUrls);
+  }
+
+ /**
+ * Creates a load-balancing client over the given servers that parses
+ * responses with a new {@link BinaryResponseParser}.
+ * The provided httpClient should use a multi-threaded connection manager.
+ *
+ * @param httpClient the HttpClient to use; when null the delegate constructor creates one
+ * @param solrServerUrl base URLs of the Solr servers to balance across
+ */
+ public LBHttpSolrClient(HttpClient httpClient, String... solrServerUrl) {
+ this(httpClient, new BinaryResponseParser(), solrServerUrl);
+ }
+
+  /**
+   * Creates a load-balancing client over the given servers.
+   * The provided httpClient should use a multi-threaded connection manager.
+   *
+   * @param httpClient    the HttpClient to use; when null an internal client is
+   *                      created with {@code HttpClientUtil.PROP_USE_RETRY} set to false
+   * @param parser        the response parser handed to every per-server client
+   * @param solrServerUrl base URLs of the Solr servers; each is registered as alive
+   */
+  public LBHttpSolrClient(HttpClient httpClient, ResponseParser parser, String... solrServerUrl) {
+    this.parser = parser;
+    this.clientIsInternal = (httpClient == null);
+    if (httpClient != null) {
+      this.httpClient = httpClient;
+    } else {
+      ModifiableSolrParams clientParams = new ModifiableSolrParams();
+      clientParams.set(HttpClientUtil.PROP_USE_RETRY, false);
+      this.httpClient = HttpClientUtil.createClient(clientParams);
+    }
+    for (String serverUrl : solrServerUrl) {
+      ServerWrapper wrapper = new ServerWrapper(makeSolrClient(serverUrl));
+      aliveServers.put(wrapper.getKey(), wrapper);
+    }
+    updateAliveList();
+  }
+
+  /**
+   * @return the set of param keys that are only sent via the query string
+   */
+  public Set<String> getQueryParams() {
+    return this.queryParams;
+  }
+
+ /**
+ * Expert Method.
+ * Replaces the set of query-string-only param keys. The given set is stored
+ * as-is (not copied) and is handed to the clients created by makeSolrClient
+ * when it is non-null.
+ * @param queryParams set of param keys to only send via the query string
+ */
+ public void setQueryParams(Set<String> queryParams) {
+ this.queryParams = queryParams;
+ }
+  /**
+   * Adds a single param key to the set of keys that are only sent via the
+   * query string.
+   *
+   * @param queryOnlyParam the param key to add
+   */
+  public void addQueryParams(String queryOnlyParam) {
+    if (this.queryParams == null) {
+      // A null set is a tolerated state (makeSolrClient checks for it), so
+      // start a fresh set instead of failing with a NullPointerException.
+      this.queryParams = new HashSet<>();
+    }
+    this.queryParams.add(queryOnlyParam);
+  }
+
+  /**
+   * Strips a single trailing slash from the given server URL, if present.
+   *
+   * @param server the server URL
+   * @return {@code server} without its final {@code "/"}, or unchanged when it has none
+   */
+  public static String normalize(String server) {
+    return server.endsWith("/") ? server.substring(0, server.length() - 1) : server;
+  }
+
+ protected HttpSolrClient makeSolrClient(String server) {
+ HttpSolrClient client = new HttpSolrClient(server, httpClient, parser);
+ if (requestWriter != null) {
+ client.setRequestWriter(requestWriter);
+ }
+ if (queryParams != null) {
+ client.setQueryParams(queryParams);
+ }
+ return client;
+ }
+
+ /**
+ * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
+ * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
+ * time, or until a test request on that server succeeds.
+ *
+ * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
+ * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
+ * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
+ *
+ * If no live servers are found a SolrServerException is thrown.
+ *
+ * @param req contains both the request as well as the list of servers to query
+ *
+ * @return the result of the request
+ *
+ * @throws IOException If there is a low-level I/O error.
+ */
+ public Rsp request(Req req) throws SolrServerException, IOException {
+ Rsp rsp = new Rsp();
+ Exception ex = null;
+ boolean isUpdate = req.request instanceof IsUpdateRequest;
+ List<ServerWrapper> skipped = null;
+
+ for (String serverStr : req.getServers()) {
+ serverStr = normalize(serverStr);
+ // if the server is currently a zombie, just skip to the next one
+ ServerWrapper wrapper = zombieServers.get(serverStr);
+ if (wrapper != null) {
+ // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+ final int numDeadServersToTry = req.getNumDeadServersToTry();
+ if (numDeadServersToTry > 0) {
+ if (skipped == null) {
+ skipped = new ArrayList<>(numDeadServersToTry);
+ skipped.add(wrapper);
+ }
+ else if (skipped.size() < numDeadServersToTry) {
+ skipped.add(wrapper);
+ }
+ }
+ continue;
+ }
+ rsp.server = serverStr;
+ HttpSolrClient client = makeSolrClient(serverStr);
+
+ ex = doRequest(client, req, rsp, isUpdate, false, null);
+ if (ex == null) {
+ return rsp; // SUCCESS
+ }
+ }
+
+ // try the servers we previously skipped
+ if (skipped != null) {
+ for (ServerWrapper wrapper : skipped) {
+ ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
+ if (ex == null) {
+ return rsp; // SUCCESS
+ }
+ }
+ }
+
+
+ if (ex == null) {
+ throw new SolrServerException("No live SolrServers available to handle this request");
+ } else {
+ throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
+ }
+
+ }
+
+ protected Exception addZombie(HttpSolrClient server, Exception e) {
+
+ ServerWrapper wrapper;
+
+ wrapper = new ServerWrapper(server);
+ wrapper.lastUsed = System.currentTimeMillis();
+ wrapper.standard = false;
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ return e;
+ }
+
+ protected Exception doRequest(HttpSolrClient client, Req req, Rsp rsp, boolean isUpdate,
+ boolean isZombie, String zombieKey) throws SolrServerException, IOException {
+ Exception ex = null;
+ try {
+ rsp.rsp = client.request(req.getRequest());
+ if (isZombie) {
+ zombieServers.remove(zombieKey);
+ }
+ } catch (SolrException e) {
+ // we retry on 404 or 403 or 503 or 500
+ // unless it's an update - then we only retry on connect exception
+ if (!isUpdate && RETRY_CODES.contains(e.code())) {
+ ex = (!isZombie) ? addZombie(client, e) : e;
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ if (isZombie) {
+ zombieServers.remove(zombieKey);
+ }
+ throw e;
+ }
+ } catch (SocketException e) {
+ if (!isUpdate || e instanceof ConnectException) {
+ ex = (!isZombie) ? addZombie(client, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (SocketTimeoutException e) {
+ if (!isUpdate) {
+ ex = (!isZombie) ? addZombie(client, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (SolrServerException e) {
+ Throwable rootCause = e.getRootCause();
+ if (!isUpdate && rootCause instanceof IOException) {
+ ex = (!isZombie) ? addZombie(client, e) : e;
+ } else if (isUpdate && rootCause instanceof ConnectException) {
+ ex = (!isZombie) ? addZombie(client, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+
+ return ex;
+ }
+
+ private void updateAliveList() {
+ synchronized (aliveServers) {
+ aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]);
+ }
+ }
+
+ private ServerWrapper removeFromAlive(String key) {
+ synchronized (aliveServers) {
+ ServerWrapper wrapper = aliveServers.remove(key);
+ if (wrapper != null)
+ updateAliveList();
+ return wrapper;
+ }
+ }
+
+ /**
+  * Puts the wrapper into the alive map under its key (replacing any previous entry)
+  * and refreshes the alive list.
+  *
+  * @param wrapper the server to mark as alive
+  */
+ private void addToAlive(ServerWrapper wrapper) {
+   synchronized (aliveServers) {
+     // TODO: warn if there was a previous entry?
+     aliveServers.put(wrapper.getKey(), wrapper);
+     updateAliveList();
+   }
+ }
+
+ /**
+  * Creates a client for the given server URL and adds it to the alive servers.
+  *
+  * @param server base URL of the server to add
+  */
+ public void addSolrServer(String server) throws MalformedURLException {
+   addToAlive(new ServerWrapper(makeSolrClient(server)));
+ }
+
+ /**
+  * Removes the given server from both the alive map and the zombie map. The URL is
+  * converted to its external form and one trailing slash is dropped before the lookup.
+  *
+  * @param server URL of the server to remove
+  * @return always null
+  * @throws RuntimeException wrapping a MalformedURLException if {@code server} is not a valid URL
+  */
+ public String removeSolrServer(String server) {
+   final String externalForm;
+   try {
+     externalForm = new URL(server).toExternalForm();
+   } catch (MalformedURLException e) {
+     throw new RuntimeException(e);
+   }
+   final String key = normalize(externalForm);
+
+   // there is a small race condition here - if the server is in the process of being moved between
+   // lists, we could fail to remove it.
+   removeFromAlive(key);
+   zombieServers.remove(key);
+   return null;
+ }
+
+ /**
+  * Applies the given connection timeout to the HttpClient used by this instance.
+  *
+  * @param timeout the timeout value handed to HttpClientUtil.setConnectionTimeout
+  */
+ public void setConnectionTimeout(int timeout) {
+   HttpClientUtil.setConnectionTimeout(this.httpClient, timeout);
+ }
+
+ /**
+  * Sets the soTimeout (read timeout) on the HttpClient used by this instance.
+  * This is desirable for queries, but probably not for indexing.
+  *
+  * @param timeout the timeout value handed to HttpClientUtil.setSoTimeout
+  */
+ public void setSoTimeout(int timeout) {
+   HttpClientUtil.setSoTimeout(this.httpClient, timeout);
+ }
+
+ /**
+  * Stops the alive-check executor if one was started and, when clientIsInternal is set,
+  * shuts down the connection manager of the HttpClient.
+  */
+ @Override
+ public void shutdown() {
+   if (null != aliveCheckExecutor) {
+     aliveCheckExecutor.shutdownNow();
+   }
+   if (clientIsInternal) {
+     httpClient.getConnectionManager().shutdown();
+   }
+ }
+
+ /**
+ * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
+ * If the request failed due to IOException then the live server is moved to dead pool and the request is
+ * retried on another live server. After live servers are exhausted, any servers previously marked as dead
+ * will be tried before failing the request.
+ *
+ * @param request the SolrRequest.
+ *
+ * @return response
+ *
+ * @throws IOException If there is a low-level I/O error.
+ */
+ @Override
+ public NamedList<Object> request(final SolrRequest request)
+ throws SolrServerException, IOException {
+ Exception ex = null;
+ ServerWrapper[] serverList = aliveServerList;
+
+ int maxTries = serverList.length;
+ Map<String,ServerWrapper> justFailed = null;
+
+ for (int attempts=0; attempts<maxTries; attempts++) {
+ int count = counter.incrementAndGet() & Integer.MAX_VALUE;
+ ServerWrapper wrapper = serverList[count % serverList.length];
+ wrapper.lastUsed = System.currentTimeMillis();
+
+ try {
+ return wrapper.client.request(request);
+ } catch (SolrException e) {
+ // Server is alive but the request was malformed or invalid
+ throw e;
+ } catch (SolrServerException e) {
+ if (e.getRootCause() instanceof IOException) {
+ ex = e;
+ moveAliveToDead(wrapper);
+ if (justFailed == null) justFailed = new HashMap<>();
+ justFailed.put(wrapper.getKey(), wrapper);
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+
+
+ // try other standard servers that we didn't try just now
+ for (ServerWrapper wrapper : zombieServers.values()) {
+ if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
+ try {
+ NamedList<Object> rsp = wrapper.client.request(request);
+ // remove from zombie list *before* adding to alive to avoid a race that could lose a server
+ zombieServers.remove(wrapper.getKey());
+ addToAlive(wrapper);
+ return rsp;
+ } catch (SolrException e) {
+ // Server is alive but the request was malformed or invalid
+ throw e;
+ } catch (SolrServerException e) {
+ if (e.getRootCause() instanceof IOException) {
+ ex = e;
+ // still dead
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+
+
+ if (ex == null) {
+ throw new SolrServerException("No live SolrServers available to handle this request");
+ } else {
+ throw new SolrServerException("No live SolrServers available to handle this request", ex);
+ }
+ }
+
+ /**
+ * Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
+ * aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
+ *
+ * @param zombieServer a server in the dead pool
+ */
+ private void checkAZombieServer(ServerWrapper zombieServer) {
+ long currTime = System.currentTimeMillis();
+ try {
+ zombieServer.lastChecked = currTime;
+ QueryResponse resp = zombieServer.client.query(solrQuery);
+ if (resp.getStatus() == 0) {
+ // server has come back up.
+ // make sure to remove from zombies before adding to alive to avoid a race condition
+ // where another thread could mark it down, move it back to zombie, and then we delete
+ // from zombie and lose it forever.
+ ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey());
+ if (wrapper != null) {
+ wrapper.failedPings = 0;
+ if (wrapper.standard) {
+ addToAlive(wrapper);
+ }
+ } else {
+ // something else already moved the server from zombie to alive
+ }
+ }
+ } catch (Exception e) {
+ //Expected. The server is still down.
+ zombieServer.failedPings++;
+
+ // If the server doesn't belong in the standard set belonging to this load balancer
+ // then simply drop it after a certain number of failed pings.
+ if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
+ zombieServers.remove(zombieServer.getKey());
+ }
+ }
+ }
+
+ private void moveAliveToDead(ServerWrapper wrapper) {
+ wrapper = removeFromAlive(wrapper.getKey());
+ if (wrapper == null)
+ return; // another thread already detected the failure and removed it
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ }
+
+ private int interval = CHECK_INTERVAL;
+
+ /**
+  * Sets the fixed interval at which the dead servers are pinged to find out whether
+  * they are alive again. The value is read when the alive-check executor is started.
+  *
+  * @param interval time in milliseconds; must be positive
+  * @throws IllegalArgumentException if {@code interval} is zero or negative
+  */
+ public void setAliveCheckInterval(int interval) {
+   if (interval > 0) {
+     this.interval = interval;
+     return;
+   }
+   throw new IllegalArgumentException("Alive check interval must be " +
+       "positive, specified value = " + interval);
+ }
+
+ /**
+  * Creates the single-threaded alive-check executor on first use and schedules the
+  * zombie check at a fixed rate of {@code interval} milliseconds. Later calls are no-ops.
+  */
+ private void startAliveCheckExecutor() {
+   // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
+   // if it's not null.
+   if (aliveCheckExecutor != null) {
+     return;
+   }
+   synchronized (this) {
+     if (aliveCheckExecutor != null) {
+       return;
+     }
+     aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+         new SolrjNamedThreadFactory("aliveCheckExecutor"));
+     // the runner only holds a weak reference to this instance
+     aliveCheckExecutor.scheduleAtFixedRate(
+         getAliveCheckRunner(new WeakReference<>(this)),
+         this.interval, this.interval, TimeUnit.MILLISECONDS);
+   }
+ }
+
+ /**
+  * Builds the periodic task that checks every server currently in the zombie map.
+  * The task does nothing once the weakly referenced load balancer has been collected.
+  *
+  * @param lbRef weak reference to the load balancer whose zombies are checked
+  * @return the task to schedule
+  */
+ private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrClient> lbRef) {
+   return new Runnable() {
+     @Override
+     public void run() {
+       final LBHttpSolrClient lb = lbRef.get();
+       if (lb == null || lb.zombieServers == null) {
+         return;
+       }
+       for (ServerWrapper zombie : lb.zombieServers.values()) {
+         lb.checkAZombieServer(zombie);
+       }
+     }
+   };
+ }
+
+ /**
+  * Returns the HttpClient used by this instance.
+  *
+  * @return the HttpClient
+  */
+ public HttpClient getHttpClient() {
+   return this.httpClient;
+ }
+
+ /**
+  * Returns the response parser currently configured on this instance.
+  *
+  * @return the response parser
+  */
+ public ResponseParser getParser() {
+   return this.parser;
+ }
+
+ /**
+  * Changes the {@link ResponseParser} that is handed to the internal clients
+  * when they are created.
+  *
+  * @param parser Default Response Parser chosen to parse the response if the parser
+  *               was not specified as part of the request.
+  * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+  */
+ public void setParser(ResponseParser parser) {
+   final ResponseParser replacement = parser;
+   this.parser = replacement;
+ }
+
+ /**
+  * Changes the {@link RequestWriter} that is set on the internal clients
+  * when they are created.
+  *
+  * @param requestWriter Default RequestWriter, used to encode requests sent to the server.
+  */
+ public void setRequestWriter(RequestWriter requestWriter) {
+   final RequestWriter replacement = requestWriter;
+   this.requestWriter = replacement;
+ }
+
+ /**
+  * Returns the request writer currently configured on this instance.
+  *
+  * @return the request writer, as last set via setRequestWriter
+  */
+ public RequestWriter getRequestWriter() {
+   return this.requestWriter;
+ }
+
+ /**
+  * Stops the alive-check executor, if one was started, before delegating to the
+  * superclass finalizer.
+  */
+ @Override
+ protected void finalize() throws Throwable {
+   try {
+     if (null != this.aliveCheckExecutor) {
+       this.aliveCheckExecutor.shutdownNow();
+     }
+   } finally {
+     super.finalize();
+   }
+ }
+
+ // defaults
+ // CHECK_INTERVAL is the initial value of the alive-check interval, in milliseconds.
+ private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
+ // Once a non-standard zombie has failed this many pings it is dropped from the zombie map.
+ private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
+
+}