You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2019/03/04 15:49:31 UTC
[lucene-solr] branch branch_8_0 updated: SOLR-13276: push back to 8.1
This is an automated email from the ASF dual-hosted git repository.
datcm pushed a commit to branch branch_8_0
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8_0 by this push:
new 2f7dc0d SOLR-13276: push back to 8.1
2f7dc0d is described below
commit 2f7dc0d7566afa062cc6cf47ceb007fa7ab13c2c
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Mon Mar 4 15:49:22 2019 +0000
SOLR-13276: push back to 8.1
---
solr/CHANGES.txt | 2 -
.../client/solrj/impl/BaseCloudSolrClient.java | 1244 --------------------
.../solrj/impl/BaseHttpClusterStateProvider.java | 309 -----
.../solr/client/solrj/impl/BaseHttpSolrClient.java | 80 --
.../client/solrj/impl/CloudHttp2SolrClient.java | 237 ----
.../solr/client/solrj/impl/CloudSolrClient.java | 1186 ++++++++++++++++++-
.../solrj/impl/Http2ClusterStateProvider.java | 46 -
.../solr/client/solrj/impl/Http2SolrClient.java | 83 +-
.../solrj/impl/HttpClusterStateProvider.java | 301 ++++-
.../solr/client/solrj/impl/HttpSolrClient.java | 52 +-
.../solr/client/solrj/request/UpdateRequest.java | 76 +-
.../impl/CloudHttp2SolrClientBadInputTest.java | 73 --
.../impl/CloudHttp2SolrClientBuilderTest.java | 84 --
.../CloudHttp2SolrClientMultiConstructorTest.java | 85 --
.../solrj/impl/CloudHttp2SolrClientRetryTest.java | 83 --
.../solrj/impl/CloudHttp2SolrClientTest.java | 978 ---------------
.../solrj/impl/CloudSolrClientCacheTest.java | 2 +-
.../src/java/org/apache/solr/SolrTestCaseJ4.java | 56 -
18 files changed, 1577 insertions(+), 3400 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0bc0479..6d46019 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -173,8 +173,6 @@ New Features
* SOLR-13241: Add 'autoscaling' tool support to solr.cmd (Jason Gerlowski)
-* SOLR-13276: Adding Http2 equivalent classes of CloudSolrClient and HttpClusterStateProvider (Cao Manh Dat)
-
Bug Fixes
----------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
deleted file mode 100644
index 7ae1e02..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ /dev/null
@@ -1,1244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
-
-import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.V2RequestSupport;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.IsUpdateRequest;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.ToleratedUpdateError;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStatePredicate;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.Hash;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
-import static org.apache.solr.common.params.CommonParams.ID;
-
-public abstract class BaseCloudSolrClient extends SolrClient {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private volatile String defaultCollection;
- //no of times collection state to be reloaded if stale state error is received
- private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
- private Random rand = new Random();
-
- private final boolean updatesToLeaders;
- private final boolean directUpdatesToLeadersOnly;
- boolean parallelUpdates; //TODO final
- private ExecutorService threadPool = ExecutorUtil
- .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
- "CloudSolrClient ThreadPool"));
- private String idField = ID;
- public static final String STATE_VERSION = "_stateVer_";
- private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
- private final Set<String> NON_ROUTABLE_PARAMS;
- {
- NON_ROUTABLE_PARAMS = new HashSet<>();
- NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
- NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
- NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
- NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
- NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
-
- NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
- NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
- NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
-
- // Not supported via SolrCloud
- // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
-
- }
- private volatile List<Object> locks = objectList(3);
-
-
- static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
- final AtomicLong puts = new AtomicLong();
- final AtomicLong hits = new AtomicLong();
- final Lock evictLock = new ReentrantLock(true);
- protected volatile long timeToLive = 60 * 1000L;
-
- @Override
- public ExpiringCachedDocCollection get(Object key) {
- ExpiringCachedDocCollection val = super.get(key);
- if(val == null) {
- // a new collection is likely to be added now.
- //check if there are stale items and remove them
- evictStale();
- return null;
- }
- if(val.isExpired(timeToLive)) {
- super.remove(key);
- return null;
- }
- hits.incrementAndGet();
- return val;
- }
-
- @Override
- public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
- puts.incrementAndGet();
- return super.put(key, value);
- }
-
- void evictStale() {
- if(!evictLock.tryLock()) return;
- try {
- for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
- if(e.getValue().isExpired(timeToLive)){
- super.remove(e.getKey());
- }
- }
- } finally {
- evictLock.unlock();
- }
- }
-
- }
-
- /**
- * This is the time to wait to refetch the state
- * after getting the same state version from ZK
- * <p>
- * secs
- */
- public void setRetryExpiryTime(int secs) {
- this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
- }
-
- protected final StateCache collectionStateCache = new StateCache();
-
- class ExpiringCachedDocCollection {
- final DocCollection cached;
- final long cachedAt;
- //This is the time at which the collection is retried and got the same old version
- volatile long retriedAt = -1;
- //flag that suggests that this is potentially to be rechecked
- volatile boolean maybeStale = false;
-
- ExpiringCachedDocCollection(DocCollection cached) {
- this.cached = cached;
- this.cachedAt = System.nanoTime();
- }
-
- boolean isExpired(long timeToLiveMs) {
- return (System.nanoTime() - cachedAt)
- > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
- }
-
- boolean shouldRetry() {
- if (maybeStale) {// we are not sure if it is stale so check with retry time
- if ((retriedAt == -1 ||
- (System.nanoTime() - retriedAt) > retryExpiryTime)) {
- return true;// we retried a while back. and we could not get anything new.
- //it's likely that it is not going to be available now also.
- }
- }
- return false;
- }
-
- void setRetriedAt() {
- retriedAt = System.nanoTime();
- }
- }
-
- protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates, boolean directUpdatesToLeadersOnly) {
- this.updatesToLeaders = updatesToLeaders;
- this.parallelUpdates = parallelUpdates;
- this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
- }
-
- /** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
- * @param seconds ttl value in seconds
- */
- public void setCollectionCacheTTl(int seconds){
- assert seconds > 0;
- this.collectionStateCache.timeToLive = seconds * 1000L;
- }
-
- protected abstract LBSolrClient getLbClient();
-
- public abstract ClusterStateProvider getClusterStateProvider();
-
- protected abstract boolean wasCommError(Throwable t);
-
- @Override
- public void close() throws IOException {
- if(this.threadPool != null && !this.threadPool.isShutdown()) {
- this.threadPool.shutdown();
- }
- }
-
- public ResponseParser getParser() {
- return getLbClient().getParser();
- }
-
- /**
- * Note: This setter method is <b>not thread-safe</b>.
- *
- * @param processor
- * Default Response Parser chosen to parse the response if the parser
- * were not specified as part of the request.
- * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
- */
- public void setParser(ResponseParser processor) {
- getLbClient().setParser(processor);
- }
-
- public RequestWriter getRequestWriter() {
- return getLbClient().getRequestWriter();
- }
-
- public void setRequestWriter(RequestWriter requestWriter) {
- getLbClient().setRequestWriter(requestWriter);
- }
-
- /**
- * @return the zkHost value used to connect to zookeeper.
- */
- public String getZkHost() {
- return assertZKStateProvider().zkHost;
- }
-
- public ZkStateReader getZkStateReader() {
- if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
- ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) getClusterStateProvider();
- getClusterStateProvider().connect();
- return provider.zkStateReader;
- }
- throw new IllegalStateException("This has no Zk stateReader");
- }
-
- /**
- * @param idField the field to route documents on.
- */
- public void setIdField(String idField) {
- this.idField = idField;
- }
-
- /**
- * @return the field that updates are routed on.
- */
- public String getIdField() {
- return idField;
- }
-
- /** Sets the default collection for request */
- public void setDefaultCollection(String collection) {
- this.defaultCollection = collection;
- }
-
- /** Gets the default collection for request */
- public String getDefaultCollection() {
- return defaultCollection;
- }
-
- /** Set the connect timeout to the zookeeper ensemble in ms */
- public void setZkConnectTimeout(int zkConnectTimeout) {
- assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
- }
-
- /** Set the timeout to the zookeeper ensemble in ms */
- public void setZkClientTimeout(int zkClientTimeout) {
- assertZKStateProvider().zkClientTimeout = zkClientTimeout;
- }
-
- /** Gets whether direct updates are sent in parallel */
- public boolean isParallelUpdates() {
- return parallelUpdates;
- }
-
- /**
- * Connect to the zookeeper ensemble.
- * This is an optional method that may be used to force a connect before any other requests are sent.
- */
- public void connect() {
- getClusterStateProvider().connect();
- }
-
- /**
- * Connect to a cluster. If the cluster is not ready, retry connection up to a given timeout.
- * @param duration the timeout
- * @param timeUnit the units of the timeout
- * @throws TimeoutException if the cluster is not ready after the timeout
- * @throws InterruptedException if the wait is interrupted
- */
- public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
- log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, getClusterStateProvider());
- long timeout = System.nanoTime() + timeUnit.toNanos(duration);
- while (System.nanoTime() < timeout) {
- try {
- connect();
- log.info("Cluster at {} ready", getClusterStateProvider());
- return;
- }
- catch (RuntimeException e) {
- // not ready yet, then...
- }
- TimeUnit.MILLISECONDS.sleep(250);
- }
- throw new TimeoutException("Timed out waiting for cluster");
- }
-
- private ZkClientClusterStateProvider assertZKStateProvider() {
- if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
- return (ZkClientClusterStateProvider) getClusterStateProvider();
- }
- throw new IllegalArgumentException("This client does not use ZK");
-
- }
-
- /**
- * Block until a collection state matches a predicate, or a timeout
- *
- * Note that the predicate may be called again even after it has returned true, so
- * implementors should avoid changing state within the predicate call itself.
- *
- * @param collection the collection to watch
- * @param wait how long to wait
- * @param unit the units of the wait parameter
- * @param predicate a {@link CollectionStatePredicate} to check the collection state
- * @throws InterruptedException on interrupt
- * @throws TimeoutException on timeout
- */
- public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
- throws InterruptedException, TimeoutException {
- getClusterStateProvider().connect();
- assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
- }
-
- /**
- * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
- *
- * Note that the watcher is unregistered after it has been called once. To make a watcher persistent,
- * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
- * call
- *
- * @param collection the collection to watch
- * @param watcher a watcher that will be called when the state changes
- */
- public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
- getClusterStateProvider().connect();
- assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
- }
-
- private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
- UpdateRequest updateRequest = (UpdateRequest) request;
- SolrParams params = request.getParams();
- ModifiableSolrParams routableParams = new ModifiableSolrParams();
- ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
-
- if(params != null) {
- nonRoutableParams.add(params);
- routableParams.add(params);
- for(String param : NON_ROUTABLE_PARAMS) {
- routableParams.remove(param);
- }
- }
-
- if (collection == null) {
- throw new SolrServerException("No collection param specified on request and no default collection has been set.");
- }
-
- //Check to see if the collection is an alias.
- List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collection);
- collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
-
- DocCollection col = getDocCollection(collection, null);
-
- DocRouter router = col.getRouter();
-
- if (router instanceof ImplicitDocRouter) {
- // short circuit as optimization
- return null;
- }
-
- //Create the URL map, which is keyed on slice name.
- //The value is a list of URLs for each replica in the slice.
- //The first value in the list is the leader for the slice.
- final Map<String,List<String>> urlMap = buildUrlMap(col);
- final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
- if (routes == null) {
- if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
- // we have info (documents with ids and/or ids to delete) with
- // which to find the leaders but we could not find (all of) them
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "directUpdatesToLeadersOnly==true but could not find leader(s)");
- } else {
- // we could not find a leader or routes yet - use unoptimized general path
- return null;
- }
- }
-
- final NamedList<Throwable> exceptions = new NamedList<>();
- final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
-
- long start = System.nanoTime();
-
- if (parallelUpdates) {
- final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
- for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
- final String url = entry.getKey();
- final LBSolrClient.Req lbRequest = entry.getValue();
- try {
- MDC.put("CloudSolrClient.url", url);
- responseFutures.put(url, threadPool.submit(() -> {
- return getLbClient().request(lbRequest).getResponse();
- }));
- } finally {
- MDC.remove("CloudSolrClient.url");
- }
- }
-
- for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
- final String url = entry.getKey();
- final Future<NamedList<?>> responseFuture = entry.getValue();
- try {
- shardResponses.add(url, responseFuture.get());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- exceptions.add(url, e.getCause());
- }
- }
-
- if (exceptions.size() > 0) {
- Throwable firstException = exceptions.getVal(0);
- if(firstException instanceof SolrException) {
- SolrException e = (SolrException) firstException;
- throw getRouteException(SolrException.ErrorCode.getErrorCode(e.code()), exceptions, routes);
- } else {
- throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, exceptions, routes);
- }
- }
- } else {
- for (Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
- String url = entry.getKey();
- LBSolrClient.Req lbRequest = entry.getValue();
- try {
- NamedList<Object> rsp = getLbClient().request(lbRequest).getResponse();
- shardResponses.add(url, rsp);
- } catch (Exception e) {
- if(e instanceof SolrException) {
- throw (SolrException) e;
- } else {
- throw new SolrServerException(e);
- }
- }
- }
- }
-
- UpdateRequest nonRoutableRequest = null;
- List<String> deleteQuery = updateRequest.getDeleteQuery();
- if (deleteQuery != null && deleteQuery.size() > 0) {
- UpdateRequest deleteQueryRequest = new UpdateRequest();
- deleteQueryRequest.setDeleteQuery(deleteQuery);
- nonRoutableRequest = deleteQueryRequest;
- }
-
- Set<String> paramNames = nonRoutableParams.getParameterNames();
-
- Set<String> intersection = new HashSet<>(paramNames);
- intersection.retainAll(NON_ROUTABLE_PARAMS);
-
- if (nonRoutableRequest != null || intersection.size() > 0) {
- if (nonRoutableRequest == null) {
- nonRoutableRequest = new UpdateRequest();
- }
- nonRoutableRequest.setParams(nonRoutableParams);
- nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
- List<String> urlList = new ArrayList<>();
- urlList.addAll(routes.keySet());
- Collections.shuffle(urlList, rand);
- LBSolrClient.Req req = new LBSolrClient.Req(nonRoutableRequest, urlList);
- try {
- LBSolrClient.Rsp rsp = getLbClient().request(req);
- shardResponses.add(urlList.get(0), rsp.getResponse());
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, urlList.get(0), e);
- }
- }
-
- long end = System.nanoTime();
-
- RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
- rr.setRouteResponses(shardResponses);
- rr.setRoutes(routes);
- return rr;
- }
-
- protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
- return new RouteException(serverError, exceptions, routes);
- }
-
- protected Map<String, ? extends LBSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
- DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
- String idField) {
- return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
- }
-
- private Map<String,List<String>> buildUrlMap(DocCollection col) {
- Map<String, List<String>> urlMap = new HashMap<>();
- Slice[] slices = col.getActiveSlicesArr();
- for (Slice slice : slices) {
- String name = slice.getName();
- List<String> urls = new ArrayList<>();
- Replica leader = slice.getLeader();
- if (directUpdatesToLeadersOnly && leader == null) {
- for (Replica replica : slice.getReplicas(
- replica -> replica.isActive(getClusterStateProvider().getLiveNodes())
- && replica.getType() == Replica.Type.NRT)) {
- leader = replica;
- break;
- }
- }
- if (leader == null) {
- if (directUpdatesToLeadersOnly) {
- continue;
- }
- // take unoptimized general path - we cannot find a leader yet
- return null;
- }
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
- String url = zkProps.getCoreUrl();
- urls.add(url);
- if (!directUpdatesToLeadersOnly) {
- for (Replica replica : slice.getReplicas()) {
- if (!replica.getNodeName().equals(leader.getNodeName()) &&
- !replica.getName().equals(leader.getName())) {
- ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
- String url1 = zkProps1.getCoreUrl();
- urls.add(url1);
- }
- }
- }
- urlMap.put(name, urls);
- }
- return urlMap;
- }
-
- protected <T extends RouteResponse> T condenseResponse(NamedList response, int timeMillis, Supplier<T> supplier) {
- T condensed = supplier.get();
- int status = 0;
- Integer rf = null;
- Integer minRf = null;
-
- // TolerantUpdateProcessor
- List<SimpleOrderedMap<String>> toleratedErrors = null;
- int maxToleratedErrors = Integer.MAX_VALUE;
-
- // For "adds", "deletes", "deleteByQuery" etc.
- Map<String, NamedList> versions = new HashMap<>();
-
- for(int i=0; i<response.size(); i++) {
- NamedList shardResponse = (NamedList)response.getVal(i);
- NamedList header = (NamedList)shardResponse.get("responseHeader");
- Integer shardStatus = (Integer)header.get("status");
- int s = shardStatus.intValue();
- if(s > 0) {
- status = s;
- }
- Object rfObj = header.get(UpdateRequest.REPFACT);
- if (rfObj != null && rfObj instanceof Integer) {
- Integer routeRf = (Integer)rfObj;
- if (rf == null || routeRf < rf)
- rf = routeRf;
- }
- minRf = (Integer)header.get(UpdateRequest.MIN_REPFACT);
-
- List<SimpleOrderedMap<String>> shardTolerantErrors =
- (List<SimpleOrderedMap<String>>) header.get("errors");
- if (null != shardTolerantErrors) {
- Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
- assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
- // if we get into some weird state where the nodes disagree about the effective maxErrors,
- // assume the min value seen to decide if we should fail.
- maxToleratedErrors = Math.min(maxToleratedErrors,
- ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
-
- if (null == toleratedErrors) {
- toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
- }
- for (SimpleOrderedMap<String> err : shardTolerantErrors) {
- toleratedErrors.add(err);
- }
- }
- for (String updateType: Arrays.asList("adds", "deletes", "deleteByQuery")) {
- Object obj = shardResponse.get(updateType);
- if (obj instanceof NamedList) {
- NamedList versionsList = versions.containsKey(updateType) ?
- versions.get(updateType): new NamedList();
- versionsList.addAll((NamedList)obj);
- versions.put(updateType, versionsList);
- }
- }
- }
-
- NamedList cheader = new NamedList();
- cheader.add("status", status);
- cheader.add("QTime", timeMillis);
- if (rf != null)
- cheader.add(UpdateRequest.REPFACT, rf);
- if (minRf != null)
- cheader.add(UpdateRequest.MIN_REPFACT, minRf);
- if (null != toleratedErrors) {
- cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
- cheader.add("errors", toleratedErrors);
- if (maxToleratedErrors < toleratedErrors.size()) {
- // cumulative errors are too high, we need to throw a client exception w/correct metadata
-
- // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
- // then at least one shard should have thrown a real error before this, so we don't worry
- // about having a more "singular" exception msg for that situation
- StringBuilder msgBuf = new StringBuilder()
- .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
-
- NamedList metadata = new NamedList<String>();
- for (SimpleOrderedMap<String> err : toleratedErrors) {
- ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
- metadata.add(te.getMetadataKey(), te.getMetadataValue());
-
- msgBuf.append("\n").append(te.getMessage());
- }
-
- SolrException toThrow = new SolrException(SolrException.ErrorCode.BAD_REQUEST, msgBuf.toString());
- toThrow.setMetadata(metadata);
- throw toThrow;
- }
- }
- for (String updateType: versions.keySet()) {
- condensed.add(updateType, versions.get(updateType));
- }
- condensed.add("responseHeader", cheader);
- return condensed;
- }
-
- public RouteResponse condenseResponse(NamedList response, int timeMillis) {
- return condenseResponse(response, timeMillis, RouteResponse::new);
- }
-
- public static class RouteResponse<T extends LBSolrClient.Req> extends NamedList {
- private NamedList routeResponses;
- private Map<String, T> routes;
-
- public void setRouteResponses(NamedList routeResponses) {
- this.routeResponses = routeResponses;
- }
-
- public NamedList getRouteResponses() {
- return routeResponses;
- }
-
- public void setRoutes(Map<String, T> routes) {
- this.routes = routes;
- }
-
- public Map<String, T> getRoutes() {
- return routes;
- }
-
- }
-
- public static class RouteException extends SolrException {
-
- private NamedList<Throwable> throwables;
- private Map<String, ? extends LBSolrClient.Req> routes;
-
- public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes){
- super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
- this.throwables = throwables;
- this.routes = routes;
-
- // create a merged copy of the metadata from all wrapped exceptions
- NamedList<String> metadata = new NamedList<String>();
- for (int i = 0; i < throwables.size(); i++) {
- Throwable t = throwables.getVal(i);
- if (t instanceof SolrException) {
- SolrException e = (SolrException) t;
- NamedList<String> eMeta = e.getMetadata();
- if (null != eMeta) {
- metadata.addAll(eMeta);
- }
- }
- }
- if (0 < metadata.size()) {
- this.setMetadata(metadata);
- }
- }
-
- public NamedList<Throwable> getThrowables() {
- return throwables;
- }
-
- public Map<String, ? extends LBSolrClient.Req> getRoutes() {
- return this.routes;
- }
- }
-
- @Override
- public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
- // the collection parameter of the request overrides that of the parameter to this method
- String requestCollection = request.getCollection();
- if (requestCollection != null) {
- collection = requestCollection;
- } else if (collection == null) {
- collection = defaultCollection;
- }
- List<String> inputCollections =
- collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
- return requestWithRetryOnStaleState(request, 0, inputCollections);
- }
-
- /**
- * As this class doesn't watch external collections on the client side,
- * there's a chance that the request will fail due to cached stale state,
- * which means the state must be refreshed from ZK and retried.
- */
- protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, List<String> inputCollections)
- throws SolrServerException, IOException {
- connect(); // important to call this before you start working with the ZkStateReader
-
- // build up a _stateVer_ param to pass to the server containing all of the
- // external collection state versions involved in this request, which allows
- // the server to notify us that our cached state for one or more of the external
- // collections is stale and needs to be refreshed ... this code has no impact on internal collections
- String stateVerParam = null;
- List<DocCollection> requestedCollections = null;
- boolean isCollectionRequestOfV2 = false;
- if (request instanceof V2RequestSupport) {
- request = ((V2RequestSupport) request).getV2Request();
- }
- if (request instanceof V2Request) {
- isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
- }
- boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
- if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
- Set<String> requestedCollectionNames = resolveAliases(inputCollections);
-
- StringBuilder stateVerParamBuilder = null;
- for (String requestedCollection : requestedCollectionNames) {
- // track the version of state we're using on the client side using the _stateVer_ param
- DocCollection coll = getDocCollection(requestedCollection, null);
- if (coll == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
- }
- int collVer = coll.getZNodeVersion();
- if (coll.getStateFormat()>1) {
- if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
- requestedCollections.add(coll);
-
- if (stateVerParamBuilder == null) {
- stateVerParamBuilder = new StringBuilder();
- } else {
- stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
- }
-
- stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
- }
- }
-
- if (stateVerParamBuilder != null) {
- stateVerParam = stateVerParamBuilder.toString();
- }
- }
-
- if (request.getParams() instanceof ModifiableSolrParams) {
- ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
- if (stateVerParam != null) {
- params.set(STATE_VERSION, stateVerParam);
- } else {
- params.remove(STATE_VERSION);
- }
- } // else: ??? how to set this ???
-
- NamedList<Object> resp = null;
- try {
- resp = sendRequest(request, inputCollections);
- //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
- Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
- if(o != null && o instanceof Map) {
- //remove this because no one else needs this and tests would fail if they are comparing responses
- resp.remove(resp.size()-1);
- Map invalidStates = (Map) o;
- for (Object invalidEntries : invalidStates.entrySet()) {
- Map.Entry e = (Map.Entry) invalidEntries;
- getDocCollection((String) e.getKey(), (Integer) e.getValue());
- }
-
- }
- } catch (Exception exc) {
-
- Throwable rootCause = SolrException.getRootCause(exc);
- // don't do retry support for admin requests
- // or if the request doesn't have a collection specified
- // or request is v2 api and its method is not GET
- if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
- if (exc instanceof SolrServerException) {
- throw (SolrServerException)exc;
- } else if (exc instanceof IOException) {
- throw (IOException)exc;
- }else if (exc instanceof RuntimeException) {
- throw (RuntimeException) exc;
- }
- else {
- throw new SolrServerException(rootCause);
- }
- }
-
- int errorCode = (rootCause instanceof SolrException) ?
- ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
-
- boolean wasCommError =
- (rootCause instanceof ConnectException ||
- rootCause instanceof SocketException ||
- wasCommError(rootCause));
-
- log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
- inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
-
- if (wasCommError
- || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
- //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
- ) {
- // it was a communication error. it is likely that
- // the node to which the request to be sent is down . So , expire the state
- // so that the next attempt would fetch the fresh state
- // just re-read state for all of them, if it has not been retried
- // in retryExpiryTime time
- if (requestedCollections != null) {
- for (DocCollection ext : requestedCollections) {
- ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
- if (cacheEntry == null) continue;
- cacheEntry.maybeStale = true;
- }
- }
- if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
- //may be, we have a stale version of the collection state
- // and we could not get any information from the server
- //it is probably not worth trying again and again because
- // the state would not have been updated
- log.info("trying request again");
- return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
- }
- } else {
- log.info("request was not communication error it seems");
- }
-
- boolean stateWasStale = false;
- if (retryCount < MAX_STALE_RETRIES &&
- requestedCollections != null &&
- !requestedCollections.isEmpty() &&
- (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
- {
- // cached state for one or more external collections was stale
- // re-issue request using updated state
- stateWasStale = true;
-
- // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
- for (DocCollection ext : requestedCollections) {
- collectionStateCache.remove(ext.getName());
- }
- }
-
- // if we experienced a communication error, it's worth checking the state
- // with ZK just to make sure the node we're trying to hit is still part of the collection
- if (retryCount < MAX_STALE_RETRIES &&
- !stateWasStale &&
- requestedCollections != null &&
- !requestedCollections.isEmpty() &&
- wasCommError) {
- for (DocCollection ext : requestedCollections) {
- DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
- if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
- // looks like we couldn't reach the server because the state was stale == retry
- stateWasStale = true;
- // we just pulled state from ZK, so update the cache so that the retry uses it
- collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
- }
- }
- }
-
- if (requestedCollections != null) {
- requestedCollections.clear(); // done with this
- }
-
- // if the state was stale, then we retry the request once with new state pulled from Zk
- if (stateWasStale) {
- log.warn("Re-trying request to collection(s) "+inputCollections+" after stale state error from server.");
- resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
- } else {
- if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
- throw exc;
- } else {
- throw new SolrServerException(rootCause);
- }
- }
- }
-
- return resp;
- }
-
- protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
- throws SolrServerException, IOException {
- connect();
-
- boolean sendToLeaders = false;
-
- if (request instanceof IsUpdateRequest) {
- if (request instanceof UpdateRequest) {
- String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
- NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
- if (response != null) {
- return response;
- }
- }
- sendToLeaders = true;
- }
-
- SolrParams reqParams = request.getParams();
- if (reqParams == null) { // TODO fix getParams to never return null!
- reqParams = new ModifiableSolrParams();
- }
-
- final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
-
- final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
-
- if (request instanceof V2Request) {
- if (!liveNodes.isEmpty()) {
- List<String> liveNodesList = new ArrayList<>(liveNodes);
- Collections.shuffle(liveNodesList, rand);
- theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
- getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
- }
-
- } else if (ADMIN_PATHS.contains(request.getPath())) {
- for (String liveNode : liveNodes) {
- theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
- getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
- }
-
- } else { // Typical...
- Set<String> collectionNames = resolveAliases(inputCollections);
- if (collectionNames.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "No collection param specified on request and no default collection has been set: " + inputCollections);
- }
-
- // TODO: not a big deal because of the caching, but we could avoid looking
- // at every shard when getting leaders if we tweaked some things
-
- // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
- Map<String,Slice> slices = new HashMap<>();
- String shardKeys = reqParams.get(ShardParams._ROUTE_);
- for (String collectionName : collectionNames) {
- DocCollection col = getDocCollection(collectionName, null);
- if (col == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
- }
- Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
- ClientUtils.addSlices(slices, collectionName, routeSlices, true);
- }
-
- // Gather URLs, grouped by leader or replica
- // TODO: allow filtering by group, role, etc
- Set<String> seenNodes = new HashSet<>();
- List<String> replicas = new ArrayList<>();
- String joinedInputCollections = StrUtils.join(inputCollections, ',');
- for (Slice slice : slices.values()) {
- for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
- ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
- String node = coreNodeProps.getNodeName();
- if (!liveNodes.contains(node) // Must be a live node to continue
- || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
- continue;
- if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
- String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
- if (sendToLeaders && coreNodeProps.isLeader()) {
- theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
- } else {
- replicas.add(url); // replicas here
- }
- }
- }
- }
-
- // Shuffle the leaders, if any (none if !sendToLeaders)
- Collections.shuffle(theUrlList, rand);
-
- // Shuffle the replicas, if any, and append to our list
- Collections.shuffle(replicas, rand);
- theUrlList.addAll(replicas);
-
- if (theUrlList.isEmpty()) {
- collectionStateCache.keySet().removeAll(collectionNames);
- throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
- "Could not find a healthy node to handle the request.");
- }
- }
-
- LBSolrClient.Req req = new LBSolrClient.Req(request, theUrlList);
- LBSolrClient.Rsp rsp = getLbClient().request(req);
- return rsp.getResponse();
- }
-
- /** Resolves the input collections to their possible aliased collections. Doesn't validate collection existence. */
- private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
- LinkedHashSet<String> collectionNames = new LinkedHashSet<>(); // consistent ordering
- for (String collectionName : inputCollections) {
- if (getClusterStateProvider().getState(collectionName) == null) {
- // perhaps it's an alias
- List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collectionName);
- // one more level of alias indirection... (dubious that we should support this)
- for (String aliasedCollection : aliasedCollections) {
- collectionNames.addAll(getClusterStateProvider().resolveAlias(aliasedCollection));
- }
- } else {
- collectionNames.add(collectionName); // it's a collection
- }
- }
- return collectionNames;
- }
-
- public boolean isUpdatesToLeaders() {
- return updatesToLeaders;
- }
-
- /**
- * @return true if direct updates are sent to shard leaders only
- */
- public boolean isDirectUpdatesToLeadersOnly() {
- return directUpdatesToLeadersOnly;
- }
-
- /**If caches are expired they are refreshed after acquiring a lock.
- * use this to set the number of locks
- */
- public void setParallelCacheRefreshes(int n){ locks = objectList(n); }
-
- protected static ArrayList<Object> objectList(int n) {
- ArrayList<Object> l = new ArrayList<>(n);
- for(int i=0;i<n;i++) l.add(new Object());
- return l;
- }
-
-
- protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
- if (expectedVersion == null) expectedVersion = -1;
- if (collection == null) return null;
- ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
- DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
- if (col != null) {
- if (expectedVersion <= col.getZNodeVersion()
- && !cacheEntry.shouldRetry()) return col;
- }
-
- ClusterState.CollectionRef ref = getCollectionRef(collection);
- if (ref == null) {
- //no such collection exists
- return null;
- }
- if (!ref.isLazilyLoaded()) {
- //it is readily available just return it
- return ref.get();
- }
- List locks = this.locks;
- final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
- DocCollection fetchedCol = null;
- synchronized (lock) {
- /*we have waited for sometime just check once again*/
- cacheEntry = collectionStateCache.get(collection);
- col = cacheEntry == null ? null : cacheEntry.cached;
- if (col != null) {
- if (expectedVersion <= col.getZNodeVersion()
- && !cacheEntry.shouldRetry()) return col;
- }
- // We are going to fetch a new version
- // we MUST try to get a new version
- fetchedCol = ref.get();//this is a call to ZK
- if (fetchedCol == null) return null;// this collection no more exists
- if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
- cacheEntry.setRetriedAt();//we retried and found that it is the same version
- cacheEntry.maybeStale = false;
- } else {
- if (fetchedCol.getStateFormat() > 1)
- collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
- }
- return fetchedCol;
- }
- }
-
- ClusterState.CollectionRef getCollectionRef(String collection) {
- return getClusterStateProvider().getState(collection);
- }
-
- /**
- * Useful for determining the minimum achieved replication factor across
- * all shards involved in processing an update request, typically useful
- * for gauging the replication factor of a batch.
- */
- @SuppressWarnings("rawtypes")
- public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
- // it's probably already on the top-level header set by condense
- NamedList header = (NamedList)resp.get("responseHeader");
- Integer achRf = (Integer)header.get(UpdateRequest.REPFACT);
- if (achRf != null)
- return achRf.intValue();
-
- // not on the top-level header, walk the shard route tree
- Map<String,Integer> shardRf = getShardReplicationFactor(collection, resp);
- for (Integer rf : shardRf.values()) {
- if (achRf == null || rf < achRf) {
- achRf = rf;
- }
- }
- return (achRf != null) ? achRf.intValue() : -1;
- }
-
- /**
- * Walks the NamedList response after performing an update request looking for
- * the replication factor that was achieved in each shard involved in the request.
- * For single doc updates, there will be only one shard in the return value.
- */
- @SuppressWarnings("rawtypes")
- public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
- connect();
-
- Map<String,Integer> results = new HashMap<String,Integer>();
- if (resp instanceof RouteResponse) {
- NamedList routes = ((RouteResponse)resp).getRouteResponses();
- DocCollection coll = getDocCollection(collection, null);
- Map<String,String> leaders = new HashMap<String,String>();
- for (Slice slice : coll.getActiveSlicesArr()) {
- Replica leader = slice.getLeader();
- if (leader != null) {
- ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
- String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
- leaders.put(leaderUrl, slice.getName());
- String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
- leaders.put(altLeaderUrl, slice.getName());
- }
- }
-
- Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
- while (routeIter.hasNext()) {
- Map.Entry<String,Object> next = routeIter.next();
- String host = next.getKey();
- NamedList hostResp = (NamedList)next.getValue();
- Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
- if (rf != null) {
- String shard = leaders.get(host);
- if (shard == null) {
- if (host.endsWith("/"))
- shard = leaders.get(host.substring(0,host.length()-1));
- if (shard == null) {
- shard = host;
- }
- }
- results.put(shard, rf);
- }
- }
- }
- return results;
- }
-
- private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
- final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
- final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
-
- final boolean hasNoDocuments = (documents == null || documents.isEmpty());
- final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
- if (hasNoDocuments && hasNoDeleteById) {
- // no documents and no delete-by-id, so no info to find leader(s)
- return false;
- }
-
- if (documents != null) {
- for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
- final SolrInputDocument doc = entry.getKey();
- final Object fieldValue = doc.getFieldValue(idField);
- if (fieldValue == null) {
- // a document with no id field value, so can't find leader for it
- return false;
- }
- }
- }
-
- return true;
- }
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
deleted file mode 100644
index 042b6e4..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
-
-public abstract class BaseHttpClusterStateProvider implements ClusterStateProvider {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private String urlScheme;
- volatile Set<String> liveNodes;
- long liveNodesTimestamp = 0;
- volatile Map<String, List<String>> aliases;
- long aliasesTimestamp = 0;
-
- private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
-
- public void init(List<String> solrUrls) throws Exception {
- for (String solrUrl: solrUrls) {
- urlScheme = solrUrl.startsWith("https")? "https": "http";
- try (SolrClient initialClient = getSolrClient(solrUrl)) {
- this.liveNodes = fetchLiveNodes(initialClient);
- liveNodesTimestamp = System.nanoTime();
- break;
- } catch (IOException e) {
- log.warn("Attempt to fetch cluster state from {} failed.", solrUrl, e);
- }
- }
-
- if (this.liveNodes == null || this.liveNodes.isEmpty()) {
- throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
- }
-
- protected abstract SolrClient getSolrClient(String baseUrl);
-
- @Override
- public ClusterState.CollectionRef getState(String collection) {
- for (String nodeName: liveNodes) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
- try (SolrClient client = getSolrClient(baseUrl)) {
- ClusterState cs = fetchClusterState(client, collection, null);
- return cs.getCollectionRef(collection);
- } catch (SolrServerException | IOException e) {
- log.warn("Attempt to fetch cluster state from " +
- Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
- } catch (RemoteSolrException e) {
- if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
- return null;
- }
- log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
- } catch (NotACollectionException e) {
- // Cluster state for the given collection was not found, could be an alias.
- // Lets fetch/update our aliases:
- getAliases(true);
- return null;
- }
- }
- throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
- ModifiableSolrParams params = new ModifiableSolrParams();
- if (collection != null) {
- params.set("collection", collection);
- }
- params.set("action", "CLUSTERSTATUS");
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
- Map<String, Object> collectionsMap;
- if (collection != null) {
- collectionsMap = Collections.singletonMap(collection,
- ((NamedList) cluster.get("collections")).get(collection));
- } else {
- collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
- }
- int znodeVersion;
- Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
- if (collection != null && collFromStatus == null) {
- throw new NotACollectionException(); // probably an alias
- }
- if (collection != null) { // can be null if alias
- znodeVersion = (int) collFromStatus.get("znodeVersion");
- } else {
- znodeVersion = -1;
- }
- Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
- this.liveNodes = liveNodes;
- liveNodesTimestamp = System.nanoTime();
- //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
- ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
- if (clusterProperties != null) {
- Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
- if (properties != null) {
- clusterProperties.putAll(properties);
- }
- }
- return cs;
- }
-
- @Override
- public Set<String> getLiveNodes() {
- if (liveNodes == null) {
- throw new RuntimeException("We don't know of any live_nodes to fetch the"
- + " latest live_nodes information from. "
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
- if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
- for (String nodeName: liveNodes) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
- try (SolrClient client = getSolrClient(baseUrl)) {
- Set<String> liveNodes = fetchLiveNodes(client);
- this.liveNodes = (liveNodes);
- liveNodesTimestamp = System.nanoTime();
- return liveNodes;
- } catch (Exception e) {
- log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
- }
- }
- throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- } else {
- return liveNodes; // cached copy is fresh enough
- }
- }
-
- private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", "CLUSTERSTATUS");
- QueryRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
- NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
- return (Set<String>) new HashSet((List<String>)(cluster.get("live_nodes")));
- }
-
- @Override
- public List<String> resolveAlias(String aliasName) {
- return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
- }
-
- private Map<String, List<String>> getAliases(boolean forceFetch) {
- if (this.liveNodes == null) {
- throw new RuntimeException("We don't know of any live_nodes to fetch the"
- + " latest aliases information from. "
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
-
- if (forceFetch || this.aliases == null ||
- TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
- for (String nodeName: liveNodes) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
- try (SolrClient client = getSolrClient(baseUrl)) {
-
- this.aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
- this.aliasesTimestamp = System.nanoTime();
- return Collections.unmodifiableMap(this.aliases);
- } catch (SolrServerException | RemoteSolrException | IOException e) {
- // Situation where we're hitting an older Solr which doesn't have LISTALIASES
- if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
- log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
- + " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
- this.aliases = Collections.emptyMap();
- this.aliasesTimestamp = System.nanoTime();
- return aliases;
- }
- log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
- }
- }
-
- throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using a working"
- + " solrUrl or zkHost.");
- } else {
- return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
- }
- }
-
- @Override
- public ClusterState getClusterState() throws IOException {
- for (String nodeName: liveNodes) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
- try (SolrClient client = getSolrClient(baseUrl)) {
- return fetchClusterState(client, null, null);
- } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
- log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
- } catch (NotACollectionException e) {
- // not possible! (we passed in null for collection so it can't be an alias)
- throw new RuntimeException("null should never cause NotACollectionException in " +
- "fetchClusterState() Please report this as a bug!");
- }
- }
- throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
-
- @Override
- public Map<String, Object> getClusterProperties() {
- for (String nodeName : liveNodes) {
- String baseUrl = Utils.getBaseUrlForNodeName(nodeName, urlScheme);
- try (SolrClient client = getSolrClient(baseUrl)) {
- Map<String, Object> clusterProperties = new HashMap<>();
- fetchClusterState(client, null, clusterProperties);
- return clusterProperties;
- } catch (SolrServerException | HttpSolrClient.RemoteSolrException | IOException e) {
- log.warn("Attempt to fetch cluster state from {} failed.", baseUrl, e);
- } catch (NotACollectionException e) {
- // not possible! (we passed in null for collection so it can't be an alias)
- throw new RuntimeException("null should never cause NotACollectionException in " +
- "fetchClusterState() Please report this as a bug!");
- }
- }
- throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
- + "succeeded in obtaining the cluster state from none of them."
- + "If you think your Solr cluster is up and is accessible,"
- + " you could try re-creating a new CloudSolrClient using working"
- + " solrUrl(s) or zkHost(s).");
- }
-
- @Override
- public String getPolicyNameByCollection(String coll) {
- throw new UnsupportedOperationException("Fetching cluster properties not supported"
- + " using the HttpClusterStateProvider. "
- + "ZkClientClusterStateProvider can be used for this."); // TODO
- }
-
- @Override
- public Object getClusterProperty(String propertyName) {
- if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
- return this.urlScheme;
- }
- return getClusterProperties().get(propertyName);
- }
-
- @Override
- public void connect() {}
-
- public int getCacheTimeout() {
- return cacheTimeout;
- }
-
- public void setCacheTimeout(int cacheTimeout) {
- this.cacheTimeout = cacheTimeout;
- }
-
- // This exception is not meant to escape this class it should be caught and wrapped.
- private class NotACollectionException extends Exception {
- }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
deleted file mode 100644
index dceb13c..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpSolrClient.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.Collections;
-
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.util.NamedList;
-
-import static org.apache.solr.common.util.Utils.getObjectByPath;
-
-public abstract class BaseHttpSolrClient extends SolrClient {
-
- /**
- * Subclass of SolrException that allows us to capture an arbitrary HTTP
- * status code that may have been returned by the remote server or a
- * proxy along the way.
- */
- public static class RemoteSolrException extends SolrException {
- /**
- * @param remoteHost the host the error was received from
- * @param code Arbitrary HTTP status code
- * @param msg Exception Message
- * @param th Throwable to wrap with this Exception
- */
- public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
- super(code, "Error from server at " + remoteHost + ": " + msg, th);
- }
- }
-
- /**
- * This should be thrown when a server has an error in executing the request and
- * it sends a proper payload back to the client
- */
- public static class RemoteExecutionException extends HttpSolrClient.RemoteSolrException {
- private NamedList meta;
-
- public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
- super(remoteHost, code, msg, null);
- this.meta = meta;
- }
-
-
- public static HttpSolrClient.RemoteExecutionException create(String host, NamedList errResponse) {
- Object errObj = errResponse.get("error");
- if (errObj != null) {
- Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
- String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
- return new HttpSolrClient.RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
- msg == null ? "Unknown Error" : msg, errResponse);
-
- } else {
- throw new RuntimeException("No error");
- }
-
- }
-
- public NamedList getMetaData() {
-
- return meta;
- }
- }
-
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
deleted file mode 100644
index 8e6ac82..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudHttp2SolrClient.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-
-/**
- * SolrJ client class to communicate with SolrCloud using Http2SolrClient.
- * Instances of this class communicate with Zookeeper to discover
- * Solr endpoints for SolrCloud collections, and then use the
- * {@link LBHttp2SolrClient} to issue requests.
- *
- * This class assumes the id field for your documents is called
- * 'id' - if this is not the case, you must set the right name
- * with {@link #setIdField(String)}.
- *
- * @lucene.experimental
- * @since solr 8.0
- */
-@SuppressWarnings("serial")
-public class CloudHttp2SolrClient extends BaseCloudSolrClient {
-
- private final ClusterStateProvider stateProvider;
- private final LBHttp2SolrClient lbClient;
- private Http2SolrClient myClient;
- private final boolean clientIsInternal;
-
- /**
- * Create a new client object that connects to Zookeeper and is always aware
- * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
- * SolrCloud has enough replicas for every shard in a collection, there is no
- * single point of failure. Updates will be sent to shard leaders by default.
- *
- * @param builder a {@link Http2SolrClient.Builder} with the options used to create the client.
- */
- protected CloudHttp2SolrClient(Builder builder) {
- super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
- this.clientIsInternal = builder.httpClient == null;
- this.myClient = (builder.httpClient == null) ? new Http2SolrClient.Builder().build() : builder.httpClient;
- if (builder.stateProvider == null) {
- if (builder.zkHosts != null && builder.solrUrls != null) {
- throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
- }
- if (builder.zkHosts != null) {
- this.stateProvider = new ZkClientClusterStateProvider(builder.zkHosts, builder.zkChroot);
- } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
- try {
- this.stateProvider = new Http2ClusterStateProvider(builder.solrUrls, builder.httpClient);
- } catch (Exception e) {
- throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
- + "Solr server(s), " + builder.solrUrls + ", down?)", e);
- }
- } else {
- throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
- }
- } else {
- this.stateProvider = builder.stateProvider;
- }
- this.lbClient = new LBHttp2SolrClient(myClient);
-
- }
-
-
- @Override
- public void close() throws IOException {
- stateProvider.close();
- lbClient.close();
-
- if (clientIsInternal && myClient!=null) {
- myClient.close();
- }
-
- super.close();
- }
-
- public LBHttp2SolrClient getLbClient() {
- return lbClient;
- }
-
- @Override
- public ClusterStateProvider getClusterStateProvider() {
- return stateProvider;
- }
-
- public Http2SolrClient getHttpClient() {
- return myClient;
- }
-
- @Override
- protected boolean wasCommError(Throwable rootCause) {
- return false;
- }
-
- /**
- * Constructs {@link CloudHttp2SolrClient} instances from provided configuration.
- */
- public static class Builder {
- protected Collection<String> zkHosts = new ArrayList<>();
- protected List<String> solrUrls = new ArrayList<>();
- protected String zkChroot;
- protected Http2SolrClient httpClient;
- protected boolean shardLeadersOnly = true;
- protected boolean directUpdatesToLeadersOnly = false;
- protected boolean parallelUpdates = true;
- protected ClusterStateProvider stateProvider;
-
- /**
- * Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient} instances.
- * The solr client will use these urls to understand the cluster topology, which solr nodes are active etc.
- *
- * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not
- * include any collections, cores, or other path components.
- *
- * Usage example:
- *
- * <pre>
- * final List<String> solrBaseUrls = new ArrayList<String>();
- * solrBaseUrls.add("http://solr1:8983/solr"); solrBaseUrls.add("http://solr2:8983/solr"); solrBaseUrls.add("http://solr3:8983/solr");
- * final SolrClient client = new CloudHttp2SolrClient.Builder(solrBaseUrls).build();
- * </pre>
- */
- public Builder(List<String> solrUrls) {
- this.solrUrls = solrUrls;
- }
-
- /**
- * Provide a series of ZK hosts which will be used when configuring {@link CloudHttp2SolrClient} instances.
- *
- * Usage example when Solr stores data at the ZooKeeper root ('/'):
- *
- * <pre>
- * final List<String> zkServers = new ArrayList<String>();
- * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
- * final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty()).build();
- * </pre>
- *
- * Usage example when Solr data is stored in a ZooKeeper chroot:
- *
- * <pre>
- * final List<String> zkServers = new ArrayList<String>();
- * zkServers.add("zookeeper1:2181"); zkServers.add("zookeeper2:2181"); zkServers.add("zookeeper3:2181");
- * final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.of("/solr")).build();
- * </pre>
- *
- * @param zkHosts a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
- * @param zkChroot the path to the root ZooKeeper node containing Solr data. Provide {@code java.util.Optional.empty()} if no ZK chroot is used.
- */
- public Builder(List<String> zkHosts, Optional<String> zkChroot) {
- this.zkHosts = zkHosts;
- if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
- }
-
- /**
- * Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to shard leaders only.
- *
- * UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
- */
- public Builder sendDirectUpdatesToShardLeadersOnly() {
- directUpdatesToLeadersOnly = true;
- return this;
- }
-
- /**
- * Tells {@link CloudHttp2SolrClient.Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
- *
- * Shard leaders are still preferred, but the created clients will fallback to using other replicas if a leader
- * cannot be found.
- */
- public Builder sendDirectUpdatesToAnyShardReplica() {
- directUpdatesToLeadersOnly = false;
- return this;
- }
-
- /**
- * Tells {@link CloudHttp2SolrClient.Builder} whether created clients should send shard updates serially or in parallel
- *
- * When an {@link UpdateRequest} affects multiple shards, {@link CloudHttp2SolrClient} splits it up and sends a request
- * to each affected shard. This setting chooses whether those sub-requests are sent serially or in parallel.
- * <p>
- * If not set, this defaults to 'true' and sends sub-requests in parallel.
- */
- public Builder withParallelUpdates(boolean parallelUpdates) {
- this.parallelUpdates = parallelUpdates;
- return this;
- }
-
- public Builder withHttpClient(Http2SolrClient httpClient) {
- this.httpClient = httpClient;
- return this;
- }
-
- /**
- * Create a {@link CloudHttp2SolrClient} based on the provided configuration.
- */
- public CloudHttp2SolrClient build() {
- if (stateProvider == null) {
- if (!zkHosts.isEmpty()) {
- stateProvider = new ZkClientClusterStateProvider(zkHosts, zkChroot);
- }
- else if (!this.solrUrls.isEmpty()) {
- try {
- stateProvider = new Http2ClusterStateProvider(solrUrls, httpClient);
- } catch (Exception e) {
- throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the "
- + "Solr server(s), " + solrUrls + ", down?)", e);
- }
- } else {
- throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
- }
- }
- return new CloudHttp2SolrClient(this);
- }
-
- }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 0b08780..37cdba7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -17,21 +17,78 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.V2RequestSupport;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.cloud.ClusterState.CollectionRef;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+import static org.apache.solr.common.params.CommonParams.ID;
/**
* SolrJ client class to communicate with SolrCloud.
@@ -44,15 +101,102 @@ import org.apache.solr.common.util.NamedList;
* with {@link #setIdField(String)}.
*/
@SuppressWarnings("serial")
-public class CloudSolrClient extends BaseCloudSolrClient {
+public class CloudSolrClient extends SolrClient {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ClusterStateProvider stateProvider;
+ private volatile String defaultCollection;
private final LBHttpSolrClient lbClient;
private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
private final boolean clientIsInternal;
+ //no of times collection state to be reloaded if stale state error is received
+ private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
+ Random rand = new Random();
+
+ private final boolean updatesToLeaders;
+ private final boolean directUpdatesToLeadersOnly;
+ private boolean parallelUpdates; //TODO final
+ private ExecutorService threadPool = ExecutorUtil
+ .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
+ "CloudSolrClient ThreadPool"));
+ private String idField = ID;
+ public static final String STATE_VERSION = "_stateVer_";
+ private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds, i.e. 3 billion nanos
+ private final Set<String> NON_ROUTABLE_PARAMS;
+ {
+ NON_ROUTABLE_PARAMS = new HashSet<>();
+ NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+
+ NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+ NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+
+ // Not supported via SolrCloud
+ // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+ }
+ private volatile List<Object> locks = objectList(3);
+
+
+ /**
+ * Cache of {@link ExpiringCachedDocCollection} entries that expire once they are older
+ * than {@code timeToLive} milliseconds (60 seconds unless changed).
+ * Expired entries are not removed by a background task: a lookup of an expired key
+ * removes that key, and a lookup that misses triggers {@link #evictStale()}.
+ */
+ static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
+ // number of put() calls made on this cache
+ final AtomicLong puts = new AtomicLong();
+ // number of get() calls that returned a non-expired entry
+ final AtomicLong hits = new AtomicLong();
+ // fair lock; evictStale() only runs in the thread that wins tryLock()
+ final Lock evictLock = new ReentrantLock(true);
+ // entry lifetime in milliseconds, passed to ExpiringCachedDocCollection.isExpired
+ private volatile long timeToLive = 60 * 1000L;
+
+ /**
+ * Returns the cached entry for {@code key}, or null when there is none or it has expired.
+ * An expired entry is removed before returning; only live entries count as hits.
+ */
+ @Override
+ public ExpiringCachedDocCollection get(Object key) {
+ ExpiringCachedDocCollection val = super.get(key);
+ if(val == null) {
+ // a new collection is likely to be added now.
+ //check if there are stale items and remove them
+ evictStale();
+ return null;
+ }
+ if(val.isExpired(timeToLive)) {
+ // NOTE(review): remove(key) drops whatever is mapped at this instant, so a fresh value
+ // put concurrently by another thread could be discarded; remove(key, val) would not.
+ super.remove(key);
+ return null;
+ }
+ hits.incrementAndGet();
+ return val;
+ }
+
+ /** Stores {@code value} under {@code key} and counts the call in {@link #puts}. */
+ @Override
+ public ExpiringCachedDocCollection put(String key, ExpiringCachedDocCollection value) {
+ puts.incrementAndGet();
+ return super.put(key, value);
+ }
+
+ /**
+ * Removes every entry that has outlived {@code timeToLive}.
+ * Returns immediately, doing nothing, if another thread currently holds {@code evictLock}.
+ */
+ void evictStale() {
+ if(!evictLock.tryLock()) return;
+ try {
+ for (Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
+ if(e.getValue().isExpired(timeToLive)){
+ super.remove(e.getKey());
+ }
+ }
+ } finally {
+ evictLock.unlock();
+ }
+ }
+
+ }
- public static final String STATE_VERSION = BaseCloudSolrClient.STATE_VERSION;
+ /**
+ * Sets how long to wait before re-fetching a collection's state
+ * after a retry returned the same state version from ZK.
+ * <p>
+ * The value is stored internally in nanoseconds.
+ *
+ * @param secs the wait time in seconds
+ */
+ public void setRetryExpiryTime(int secs) {
+ this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
+ }
/**
* @deprecated since 7.0 Use {@link Builder} methods instead.
@@ -62,6 +206,42 @@ public class CloudSolrClient extends BaseCloudSolrClient {
lbClient.setSoTimeout(timeout);
}
+ protected final StateCache collectionStateCache = new StateCache();
+
+ /**
+ * A cached {@link DocCollection} together with the time it was cached and the
+ * bookkeeping used to decide whether its state should be re-fetched.
+ * This is an inner (non-static) class because {@link #shouldRetry()} reads the
+ * enclosing client's {@code retryExpiryTime}.
+ */
+ class ExpiringCachedDocCollection {
+ final DocCollection cached;
+ // System.nanoTime() value taken when this entry was created
+ final long cachedAt;
+ //System.nanoTime() of the last retry that got the same old version; -1 means never retried
+ volatile long retriedAt = -1;
+ //flag that suggests that this is potentially to be rechecked
+ volatile boolean maybeStale = false;
+
+ ExpiringCachedDocCollection(DocCollection cached) {
+ this.cached = cached;
+ this.cachedAt = System.nanoTime();
+ }
+
+ /**
+ * @param timeToLiveMs maximum age of the entry in milliseconds
+ * @return true if more than {@code timeToLiveMs} has elapsed since the entry was cached
+ */
+ boolean isExpired(long timeToLiveMs) {
+ return (System.nanoTime() - cachedAt)
+ > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return true only if the entry is flagged {@code maybeStale} and it has either never
+ * been retried or the last retry is more than {@code retryExpiryTime} nanoseconds old
+ */
+ boolean shouldRetry() {
+ if (maybeStale) {// we are not sure if it is stale so check with retry time
+ if ((retriedAt == -1 ||
+ (System.nanoTime() - retriedAt) > retryExpiryTime)) {
+ return true;// never retried, or the last retry is old enough to be worth repeating
+ }
+ }
+ // not flagged stale, or retried so recently that a newer state is unlikely to be available yet
+ return false;
+ }
+
+ /** Records the current time as the moment of the last retry. */
+ void setRetriedAt() {
+ retriedAt = System.nanoTime();
+ }
+ }
+
/**
* Create a new client object that connects to Zookeeper and is always aware
* of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
@@ -71,7 +251,6 @@ public class CloudSolrClient extends BaseCloudSolrClient {
* @param builder a {@link CloudSolrClient.Builder} with the options used to create the client.
*/
protected CloudSolrClient(Builder builder) {
- super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
if (builder.stateProvider == null) {
if (builder.zkHosts != null && builder.solrUrls != null) {
throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
@@ -101,6 +280,9 @@ public class CloudSolrClient extends BaseCloudSolrClient {
this.myClient = (builder.httpClient == null) ? HttpClientUtil.createClient(null) : builder.httpClient;
if (builder.loadBalancedSolrClient == null) builder.loadBalancedSolrClient = createLBHttpSolrClient(builder, myClient);
this.lbClient = builder.loadBalancedSolrClient;
+ this.updatesToLeaders = builder.shardLeadersOnly;
+ this.parallelUpdates = builder.parallelUpdates;
+ this.directUpdatesToLeadersOnly = builder.directUpdatesToLeadersOnly;
}
private void propagateLBClientConfigOptions(Builder builder) {
@@ -115,14 +297,91 @@ public class CloudSolrClient extends BaseCloudSolrClient {
}
}
- protected Map<String, LBHttpSolrClient.Req> createRoutes(UpdateRequest updateRequest, ModifiableSolrParams routableParams,
- DocCollection col, DocRouter router, Map<String, List<String>> urlMap,
- String idField) {
- return urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, idField);
+ /** Sets the cache TTL for cached DocCollection objects. This is only applicable for collections which are persisted outside of clusterstate.json.
+ * The value is converted to milliseconds before being stored on the collection state cache.
+ * @param seconds ttl value in seconds; expected to be positive
+ */
+ public void setCollectionCacheTTl(int seconds){
+ // NOTE(review): only an assert guards the argument, so with assertions disabled a
+ // zero or negative value is stored as-is.
+ assert seconds > 0;
+ this.collectionStateCache.timeToLive = seconds * 1000L;
+ }
+
+ /**
+ * @return the response parser currently set on the underlying load-balanced client
+ */
+ public ResponseParser getParser() {
+ return lbClient.getParser();
+ }
+
+ /**
+ * Note: This setter method is <b>not thread-safe</b>.
+ *
+ * @param processor
+ * Default Response Parser chosen to parse the response if the parser
+ * were not specified as part of the request.
+ * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+ */
+ public void setParser(ResponseParser processor) {
+ lbClient.setParser(processor);
+ }
+
+ /**
+ * @return the request writer currently set on the underlying load-balanced client
+ */
+ public RequestWriter getRequestWriter() {
+ return lbClient.getRequestWriter();
+ }
+
+ /**
+ * Sets the request writer on the underlying load-balanced client.
+ *
+ * @param requestWriter the writer to hand to the load-balanced client
+ */
+ public void setRequestWriter(RequestWriter requestWriter) {
+ lbClient.setRequestWriter(requestWriter);
+ }
+
+ /**
+ * @return the zkHost value used to connect to zookeeper.
+ */
+ public String getZkHost() {
+ return assertZKStateProvider().zkHost;
+ }
+
+ /**
+ * Returns the ZkStateReader of the ZooKeeper-backed cluster state provider,
+ * calling {@code connect()} on the provider first.
+ *
+ * @return the provider's ZkStateReader
+ * @throws IllegalStateException if this client's state provider is not a
+ * {@link ZkClientClusterStateProvider}
+ */
+ public ZkStateReader getZkStateReader() {
+ if (stateProvider instanceof ZkClientClusterStateProvider) {
+ ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) stateProvider;
+ stateProvider.connect();
+ return provider.zkStateReader;
+ }
+ throw new IllegalStateException("This has no Zk stateReader");
+ }
+
+ /**
+ * @param idField the field to route documents on.
+ */
+ public void setIdField(String idField) {
+ this.idField = idField;
+ }
+
+ /**
+ * @return the field that updates are routed on.
+ */
+ public String getIdField() {
+ return idField;
+ }
+
+ /** Sets the default collection for request */
+ public void setDefaultCollection(String collection) {
+ this.defaultCollection = collection;
+ }
+
+ /** Gets the default collection for request */
+ public String getDefaultCollection() {
+ return defaultCollection;
+ }
+
+ /** Set the connect timeout to the zookeeper ensemble in ms */
+ public void setZkConnectTimeout(int zkConnectTimeout) {
+ assertZKStateProvider().zkConnectTimeout = zkConnectTimeout;
+ }
+
+ /** Set the timeout to the zookeeper ensemble in ms */
+ public void setZkClientTimeout(int zkClientTimeout) {
+ assertZKStateProvider().zkClientTimeout = zkClientTimeout;
}
- protected RouteException getRouteException(SolrException.ErrorCode serverError, NamedList<Throwable> exceptions, Map<String, ? extends LBSolrClient.Req> routes) {
- return new RouteException(serverError, exceptions, routes);
+ /** Gets whether direct updates are sent in parallel */
+ public boolean isParallelUpdates() {
+ return parallelUpdates;
}
/** @deprecated since 7.2 Use {@link Builder} methods instead. */
@@ -132,33 +391,745 @@ public class CloudSolrClient extends BaseCloudSolrClient {
}
/**
- * @deprecated since Solr 8.0
+ * Connect to the zookeeper ensemble.
+ * This is an optional method that may be used to force a connect before any other requests are sent.
+ *
*/
- @Deprecated
- public RouteResponse condenseResponse(NamedList response, int timeMillis) {
- return condenseResponse(response, timeMillis, RouteResponse::new);
+ public void connect() {
+ stateProvider.connect();
}
/**
- * @deprecated since Solr 8.0
+ * Connect to a cluster. If the cluster is not ready, retry connection up to a given timeout.
+ * @param duration the timeout
+ * @param timeUnit the units of the timeout
+ * @throws TimeoutException if the cluster is not ready after the timeout
+ * @throws InterruptedException if the wait is interrupted
*/
- @Deprecated
- public static class RouteResponse extends BaseCloudSolrClient.RouteResponse<LBHttpSolrClient.Req> {
+ public void connect(long duration, TimeUnit timeUnit) throws TimeoutException, InterruptedException {
+ log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, stateProvider);
+ final long deadline = System.nanoTime() + timeUnit.toNanos(duration);
+ // remembered so the eventual TimeoutException can say why the cluster was not reachable
+ RuntimeException lastFailure = null;
+ // nanoTime values must be compared by subtraction; a direct '<' breaks on numeric overflow
+ while (System.nanoTime() - deadline < 0) {
+ try {
+ connect();
+ log.info("Cluster at {} ready", stateProvider);
+ return;
+ }
+ catch (RuntimeException e) {
+ // not ready yet, then... keep the failure and try again after a short pause
+ lastFailure = e;
+ }
+ TimeUnit.MILLISECONDS.sleep(250);
+ }
+ TimeoutException timedOut = new TimeoutException("Timed out waiting for cluster");
+ if (lastFailure != null) {
+ // TimeoutException has no cause-taking constructor, so attach the cause explicitly
+ timedOut.initCause(lastFailure);
+ }
+ throw timedOut;
+ }
+
+ /**
+ * Returns the state provider cast to {@link ZkClientClusterStateProvider}.
+ *
+ * @throws IllegalArgumentException if this client's state provider is of any other type
+ */
+ private ZkClientClusterStateProvider assertZKStateProvider() {
+ if (stateProvider instanceof ZkClientClusterStateProvider) {
+ return (ZkClientClusterStateProvider) stateProvider;
+ }
+ throw new IllegalArgumentException("This client does not use ZK");
+ }
/**
- * @deprecated since Solr 8.0
+ * Block until a collection state matches a predicate, or a timeout
+ *
+ * Note that the predicate may be called again even after it has returned true, so
+ * implementors should avoid changing state within the predicate call itself.
+ *
+ * @param collection the collection to watch
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate a {@link CollectionStatePredicate} to check the collection state
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
*/
- @Deprecated
- public static class RouteException extends BaseCloudSolrClient.RouteException {
+ public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
+ throws InterruptedException, TimeoutException {
+ stateProvider.connect();
+ assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
+ }
+
+ /**
+ * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
+ *
+ * Note that the watcher is unregistered after it has been called once. To make a watcher persistent,
+ * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
+ * call
+ *
+ * @param collection the collection to watch
+ * @param watcher a watcher that will be called when the state changes
+ */
+ public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
+ stateProvider.connect();
+ assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
+ }
+
+ /**
+ * Tries to split an update request by route and send each part to the URLs computed for it,
+ * followed by one extra request for the parts that cannot be routed.
+ *
+ * @param request the update to send; it is cast to {@link UpdateRequest} unconditionally
+ * @param collection collection name or alias; the first collection the alias resolves to is used
+ * @return the condensed per-route responses, or null when the collection uses an
+ * {@link ImplicitDocRouter} or no routes could be computed (the inline comments
+ * describe this as falling back to the general request path)
+ * @throws SolrServerException if {@code collection} is null, or if a serially sent route
+ * request fails with something other than a SolrException
+ * @throws SolrException if leaders are required but could not be found, if a serially sent
+ * route request fails with one, or if the non-routable request fails
+ */
+ private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
+ UpdateRequest updateRequest = (UpdateRequest) request;
+ SolrParams params = request.getParams();
+ ModifiableSolrParams routableParams = new ModifiableSolrParams();
+ ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+ // Despite its name, nonRoutableParams keeps every request parameter;
+ // only routableParams has the NON_ROUTABLE_PARAMS stripped out.
+ if(params != null) {
+ nonRoutableParams.add(params);
+ routableParams.add(params);
+ for(String param : NON_ROUTABLE_PARAMS) {
+ routableParams.remove(param);
+ }
+ }
+
+ if (collection == null) {
+ throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+ }
+
+ //Check to see if the collection is an alias.
+ // NOTE(review): get(0) assumes resolveAlias never returns an empty list - confirm.
+ List<String> aliasedCollections = stateProvider.resolveAlias(collection);
+ collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
+
+ DocCollection col = getDocCollection(collection, null);
+
+ DocRouter router = col.getRouter();
+
+ if (router instanceof ImplicitDocRouter) {
+ // short circuit as optimization
+ return null;
+ }
+
+ //Create the URL map, which is keyed on slice name.
+ //The value is a list of URLs for each replica in the slice.
+ //The first value in the list is the leader for the slice.
+ final Map<String,List<String>> urlMap = buildUrlMap(col);
+ final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
+ if (routes == null) {
+ if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
+ // we have info (documents with ids and/or ids to delete) with
+ // which to find the leaders but we could not find (all of) them
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "directUpdatesToLeadersOnly==true but could not find leader(s)");
+ } else {
+ // we could not find a leader or routes yet - use unoptimized general path
+ return null;
+ }
+ }
+
+ final NamedList<Throwable> exceptions = new NamedList<>();
+ final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
+
+ long start = System.nanoTime();
+
+ if (parallelUpdates) {
+ // Parallel path: submit every route request to the thread pool first, then collect the results.
+ final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
+ for (final Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+ final String url = entry.getKey();
+ final LBHttpSolrClient.Req lbRequest = entry.getValue();
+ try {
+ // the url is placed in the MDC only for the duration of the submit call
+ MDC.put("CloudSolrClient.url", url);
+ responseFutures.put(url, threadPool.submit(() -> lbClient.request(lbRequest).getResponse()));
+ } finally {
+ MDC.remove("CloudSolrClient.url");
+ }
+ }
+
+ for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+ final String url = entry.getKey();
+ final Future<NamedList<?>> responseFuture = entry.getValue();
+ try {
+ shardResponses.add(url, responseFuture.get());
+ } catch (InterruptedException e) {
+ // restore the interrupt flag before giving up on the remaining futures
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ // collect per-url failures; they are reported together after all futures are drained
+ exceptions.add(url, e.getCause());
+ }
+ }
+
+ if (exceptions.size() > 0) {
+ // the error code of the combined exception is taken from the first failure only
+ Throwable firstException = exceptions.getVal(0);
+ if(firstException instanceof SolrException) {
+ SolrException e = (SolrException) firstException;
+ throw new RouteException(ErrorCode.getErrorCode(e.code()), exceptions, routes);
+ } else {
+ throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+ }
+ }
+ } else {
+ // Serial path: send one route request at a time and stop at the first failure.
+ for (Map.Entry<String, LBHttpSolrClient.Req> entry : routes.entrySet()) {
+ String url = entry.getKey();
+ LBHttpSolrClient.Req lbRequest = entry.getValue();
+ try {
+ NamedList<Object> rsp = lbClient.request(lbRequest).getResponse();
+ shardResponses.add(url, rsp);
+ } catch (Exception e) {
+ if(e instanceof SolrException) {
+ throw (SolrException) e;
+ } else {
+ throw new SolrServerException(e);
+ }
+ }
+ }
+ }
- public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, ? extends LBSolrClient.Req> routes) {
- super(errorCode, throwables, routes);
+ // Delete-by-query cannot be routed by document id, so it goes into a separate request.
+ UpdateRequest nonRoutableRequest = null;
+ List<String> deleteQuery = updateRequest.getDeleteQuery();
+ if (deleteQuery != null && deleteQuery.size() > 0) {
+ UpdateRequest deleteQueryRequest = new UpdateRequest();
+ deleteQueryRequest.setDeleteQuery(deleteQuery);
+ nonRoutableRequest = deleteQueryRequest;
+ }
+
+ Set<String> paramNames = nonRoutableParams.getParameterNames();
+
+ // names of the NON_ROUTABLE_PARAMS that are actually present on this request
+ Set<String> intersection = new HashSet<>(paramNames);
+ intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+ if (nonRoutableRequest != null || intersection.size() > 0) {
+ if (nonRoutableRequest == null) {
+ nonRoutableRequest = new UpdateRequest();
+ }
+ nonRoutableRequest.setParams(nonRoutableParams);
+ nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
+ // sent once through the load balancer, which is given the route keys in random order
+ List<String> urlList = new ArrayList<>();
+ urlList.addAll(routes.keySet());
+ Collections.shuffle(urlList, rand);
+ LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(nonRoutableRequest, urlList);
+ try {
+ LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+ // recorded under the first url of the shuffled list
+ shardResponses.add(urlList.get(0), rsp.getResponse());
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+ }
+ }
+
+ long end = System.nanoTime();
+
+ // elapsed time covers the routed requests plus the non-routable one, in milliseconds
+ RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
+ rr.setRouteResponses(shardResponses);
+ rr.setRoutes(routes);
+ return rr;
+ }
+
+ /**
+  * Builds, for every active slice of the collection, the list of core URLs an update for that
+  * slice can be sent to. The first URL in each list belongs to the slice leader; when
+  * {@code directUpdatesToLeadersOnly} is set and no leader is known, the first active NRT
+  * replica on a live node is used in its place.
+  *
+  * @param col the collection whose active slices are inspected
+  * @return slice name to URL list, or {@code null} if some slice has no leader while
+  *         {@code directUpdatesToLeadersOnly} is off
+  */
+ private Map<String,List<String>> buildUrlMap(DocCollection col) {
+   final Map<String, List<String>> urlsBySlice = new HashMap<>();
+   for (Slice slice : col.getActiveSlicesArr()) {
+     Replica target = slice.getLeader();
+     if (target == null && directUpdatesToLeadersOnly) {
+       // no leader known: stand in the first active NRT replica that sits on a live node
+       for (Replica candidate : slice.getReplicas(
+           r -> r.isActive(getClusterStateProvider().getLiveNodes())
+               && r.getType() == Replica.Type.NRT)) {
+         target = candidate;
+         break;
+       }
+     }
+     if (target == null) {
+       if (!directUpdatesToLeadersOnly) {
+         // take unoptimized general path - we cannot find a leader yet
+         return null;
+       }
+       // leaders-only mode: simply leave this slice out of the map
+       continue;
+     }
+
+     final List<String> sliceUrls = new ArrayList<>();
+     sliceUrls.add(new ZkCoreNodeProps(target).getCoreUrl());
+     if (!directUpdatesToLeadersOnly) {
+       for (Replica other : slice.getReplicas()) {
+         // only replicas that differ from the target in both node name and replica name are added
+         if (other.getNodeName().equals(target.getNodeName())
+             || other.getName().equals(target.getName())) {
+           continue;
+         }
+         sliceUrls.add(new ZkCoreNodeProps(other).getCoreUrl());
+       }
+     }
+     urlsBySlice.put(slice.getName(), sliceUrls);
+   }
+   return urlsBySlice;
+ }
+
+ /**
+  * Merges the per-route responses of a routed update into a single {@link RouteResponse}.
+  * <p>
+  * The condensed header carries: the last non-zero shard status (0 if none), the supplied
+  * elapsed time as {@code QTime}, the lowest achieved replication factor reported by any shard,
+  * the reported minimum replication factor, and the accumulated tolerated errors. Version lists
+  * under "adds", "deletes" and "deleteByQuery" are concatenated across shards.
+  *
+  * @param response   list whose values are the individual shard responses
+  * @param timeMillis elapsed time to report as {@code QTime}
+  * @return the condensed response
+  * @throws SolrException with {@code BAD_REQUEST} when the number of tolerated errors summed
+  *         over all shards exceeds the smallest effective {@code maxErrors} seen
+  */
+ public RouteResponse condenseResponse(NamedList response, int timeMillis) {
+   RouteResponse condensed = new RouteResponse();
+   int status = 0;
+   Integer rf = null;
+   Integer minRf = null;
+
+   // TolerantUpdateProcessor
+   List<SimpleOrderedMap<String>> toleratedErrors = null;
+   int maxToleratedErrors = Integer.MAX_VALUE;
+
+   // For "adds", "deletes", "deleteByQuery" etc.
+   Map<String, NamedList> versions = new HashMap<>();
+
+   for (int i = 0; i < response.size(); i++) {
+     NamedList shardResponse = (NamedList) response.getVal(i);
+     NamedList header = (NamedList) shardResponse.get("responseHeader");
+     Integer shardStatus = (Integer) header.get("status");
+     int s = shardStatus.intValue();
+     if (s > 0) {
+       status = s;
+     }
+     Object rfObj = header.get(UpdateRequest.REPFACT);
+     if (rfObj instanceof Integer) {
+       Integer routeRf = (Integer) rfObj;
+       if (rf == null || routeRf < rf) {
+         rf = routeRf;
+       }
+     }
+     // Only take over a value that is actually present: a shard header without min_rf must not
+     // wipe out the value reported by an earlier shard.
+     Integer shardMinRf = (Integer) header.get(UpdateRequest.MIN_REPFACT);
+     if (shardMinRf != null) {
+       minRf = shardMinRf;
+     }
+
+     List<SimpleOrderedMap<String>> shardTolerantErrors =
+         (List<SimpleOrderedMap<String>>) header.get("errors");
+     if (null != shardTolerantErrors) {
+       Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+       assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
+       // if we get into some weird state where the nodes disagree about the effective maxErrors,
+       // assume the min value seen to decide if we should fail.
+       maxToleratedErrors = Math.min(maxToleratedErrors,
+           ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
+       if (null == toleratedErrors) {
+         toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+       }
+       for (SimpleOrderedMap<String> err : shardTolerantErrors) {
+         toleratedErrors.add(err);
+       }
+     }
+     for (String updateType : Arrays.asList("adds", "deletes", "deleteByQuery")) {
+       Object obj = shardResponse.get(updateType);
+       if (obj instanceof NamedList) {
+         NamedList versionsList = versions.containsKey(updateType) ?
+             versions.get(updateType) : new NamedList();
+         versionsList.addAll((NamedList) obj);
+         versions.put(updateType, versionsList);
+       }
+     }
+   }
+
+   NamedList cheader = new NamedList();
+   cheader.add("status", status);
+   cheader.add("QTime", timeMillis);
+   if (rf != null) {
+     cheader.add(UpdateRequest.REPFACT, rf);
+   }
+   if (minRf != null) {
+     cheader.add(UpdateRequest.MIN_REPFACT, minRf);
+   }
+   if (null != toleratedErrors) {
+     cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+     cheader.add("errors", toleratedErrors);
+     if (maxToleratedErrors < toleratedErrors.size()) {
+       // cumulative errors are too high, we need to throw a client exception w/correct metadata
+
+       // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the case
+       // then at least one shard should have thrown a real error before this, so we don't worry
+       // about having a more "singular" exception msg for that situation
+       StringBuilder msgBuf = new StringBuilder()
+           .append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+
+       NamedList metadata = new NamedList<String>();
+       for (SimpleOrderedMap<String> err : toleratedErrors) {
+         ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+         metadata.add(te.getMetadataKey(), te.getMetadataValue());
+
+         msgBuf.append("\n").append(te.getMessage());
+       }
+
+       SolrException toThrow = new SolrException(ErrorCode.BAD_REQUEST, msgBuf.toString());
+       toThrow.setMetadata(metadata);
+       throw toThrow;
+     }
+   }
+   for (String updateType : versions.keySet()) {
+     condensed.add(updateType, versions.get(updateType));
+   }
+   condensed.add("responseHeader", cheader);
+   return condensed;
+ }
+
+ /**
+  * A {@link NamedList} response that additionally carries the individual per-route responses
+  * and the routes the request was split into.
+  */
+ public static class RouteResponse extends NamedList {
+   private Map<String, LBHttpSolrClient.Req> routes;
+   private NamedList routeResponses;
+
+   /** @return the per-route responses previously stored with {@link #setRouteResponses} */
+   public NamedList getRouteResponses() {
+     return this.routeResponses;
+   }
+
+   /** @param routeResponses the per-route responses to keep alongside this response */
+   public void setRouteResponses(NamedList routeResponses) {
+     this.routeResponses = routeResponses;
+   }
+
+   /** @return the routes previously stored with {@link #setRoutes} */
+   public Map<String, LBHttpSolrClient.Req> getRoutes() {
+     return this.routes;
+   }
+
+   /** @param routes the routes (keyed by URL string) to keep alongside this response */
+   public void setRoutes(Map<String, LBHttpSolrClient.Req> routes) {
+     this.routes = routes;
+   }
+
+ }
+
+ /**
+  * Exception raised when one or more routes of a routed request failed. Its message and cause
+  * are taken from the first throwable; the metadata of every wrapped {@link SolrException} is
+  * merged into this exception's metadata.
+  */
+ public static class RouteException extends SolrException {
+
+   private NamedList<Throwable> throwables;
+   private Map<String, LBHttpSolrClient.Req> routes;
+
+   /**
+    * @param errorCode  error code to report for the aggregate failure
+    * @param throwables the failures keyed by name; the first entry supplies message and cause
+    * @param routes     the routes the failed request was split into
+    */
+   public RouteException(ErrorCode errorCode, NamedList<Throwable> throwables, Map<String, LBHttpSolrClient.Req> routes){
+     super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
+     this.throwables = throwables;
+     this.routes = routes;
+
+     // collect the metadata of all wrapped SolrExceptions into one list
+     final NamedList<String> merged = new NamedList<String>();
+     for (Map.Entry<String, Throwable> entry : throwables) {
+       final Throwable wrapped = entry.getValue();
+       if (!(wrapped instanceof SolrException)) {
+         continue;
+       }
+       final NamedList<String> wrappedMeta = ((SolrException) wrapped).getMetadata();
+       if (wrappedMeta != null) {
+         merged.addAll(wrappedMeta);
+       }
+     }
+     if (merged.size() > 0) {
+       this.setMetadata(merged);
+     }
+   }
+
+   /** @return all throwables this exception was created from */
+   public NamedList<Throwable> getThrowables() {
+     return this.throwables;
+   }
+
+   /** @return the routes passed to the constructor */
+   public Map<String, LBHttpSolrClient.Req> getRoutes() {
+     return routes;
}
}
@Override
+ public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+   // precedence: collection set on the request, then the method argument, then the default
+   String effective = request.getCollection();
+   if (effective == null) {
+     effective = (collection != null) ? collection : defaultCollection;
+   }
+
+   final List<String> inputCollections;
+   if (effective == null) {
+     inputCollections = Collections.emptyList();
+   } else {
+     inputCollections = StrUtils.splitSmart(effective, ",", true);
+   }
+   return requestWithRetryOnStaleState(request, 0, inputCollections);
+ }
+
+ /**
+  * As this class doesn't watch external collections on the client side,
+  * there's a chance that the request will fail due to cached stale state,
+  * which means the state must be refreshed from ZK and retried.
+  * <p>
+  * The method attaches a {@code _stateVer_} parameter describing the cached state versions of
+  * the requested collections (state format &gt; 1 only), sends the request, and on failure decides
+  * whether to retry itself recursively (bounded by {@code MAX_STALE_RETRIES}) or to rethrow.
+  *
+  * @param request          the request to send; a {@link V2RequestSupport} is converted to its V2 form
+  * @param retryCount       number of retries already performed for this request
+  * @param inputCollections collection or alias names the request targets; may be empty
+  * @return the response of the first successful attempt
+  * @throws SolrServerException if the request ultimately fails
+  * @throws IOException         if the request ultimately fails with an I/O error
+  */
+ protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, List<String> inputCollections)
+     throws SolrServerException, IOException {
+   connect(); // important to call this before you start working with the ZkStateReader
+
+   // build up a _stateVer_ param to pass to the server containing all of the
+   // external collection state versions involved in this request, which allows
+   // the server to notify us that our cached state for one or more of the external
+   // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+   String stateVerParam = null;
+   List<DocCollection> requestedCollections = null;
+   boolean isCollectionRequestOfV2 = false;
+   if (request instanceof V2RequestSupport) {
+     request = ((V2RequestSupport) request).getV2Request();
+   }
+   if (request instanceof V2Request) {
+     isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
+   }
+   boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
+   if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
+     Set<String> requestedCollectionNames = resolveAliases(inputCollections);
+
+     StringBuilder stateVerParamBuilder = null;
+     for (String requestedCollection : requestedCollectionNames) {
+       // track the version of state we're using on the client side using the _stateVer_ param
+       DocCollection coll = getDocCollection(requestedCollection, null);
+       if (coll == null) {
+         throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
+       }
+       int collVer = coll.getZNodeVersion();
+       if (coll.getStateFormat() > 1) {
+         if (requestedCollections == null) {
+           requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+         }
+         requestedCollections.add(coll);
+
+         if (stateVerParamBuilder == null) {
+           stateVerParamBuilder = new StringBuilder();
+         } else {
+           stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+         }
+
+         stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+       }
+     }
+
+     if (stateVerParamBuilder != null) {
+       stateVerParam = stateVerParamBuilder.toString();
+     }
+   }
+
+   if (request.getParams() instanceof ModifiableSolrParams) {
+     ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+     if (stateVerParam != null) {
+       params.set(STATE_VERSION, stateVerParam);
+     } else {
+       params.remove(STATE_VERSION);
+     }
+   } // else: ??? how to set this ???
+
+   NamedList<Object> resp = null;
+   try {
+     resp = sendRequest(request, inputCollections);
+     //to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
+     Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
+     if (o instanceof Map) {
+       //remove this because no one else needs this and tests would fail if they are comparing responses
+       resp.remove(resp.size() - 1);
+       Map invalidStates = (Map) o;
+       for (Object invalidEntries : invalidStates.entrySet()) {
+         Map.Entry e = (Map.Entry) invalidEntries;
+         // refresh the cached state of every collection the server flagged
+         getDocCollection((String) e.getKey(), (Integer) e.getValue());
+       }
+
+     }
+   } catch (Exception exc) {
+
+     Throwable rootCause = SolrException.getRootCause(exc);
+     // don't do retry support for admin requests
+     // or if the request doesn't have a collection specified
+     // or request is v2 api and its method is not GET
+     if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
+       if (exc instanceof SolrServerException) {
+         throw (SolrServerException) exc;
+       } else if (exc instanceof IOException) {
+         throw (IOException) exc;
+       } else if (exc instanceof RuntimeException) {
+         throw (RuntimeException) exc;
+       } else {
+         throw new SolrServerException(rootCause);
+       }
+     }
+
+     int errorCode = (rootCause instanceof SolrException) ?
+         ((SolrException) rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+     boolean wasCommError =
+         (rootCause instanceof ConnectException ||
+             rootCause instanceof ConnectTimeoutException ||
+             rootCause instanceof NoHttpResponseException ||
+             rootCause instanceof SocketException);
+
+     // parameterized form; renders the same text as the former string concatenation
+     log.error("Request to collection {} failed due to ({}) {}, retry={} commError={} errorCode={} ",
+         inputCollections, errorCode, rootCause.toString(), retryCount, wasCommError, errorCode);
+
+     if (wasCommError
+         || (exc instanceof RouteException && (errorCode == 503)) // 503 service unavailable (404 is handled further below)
+         //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
+         ) {
+       // it was a communication error. it is likely that
+       // the node to which the request to be sent is down . So , expire the state
+       // so that the next attempt would fetch the fresh state
+       // just re-read state for all of them, if it has not been retried
+       // in retryExpiryTime time
+       if (requestedCollections != null) {
+         for (DocCollection ext : requestedCollections) {
+           ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
+           if (cacheEntry == null) continue;
+           cacheEntry.maybeStale = true;
+         }
+       }
+       if (retryCount < MAX_STALE_RETRIES) {//if it is a communication error , we must try again
+         //may be, we have a stale version of the collection state
+         // and we could not get any information from the server
+         //it is probably not worth trying again and again because
+         // the state would not have been updated
+         log.info("trying request again");
+         return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+       }
+     } else {
+       log.info("request was not communication error it seems");
+     }
+
+     boolean stateWasStale = false;
+     if (retryCount < MAX_STALE_RETRIES &&
+         requestedCollections != null &&
+         !requestedCollections.isEmpty() &&
+         (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
+     {
+       // cached state for one or more external collections was stale
+       // re-issue request using updated state
+       stateWasStale = true;
+
+       // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+       for (DocCollection ext : requestedCollections) {
+         collectionStateCache.remove(ext.getName());
+       }
+     }
+
+     // if we experienced a communication error, it's worth checking the state
+     // with ZK just to make sure the node we're trying to hit is still part of the collection
+     if (retryCount < MAX_STALE_RETRIES &&
+         !stateWasStale &&
+         requestedCollections != null &&
+         !requestedCollections.isEmpty() &&
+         wasCommError) {
+       for (DocCollection ext : requestedCollections) {
+         DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
+         if (latestStateFromZk == null) {
+           // getDocCollection returns null when the collection no longer exists: drop the
+           // cached entry and let the original failure propagate instead of hitting an NPE
+           collectionStateCache.remove(ext.getName());
+           continue;
+         }
+         if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+           // looks like we couldn't reach the server because the state was stale == retry
+           stateWasStale = true;
+           // we just pulled state from ZK, so update the cache so that the retry uses it
+           collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+         }
+       }
+     }
+
+     if (requestedCollections != null) {
+       requestedCollections.clear(); // done with this
+     }
+
+     // if the state was stale, then we retry the request once with new state pulled from Zk
+     if (stateWasStale) {
+       log.warn("Re-trying request to collection(s) {} after stale state error from server.", inputCollections);
+       resp = requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+     } else {
+       if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
+         throw exc;
+       } else {
+         throw new SolrServerException(rootCause);
+       }
+     }
+   }
+
+   return resp;
+ }
+
+ /**
+  * Picks the URLs a request may be sent to and submits it through the load-balancing client.
+  * <p>
+  * {@link UpdateRequest}s are first offered to {@code directUpdate}; only when that returns
+  * {@code null} does the general path below run. The URL list is built as follows: for a
+  * {@link V2Request} a single randomly chosen live node; for admin paths every live node;
+  * otherwise one core URL per live node hosting an ACTIVE replica of the resolved collections,
+  * with leaders placed first for update requests.
+  *
+  * @param request          the request to send
+  * @param inputCollections collection or alias names the request targets
+  * @return the response returned by the load-balancing client (or by {@code directUpdate})
+  * @throws SolrException if no collection is given, a collection is not found, or no usable
+  *         node is found
+  */
+ protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
+ throws SolrServerException, IOException {
+ connect();
+
+ boolean sendToLeaders = false;
+
+ if (request instanceof IsUpdateRequest) {
+ if (request instanceof UpdateRequest) {
+ String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
+ NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
+ // a null response means directUpdate did not handle the request; continue with the general path
+ if (response != null) {
+ return response;
+ }
+ }
+ sendToLeaders = true;
+ }
+
+ SolrParams reqParams = request.getParams();
+ if (reqParams == null) { // TODO fix getParams to never return null!
+ reqParams = new ModifiableSolrParams();
+ }
+
+ final Set<String> liveNodes = stateProvider.getLiveNodes();
+
+ final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
+
+ if (request instanceof V2Request) {
+ // V2 requests go to one live node picked at random (list stays empty if there are no live nodes)
+ if (!liveNodes.isEmpty()) {
+ List<String> liveNodesList = new ArrayList<>(liveNodes);
+ Collections.shuffle(liveNodesList, rand);
+ theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
+ (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+ }
+
+ } else if (ADMIN_PATHS.contains(request.getPath())) {
+ // admin requests may be sent to any live node
+ for (String liveNode : liveNodes) {
+ theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
+ (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
+ }
+
+ } else { // Typical...
+ Set<String> collectionNames = resolveAliases(inputCollections);
+ if (collectionNames.isEmpty()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "No collection param specified on request and no default collection has been set: " + inputCollections);
+ }
+
+ // TODO: not a big deal because of the caching, but we could avoid looking
+ // at every shard when getting leaders if we tweaked some things
+
+ // Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
+ Map<String,Slice> slices = new HashMap<>();
+ String shardKeys = reqParams.get(ShardParams._ROUTE_);
+ for (String collectionName : collectionNames) {
+ DocCollection col = getDocCollection(collectionName, null);
+ if (col == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+ }
+ Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
+ ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+ }
+
+ // Gather URLs, grouped by leader or replica
+ // TODO: allow filtering by group, role, etc
+ Set<String> seenNodes = new HashSet<>();
+ List<String> replicas = new ArrayList<>();
+ String joinedInputCollections = StrUtils.join(inputCollections, ',');
+ for (Slice slice : slices.values()) {
+ for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ String node = coreNodeProps.getNodeName();
+ if (!liveNodes.contains(node) // Must be a live node to continue
+ || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+ continue;
+ if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
+ // the URL is the node's base URL combined with the joined input collection names
+ String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
+ if (sendToLeaders && coreNodeProps.isLeader()) {
+ theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
+ } else {
+ replicas.add(url); // replicas here
+ }
+ }
+ }
+ }
+
+ // Shuffle the leaders, if any (none if !sendToLeaders)
+ Collections.shuffle(theUrlList, rand);
+
+ // Shuffle the replicas, if any, and append to our list
+ Collections.shuffle(replicas, rand);
+ theUrlList.addAll(replicas);
+
+ if (theUrlList.isEmpty()) {
+ // drop the cached state of the involved collections before reporting the failure
+ collectionStateCache.keySet().removeAll(collectionNames);
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
+ "Could not find a healthy node to handle the request.");
+ }
+ }
+
+ LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
+ LBHttpSolrClient.Rsp rsp = lbClient.request(req);
+ return rsp.getResponse();
+ }
+
+ /**
+  * Expands the given names into collection names, following aliases up to two levels deep.
+  * A name the state provider knows as a collection is kept as is. Collection existence is not
+  * validated for names obtained through aliases.
+  *
+  * @param inputCollections collection or alias names
+  * @return the resolved names in encounter order, without duplicates
+  */
+ private LinkedHashSet<String> resolveAliases(List<String> inputCollections) {
+   final LinkedHashSet<String> resolved = new LinkedHashSet<>(); // consistent ordering
+   for (String name : inputCollections) {
+     if (stateProvider.getState(name) != null) {
+       resolved.add(name); // it's a collection
+       continue;
+     }
+     // perhaps it's an alias; resolve it, then resolve each result once more
+     // (one more level of alias indirection... dubious that we should support this)
+     for (String firstLevel : stateProvider.resolveAlias(name)) {
+       resolved.addAll(stateProvider.resolveAlias(firstLevel));
+     }
+   }
+   return resolved;
+ }
+
+ @Override
public void close() throws IOException {
stateProvider.close();
@@ -170,7 +1141,9 @@ public class CloudSolrClient extends BaseCloudSolrClient {
HttpClientUtil.close(myClient);
}
- super.close();
+ if(this.threadPool != null && !this.threadPool.isShutdown()) {
+ this.threadPool.shutdown();
+ }
}
public LBHttpSolrClient getLbClient() {
@@ -180,8 +1153,150 @@ public class CloudSolrClient extends BaseCloudSolrClient {
public HttpClient getHttpClient() {
return myClient;
}
+
+ /** @return the current value of the {@code updatesToLeaders} setting */
+ public boolean isUpdatesToLeaders() {
+   return this.updatesToLeaders;
+ }
/**
+  * Reports whether direct updates are restricted to shard leaders.
+  *
+  * @return true if direct updates are sent to shard leaders only
+  */
+ public boolean isDirectUpdatesToLeadersOnly() {
+   return this.directUpdatesToLeadersOnly;
+ }
+
+ /**
+  * If caches are expired they are refreshed after acquiring a lock.
+  * Use this to set the number of locks.
+  *
+  * @param n number of lock objects to create; must be at least 1
+  * @throws IllegalArgumentException if {@code n} is less than 1
+  */
+ public void setParallelCacheRefreshes(int n) {
+   // the lock for a collection is chosen with "hash % locks.size()", so an empty list would
+   // fail there with a division by zero; reject it here instead
+   if (n < 1) {
+     throw new IllegalArgumentException("Number of parallel cache refreshes must be at least 1: " + n);
+   }
+   locks = objectList(n);
+ }
+
+ /**
+  * Creates a list holding {@code n} distinct, freshly allocated objects.
+  *
+  * @param n number of objects to create; also used as the list's initial capacity
+  * @return the populated list
+  */
+ private static ArrayList<Object> objectList(int n) {
+   final ArrayList<Object> created = new ArrayList<>(n);
+   int remaining = n;
+   while (remaining-- > 0) {
+     created.add(new Object());
+   }
+   return created;
+ }
+
+
+ /**
+  * Returns the state of a collection, served from the local cache when the cached entry is
+  * recent enough and otherwise fetched through the collection's {@link CollectionRef}.
+  *
+  * @param collection      name of the collection; {@code null} yields {@code null}
+  * @param expectedVersion minimum znode version the caller accepts from the cache;
+  *                        {@code null} is treated as -1
+  * @return the collection state, or {@code null} if the collection does not exist
+  */
+ protected DocCollection getDocCollection(String collection, Integer expectedVersion) throws SolrException {
+ if (expectedVersion == null) expectedVersion = -1;
+ if (collection == null) return null;
+ ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
+ DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
+ if (col != null) {
+ // cached copy is usable if it is at least the expected version and is not due for a retry
+ if (expectedVersion <= col.getZNodeVersion()
+ && !cacheEntry.shouldRetry()) return col;
+ }
+
+ CollectionRef ref = getCollectionRef(collection);
+ if (ref == null) {
+ // no such collection exists
+ return null;
+ }
+ if (!ref.isLazilyLoaded()) {
+ // it is readily available, just return it
+ return ref.get();
+ }
+ // pick one lock object for this collection name based on its hash
+ List locks = this.locks;
+ final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
+ DocCollection fetchedCol = null;
+ synchronized (lock) {
+ /* we may have waited for the lock for some time, so check the cache once again */
+ cacheEntry = collectionStateCache.get(collection);
+ col = cacheEntry == null ? null : cacheEntry.cached;
+ if (col != null) {
+ if (expectedVersion <= col.getZNodeVersion()
+ && !cacheEntry.shouldRetry()) return col;
+ }
+ // We are going to fetch a new version
+ // we MUST try to get a new version
+ fetchedCol = ref.get();// this is a call to ZK
+ if (fetchedCol == null) return null;// this collection no longer exists
+ if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
+ cacheEntry.setRetriedAt();// we retried and found that it is the same version
+ cacheEntry.maybeStale = false;
+ } else {
+ // only collections with state format > 1 are put into the cache
+ if (fetchedCol.getStateFormat() > 1)
+ collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
+ }
+ return fetchedCol;
+ }
+ }
+
+ /**
+  * Looks up the reference for a collection via the cluster state provider.
+  *
+  * @param collection name of the collection
+  * @return whatever {@code stateProvider.getState} returns for that name
+  */
+ CollectionRef getCollectionRef(String collection) {
+   final CollectionRef ref = stateProvider.getState(collection);
+   return ref;
+ }
+
+ /**
+  * Determines the minimum replication factor achieved across all shards that took part in an
+  * update request, which is typically useful for gauging the replication factor of a batch.
+  *
+  * @param collection the collection the update was sent to
+  * @param resp       the update response
+  * @return the value from the top-level response header if present, otherwise the smallest
+  *         per-shard value, or -1 when no value is available at all
+  */
+ @SuppressWarnings("rawtypes")
+ public int getMinAchievedReplicationFactor(String collection, NamedList resp) {
+   // it's probably already on the top-level header set by condense
+   final NamedList topHeader = (NamedList) resp.get("responseHeader");
+   final Integer fromHeader = (Integer) topHeader.get(UpdateRequest.REPFACT);
+   if (fromHeader != null) {
+     return fromHeader.intValue();
+   }
+
+   // not on the top-level header, walk the shard route tree
+   Integer lowest = null;
+   for (Integer shardValue : getShardReplicationFactor(collection, resp).values()) {
+     if (lowest == null || shardValue < lowest) {
+       lowest = shardValue;
+     }
+   }
+   if (lowest == null) {
+     return -1;
+   }
+   return lowest.intValue();
+ }
+
+ /**
+  * Walks the NamedList response after performing an update request looking for
+  * the replication factor that was achieved in each shard involved in the request.
+  * For single doc updates, there will be only one shard in the return value.
+  *
+  * @param collection the collection the update was sent to
+  * @param resp       the update response; only a {@link CloudSolrClient.RouteResponse} carries
+  *                   per-route information
+  * @return achieved replication factor keyed by shard name, or by the route URL when the URL
+  *         cannot be matched to a shard leader; empty when no per-route data is available
+  */
+ @SuppressWarnings("rawtypes")
+ public Map<String,Integer> getShardReplicationFactor(String collection, NamedList resp) {
+   connect();
+
+   Map<String,Integer> results = new HashMap<String,Integer>();
+   if (resp instanceof CloudSolrClient.RouteResponse) {
+     NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
+     if (routes == null) {
+       // route responses were never attached to this response; nothing to walk
+       return results;
+     }
+     DocCollection coll = getDocCollection(collection, null);
+     Map<String,String> leaders = new HashMap<String,String>();
+     // getDocCollection returns null for an unknown (or null) collection; in that case no
+     // leader URLs are known and results fall back to being keyed by route URL
+     if (coll != null) {
+       for (Slice slice : coll.getActiveSlicesArr()) {
+         Replica leader = slice.getLeader();
+         if (leader != null) {
+           ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+           // register both the core-based and the collection-based URL of the leader
+           String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
+           leaders.put(leaderUrl, slice.getName());
+           String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
+           leaders.put(altLeaderUrl, slice.getName());
+         }
+       }
+     }
+
+     Iterator<Map.Entry<String,Object>> routeIter = routes.iterator();
+     while (routeIter.hasNext()) {
+       Map.Entry<String,Object> next = routeIter.next();
+       String host = next.getKey();
+       NamedList hostResp = (NamedList)next.getValue();
+       Integer rf = (Integer)((NamedList)hostResp.get("responseHeader")).get(UpdateRequest.REPFACT);
+       if (rf != null) {
+         String shard = leaders.get(host);
+         if (shard == null) {
+           // retry the lookup without a trailing slash before giving up on a shard name
+           if (host.endsWith("/"))
+             shard = leaders.get(host.substring(0,host.length()-1));
+           if (shard == null) {
+             shard = host;
+           }
+         }
+         results.put(shard, rf);
+       }
+     }
+   }
+   return results;
+ }
+
+ /**
* @deprecated since 7.0 Use {@link Builder} methods instead.
*/
@Deprecated
@@ -193,10 +1308,29 @@ public class CloudSolrClient extends BaseCloudSolrClient {
return stateProvider;
}
- @Override
- protected boolean wasCommError(Throwable rootCause) {
- return rootCause instanceof ConnectTimeoutException ||
- rootCause instanceof NoHttpResponseException;
+ /**
+  * Tells whether an update request carries enough information to work out the shard leaders
+  * it should go to.
+  *
+  * @param updateRequest the request to inspect
+  * @param idField       name of the field holding the document id
+  * @return {@code false} if the request has neither documents nor delete-by-id entries, or if
+  *         any document lacks a value for {@code idField}; {@code true} otherwise
+  */
+ private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
+   final Map<SolrInputDocument,Map<String,Object>> docs = updateRequest.getDocumentsMap();
+   final Map<String,Map<String,Object>> deletes = updateRequest.getDeleteByIdMap();
+
+   final boolean docsPresent = docs != null && !docs.isEmpty();
+   final boolean deletesPresent = deletes != null && !deletes.isEmpty();
+   if (!docsPresent && !deletesPresent) {
+     // nothing that could be routed by id, so no info to find leader(s)
+     return false;
+   }
+
+   if (docs != null) {
+     for (final SolrInputDocument doc : docs.keySet()) {
+       if (doc.getFieldValue(idField) == null) {
+         // a document with no id field value, so can't find leader for it
+         return false;
+       }
+     }
+   }
+
+   return true;
}
private static LBHttpSolrClient createLBHttpSolrClient(Builder cloudSolrClientBuilder, HttpClient httpClient) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
deleted file mode 100644
index 335684a..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2ClusterStateProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.solr.client.solrj.SolrClient;
-
-public class Http2ClusterStateProvider extends BaseHttpClusterStateProvider {
- final Http2SolrClient httpClient;
- final boolean closeClient;
-
- public Http2ClusterStateProvider(List<String> solrUrls, Http2SolrClient httpClient) throws Exception {
- this.httpClient = httpClient == null? new Http2SolrClient.Builder().build(): httpClient;
- this.closeClient = httpClient == null;
- init(solrUrls);
- }
-
- @Override
- public void close() throws IOException {
- if (this.closeClient && this.httpClient != null) {
- httpClient.close();
- }
- }
-
- @Override
- protected SolrClient getSolrClient(String baseUrl) {
- return new Http2SolrClient.Builder(baseUrl).withHttpClient(httpClient).build();
- }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 580a849..9b60460 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -93,18 +93,10 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.*;
import static org.apache.solr.common.util.Utils.getObjectByPath;
// TODO: error handling, small Http2SolrClient features, security, ssl
/**
- * Difference between this {@link Http2SolrClient} and {@link HttpSolrClient}:
- * <ul>
- * <li>{@link Http2SolrClient} sends requests in HTTP/2</li>
- * <li>{@link Http2SolrClient} can point to multiple urls</li>
- * <li>{@link Http2SolrClient} does not expose its internal httpClient like {@link HttpSolrClient#getHttpClient()},
- * sharing connection pools should be done by {@link Http2SolrClient.Builder#withHttpClient(Http2SolrClient)} </li>
- * </ul>
* @lucene.experimental
*/
public class Http2SolrClient extends SolrClient {
@@ -147,12 +139,20 @@ public class Http2SolrClient extends SolrClient {
if (builder.idleTimeout != null) idleTimeout = builder.idleTimeout;
else idleTimeout = HttpClientUtil.DEFAULT_SO_TIMEOUT;
- if (builder.http2SolrClient == null) {
+ if (builder.httpClient == null) {
httpClient = createHttpClient(builder);
closeClient = true;
} else {
- httpClient = builder.http2SolrClient.httpClient;
+ httpClient = builder.httpClient;
}
+ if (!httpClient.isStarted()) {
+ try {
+ httpClient.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
assert ObjectReleaseTracker.track(this);
}
@@ -160,12 +160,10 @@ public class Http2SolrClient extends SolrClient {
this.listenerFactory.add(factory);
}
- // internal usage only
HttpClient getHttpClient() {
return httpClient;
}
- // internal usage only
ProtocolHandlers getProtocolHandlers() {
return httpClient.getProtocolHandlers();
}
@@ -215,11 +213,6 @@ public class Http2SolrClient extends SolrClient {
if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
- try {
- httpClient.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
return httpClient;
}
@@ -452,7 +445,6 @@ public class Http2SolrClient extends SolrClient {
private Request makeRequest(SolrRequest solrRequest, String collection)
throws SolrServerException, IOException {
Request req = createRequest(solrRequest, collection);
- req.header(HttpHeader.ACCEPT_ENCODING, null);
setListeners(solrRequest, req);
if (solrRequest.getUserPrincipal() != null) {
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
@@ -798,7 +790,7 @@ public class Http2SolrClient extends SolrClient {
public static class Builder {
- private Http2SolrClient http2SolrClient;
+ private HttpClient httpClient;
private SSLConfig sslConfig = defaultSSLConfig;
private Integer idleTimeout;
private Integer connectionTimeout;
@@ -818,11 +810,8 @@ public class Http2SolrClient extends SolrClient {
return new Http2SolrClient(baseSolrUrl, this);
}
- /**
- * Reuse {@code httpClient} connections pool
- */
- public Builder withHttpClient(Http2SolrClient httpClient) {
- this.http2SolrClient = httpClient;
+ public Builder withHttpClient(HttpClient httpClient) {
+ this.httpClient = httpClient;
return this;
}
@@ -856,6 +845,52 @@ public class Http2SolrClient extends SolrClient {
}
+ /**
+ * Subclass of SolrException that allows us to capture an arbitrary HTTP status code that may have been returned by
+ * the remote server or a proxy along the way.
+ */
+ public static class RemoteSolrException extends SolrException {
+ /**
+ * @param remoteHost the host the error was received from
+ * @param code Arbitrary HTTP status code
+ * @param msg Exception Message
+ * @param th Throwable to wrap with this Exception
+ */
+ public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
+ super(code, "Error from server at " + remoteHost + ": " + msg, th);
+ }
+ }
+
+ /**
+ * This should be thrown when a server has an error in executing the request and it sends a proper payload back to the
+ * client
+ */
+ public static class RemoteExecutionException extends RemoteSolrException {
+ private NamedList meta;
+
+ public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
+ super(remoteHost, code, msg, null);
+ this.meta = meta;
+ }
+
+ public static RemoteExecutionException create(String host, NamedList errResponse) {
+ Object errObj = errResponse.get("error");
+ if (errObj != null) {
+ Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
+ String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
+ return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
+ msg == null ? "Unknown Error" : msg, errResponse);
+
+ } else {
+ throw new RuntimeException("No error");
+ }
+ }
+
+ public NamedList getMetaData() {
+ return meta;
+ }
+ }
+
public Set<String> getQueryParams() {
return queryParams;
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
index 07fd8f8..bbab3aa 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
@@ -17,25 +17,67 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ClusterState.CollectionRef;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
+public class HttpClusterStateProvider implements ClusterStateProvider {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final HttpClient httpClient;
- private final boolean clientIsInternal;
+ private String urlScheme;
+ volatile Set<String> liveNodes;
+ long liveNodesTimestamp = 0;
+ volatile Map<String, List<String>> aliases;
+ long aliasesTimestamp = 0;
+
+ private int cacheTimeout = 5; // the liveNodes and aliases cache will be invalidated after 5 secs
+ final HttpClient httpClient;
+ final boolean clientIsInternal;
public HttpClusterStateProvider(List<String> solrUrls, HttpClient httpClient) throws Exception {
this.httpClient = httpClient == null? HttpClientUtil.createClient(null): httpClient;
this.clientIsInternal = httpClient == null;
- init(solrUrls);
- }
+ for (String solrUrl: solrUrls) {
+ urlScheme = solrUrl.startsWith("https")? "https": "http";
+ try (SolrClient initialClient = new HttpSolrClient.Builder().withBaseSolrUrl(solrUrl).withHttpClient(httpClient).build()) {
+ Set<String> liveNodes = fetchLiveNodes(initialClient); // throws exception if unable to fetch
+ this.liveNodes = liveNodes;
+ liveNodesTimestamp = System.nanoTime();
+ break;
+ } catch (IOException e) {
+ log.warn("Attempt to fetch live_nodes from " + solrUrl + " failed.", e);
+ }
+ }
- @Override
- protected SolrClient getSolrClient(String baseUrl) {
- return new HttpSolrClient.Builder().withBaseSolrUrl(baseUrl).withHttpClient(httpClient).build();
+ if (this.liveNodes == null || this.liveNodes.isEmpty()) {
+ throw new RuntimeException("Tried fetching live_nodes using Solr URLs provided, i.e. " + solrUrls + ". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
}
@Override
@@ -44,4 +86,247 @@ public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
HttpClientUtil.close(httpClient);
}
}
+
+ @Override
+ public CollectionRef getState(String collection) {
+ for (String nodeName: liveNodes) {
+ try (HttpSolrClient client = new HttpSolrClient.Builder().
+ withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+ withHttpClient(httpClient).build()) {
+ ClusterState cs = fetchClusterState(client, collection, null);
+ return cs.getCollectionRef(collection);
+ } catch (SolrServerException | IOException e) {
+ log.warn("Attempt to fetch cluster state from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ } catch (RemoteSolrException e) {
+ if ("NOT_FOUND".equals(e.getMetadata("CLUSTERSTATUS"))) {
+ return null;
+ }
+ log.warn("Attempt to fetch cluster state from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ } catch (NotACollectionException e) {
+ // Cluster state for the given collection was not found, could be an alias.
+ // Lets fetch/update our aliases:
+ getAliases(true);
+ return null;
+ }
+ }
+ throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private ClusterState fetchClusterState(SolrClient client, String collection, Map<String, Object> clusterProperties) throws SolrServerException, IOException, NotACollectionException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ if (collection != null) {
+ params.set("collection", collection);
+ }
+ params.set("action", "CLUSTERSTATUS");
+ QueryRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
+ Map<String, Object> collectionsMap;
+ if (collection != null) {
+ collectionsMap = Collections.singletonMap(collection,
+ ((NamedList) cluster.get("collections")).get(collection));
+ } else {
+ collectionsMap = ((NamedList)cluster.get("collections")).asMap(10);
+ }
+ int znodeVersion;
+ Map<String, Object> collFromStatus = (Map<String, Object>) (collectionsMap).get(collection);
+ if (collection != null && collFromStatus == null) {
+ throw new NotACollectionException(); // probably an alias
+ }
+ if (collection != null) { // can be null if alias
+ znodeVersion = (int) collFromStatus.get("znodeVersion");
+ } else {
+ znodeVersion = -1;
+ }
+ Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
+ this.liveNodes = liveNodes;
+ liveNodesTimestamp = System.nanoTime();
+ //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
+ ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
+ if (clusterProperties != null) {
+ Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
+ if (properties != null) {
+ clusterProperties.putAll(properties);
+ }
+ }
+ return cs;
+ }
+
+ @Override
+ public Set<String> getLiveNodes() {
+ if (liveNodes == null) {
+ throw new RuntimeException("We don't know of any live_nodes to fetch the"
+ + " latest live_nodes information from. "
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
+ if (TimeUnit.SECONDS.convert((System.nanoTime() - liveNodesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
+ for (String nodeName: liveNodes) {
+ try (HttpSolrClient client = new HttpSolrClient.Builder().
+ withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+ withHttpClient(httpClient).build()) {
+ Set<String> liveNodes = fetchLiveNodes(client);
+ this.liveNodes = (liveNodes);
+ liveNodesTimestamp = System.nanoTime();
+ return liveNodes;
+ } catch (Exception e) {
+ log.warn("Attempt to fetch live_nodes from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ }
+ }
+ throw new RuntimeException("Tried fetching live_nodes using all the node names we knew of, i.e. " + liveNodes +". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ } else {
+ return liveNodes; // cached copy is fresh enough
+ }
+ }
+
+ private static Set<String> fetchLiveNodes(SolrClient client) throws Exception {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", "CLUSTERSTATUS");
+ QueryRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ NamedList cluster = (SimpleOrderedMap) client.request(request).get("cluster");
+ Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
+ return liveNodes;
+ }
+
+ @Override
+ public List<String> resolveAlias(String aliasName) {
+ return Aliases.resolveAliasesGivenAliasMap(getAliases(false), aliasName);
+ }
+
+ private Map<String, List<String>> getAliases(boolean forceFetch) {
+ if (this.liveNodes == null) {
+ throw new RuntimeException("We don't know of any live_nodes to fetch the"
+ + " latest aliases information from. "
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
+
+ if (forceFetch || this.aliases == null ||
+ TimeUnit.SECONDS.convert((System.nanoTime() - aliasesTimestamp), TimeUnit.NANOSECONDS) > getCacheTimeout()) {
+ for (String nodeName: liveNodes) {
+ try (HttpSolrClient client = new HttpSolrClient.Builder().
+ withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+ withHttpClient(httpClient).build()) {
+
+ Map<String, List<String>> aliases = new CollectionAdminRequest.ListAliases().process(client).getAliasesAsLists();
+ this.aliases = aliases;
+ this.aliasesTimestamp = System.nanoTime();
+ return Collections.unmodifiableMap(this.aliases);
+ } catch (SolrServerException | RemoteSolrException | IOException e) {
+ // Situation where we're hitting an older Solr which doesn't have LISTALIASES
+ if (e instanceof RemoteSolrException && ((RemoteSolrException)e).code()==400) {
+ log.warn("LISTALIASES not found, possibly using older Solr server. Aliases won't work"
+ + " unless you re-create the CloudSolrClient using zkHost(s) or upgrade Solr server", e);
+ this.aliases = Collections.emptyMap();
+ this.aliasesTimestamp = System.nanoTime();
+ return aliases;
+ }
+ log.warn("Attempt to fetch cluster state from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ }
+ }
+
+ throw new RuntimeException("Tried fetching aliases using all the node names we knew of, i.e. " + liveNodes +". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using a working"
+ + " solrUrl or zkHost.");
+ } else {
+ return Collections.unmodifiableMap(this.aliases); // cached copy is fresh enough
+ }
+ }
+
+ @Override
+ public ClusterState getClusterState() throws IOException {
+ for (String nodeName: liveNodes) {
+ try (HttpSolrClient client = new HttpSolrClient.Builder().
+ withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+ withHttpClient(httpClient).build()) {
+ ClusterState cs = fetchClusterState(client, null, null);
+ return cs;
+ } catch (SolrServerException | RemoteSolrException | IOException e) {
+ log.warn("Attempt to fetch cluster state from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ } catch (NotACollectionException e) {
+ // not possible! (we passed in null for collection so it can't be an alias)
+ throw new RuntimeException("null should never cause NotACollectionException in " +
+ "fetchClusterState() Please report this as a bug!");
+ }
+ }
+ throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes +". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
+
+ @Override
+ public Map<String, Object> getClusterProperties() {
+ for (String nodeName : liveNodes) {
+ try (HttpSolrClient client = new HttpSolrClient.Builder().
+ withBaseSolrUrl(Utils.getBaseUrlForNodeName(nodeName, urlScheme)).
+ withHttpClient(httpClient).build()) {
+ Map<String, Object> clusterProperties = new HashMap<>();
+ fetchClusterState(client, null, clusterProperties);
+ return clusterProperties;
+ } catch (SolrServerException | RemoteSolrException | IOException e) {
+ log.warn("Attempt to fetch cluster state from " +
+ Utils.getBaseUrlForNodeName(nodeName, urlScheme) + " failed.", e);
+ } catch (NotACollectionException e) {
+ // not possible! (we passed in null for collection so it can't be an alias)
+ throw new RuntimeException("null should never cause NotACollectionException in " +
+ "fetchClusterState() Please report this as a bug!");
+ }
+ }
+ throw new RuntimeException("Tried fetching cluster state using the node names we knew of, i.e. " + liveNodes + ". However, "
+ + "succeeded in obtaining the cluster state from none of them."
+ + "If you think your Solr cluster is up and is accessible,"
+ + " you could try re-creating a new CloudSolrClient using working"
+ + " solrUrl(s) or zkHost(s).");
+ }
+
+ @Override
+ public String getPolicyNameByCollection(String coll) {
+ throw new UnsupportedOperationException("Fetching cluster properties not supported"
+ + " using the HttpClusterStateProvider. "
+ + "ZkClientClusterStateProvider can be used for this."); // TODO
+ }
+
+ @Override
+ public Object getClusterProperty(String propertyName) {
+ if (propertyName.equals(ZkStateReader.URL_SCHEME)) {
+ return this.urlScheme;
+ }
+ return getClusterProperties().get(propertyName);
+ }
+
+ @Override
+ public void connect() {}
+
+ public int getCacheTimeout() {
+ return cacheTimeout;
+ }
+
+ public void setCacheTimeout(int cacheTimeout) {
+ this.cacheTimeout = cacheTimeout;
+ }
+
+ // This exception is not meant to escape this class it should be caught and wrapped.
+ private class NotACollectionException extends Exception {
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 8856be5..8831448 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -66,6 +66,7 @@ import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.V2RequestSupport;
@@ -90,7 +91,7 @@ import static org.apache.solr.common.util.Utils.getObjectByPath;
/**
* A SolrClient implementation that talks directly to a Solr server via HTTP
*/
-public class HttpSolrClient extends BaseHttpSolrClient {
+public class HttpSolrClient extends SolrClient {
private static final String UTF_8 = StandardCharsets.UTF_8.name();
private static final String DEFAULT_PATH = "/select";
@@ -559,7 +560,7 @@ public class HttpSolrClient extends BaseHttpSolrClient {
} else {
contentType = "";
}
-
+
// handle some http level checks before trying to parse the response
switch (httpStatus) {
case HttpStatus.SC_OK:
@@ -797,26 +798,53 @@ s * @deprecated since 7.0 Use {@link Builder} methods instead.
this.useMultiPartPost = useMultiPartPost;
}
-
/**
- * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteSolrException} instead
+ * Subclass of SolrException that allows us to capture an arbitrary HTTP
+ * status code that may have been returned by the remote server or a
+ * proxy along the way.
*/
- @Deprecated
- public static class RemoteSolrException extends BaseHttpSolrClient.RemoteSolrException {
-
+ public static class RemoteSolrException extends SolrException {
+ /**
+ * @param remoteHost the host the error was received from
+ * @param code Arbitrary HTTP status code
+ * @param msg Exception Message
+ * @param th Throwable to wrap with this Exception
+ */
public RemoteSolrException(String remoteHost, int code, String msg, Throwable th) {
- super(remoteHost, code, msg, th);
+ super(code, "Error from server at " + remoteHost + ": " + msg, th);
}
}
/**
- * @deprecated since 8.0, catch {@link BaseHttpSolrClient.RemoteExecutionException} instead
+ * This should be thrown when a server has an error in executing the request and
+ * it sends a proper payload back to the client
*/
- @Deprecated
- public static class RemoteExecutionException extends BaseHttpSolrClient.RemoteExecutionException {
+ public static class RemoteExecutionException extends RemoteSolrException {
+ private NamedList meta;
public RemoteExecutionException(String remoteHost, int code, String msg, NamedList meta) {
- super(remoteHost, code, msg, meta);
+ super(remoteHost, code, msg, null);
+ this.meta = meta;
+ }
+
+
+ public static RemoteExecutionException create(String host, NamedList errResponse) {
+ Object errObj = errResponse.get("error");
+ if (errObj != null) {
+ Number code = (Number) getObjectByPath(errObj, true, Collections.singletonList("code"));
+ String msg = (String) getObjectByPath(errObj, true, Collections.singletonList("msg"));
+ return new RemoteExecutionException(host, code == null ? ErrorCode.UNKNOWN.code : code.intValue(),
+ msg == null ? "Unknown Error" : msg, errResponse);
+
+ } else {
+ throw new RuntimeException("No error");
+ }
+
+ }
+
+ public NamedList getMetaData() {
+
+ return meta;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
index 3eb20eb..8a142bf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
@@ -31,10 +31,8 @@ import java.util.Objects;
import java.util.Set;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
-import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
@@ -238,21 +236,25 @@ public class UpdateRequest extends AbstractUpdateRequest {
params.set(UpdateParams.COMMIT, "true");
return process(client, collection);
}
-
- private interface ReqSupplier<T extends LBSolrClient.Req> {
- T get(SolrRequest solrRequest, List<String> servers);
- }
-
- private <T extends LBSolrClient.Req> Map<String, T> getRoutes(DocRouter router,
- DocCollection col, Map<String,List<String>> urlMap,
- ModifiableSolrParams params, String idField,
- ReqSupplier<T> reqSupplier) {
+
+ /**
+ * @param router to route updates with
+ * @param col DocCollection for the updates
+ * @param urlMap of the cluster
+ * @param params params to use
+ * @param idField the id field
+ * @return a Map of urls to requests
+ */
+ public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
+ DocCollection col, Map<String,List<String>> urlMap,
+ ModifiableSolrParams params, String idField) {
+
if ((documents == null || documents.size() == 0)
&& (deleteById == null || deleteById.size() == 0)) {
return null;
}
-
- Map<String,T> routes = new HashMap<>();
+
+ Map<String,LBHttpSolrClient.Req> routes = new HashMap<>();
if (documents != null) {
Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
@@ -271,7 +273,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
String leaderUrl = urls.get(0);
- T request = routes
+ LBHttpSolrClient.Req request = routes
.get(leaderUrl);
if (request == null) {
UpdateRequest updateRequest = new UpdateRequest();
@@ -281,7 +283,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
updateRequest.setPath(getPath());
updateRequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
updateRequest.setResponseParser(getResponseParser());
- request = reqSupplier.get(updateRequest, urls);
+ request = new LBHttpSolrClient.Req(updateRequest, urls);
routes.put(leaderUrl, request);
}
UpdateRequest urequest = (UpdateRequest) request.getRequest();
@@ -297,17 +299,17 @@ public class UpdateRequest extends AbstractUpdateRequest {
}
}
}
-
+
// Route the deleteById's
-
+
if (deleteById != null) {
-
+
Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
.iterator();
while (entries.hasNext()) {
-
+
Map.Entry<String,Map<String,Object>> entry = entries.next();
-
+
String deleteId = entry.getKey();
Map<String,Object> map = entry.getValue();
Long version = null;
@@ -323,7 +325,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
return null;
}
String leaderUrl = urls.get(0);
- T request = routes.get(leaderUrl);
+ LBHttpSolrClient.Req request = routes.get(leaderUrl);
if (request != null) {
UpdateRequest urequest = (UpdateRequest) request.getRequest();
urequest.deleteById(deleteId, version);
@@ -333,7 +335,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
urequest.deleteById(deleteId, version);
urequest.setCommitWithin(getCommitWithin());
urequest.setBasicAuthCredentials(getBasicAuthUser(), getBasicAuthPassword());
- request = reqSupplier.get(urequest, urls);
+ request = new LBHttpSolrClient.Req(urequest, urls);
routes.put(leaderUrl, request);
}
}
@@ -342,36 +344,6 @@ public class UpdateRequest extends AbstractUpdateRequest {
return routes;
}
- /**
- * @param router to route updates with
- * @param col DocCollection for the updates
- * @param urlMap of the cluster
- * @param params params to use
- * @param idField the id field
- * @return a Map of urls to requests
- */
- public Map<String, LBSolrClient.Req> getRoutesToCollection(DocRouter router,
- DocCollection col, Map<String,List<String>> urlMap,
- ModifiableSolrParams params, String idField) {
- return getRoutes(router, col, urlMap, params, idField, LBSolrClient.Req::new);
- }
-
- /**
- * @param router to route updates with
- * @param col DocCollection for the updates
- * @param urlMap of the cluster
- * @param params params to use
- * @param idField the id field
- * @return a Map of urls to requests
- * @deprecated since 8.0, uses {@link #getRoutesToCollection(DocRouter, DocCollection, Map, ModifiableSolrParams, String)} instead
- */
- @Deprecated
- public Map<String,LBHttpSolrClient.Req> getRoutes(DocRouter router,
- DocCollection col, Map<String,List<String>> urlMap,
- ModifiableSolrParams params, String idField) {
- return getRoutes(router, col, urlMap, params, idField, LBHttpSolrClient.Req::new);
- }
-
public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
this.docIterator = docIterator;
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
deleted file mode 100644
index 6206d4d..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBadInputTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.hamcrest.core.StringContains.containsString;
-
-public class CloudHttp2SolrClientBadInputTest extends SolrCloudTestCase {
- private static final List<String> NULL_STR_LIST = null;
- private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
- private static final String ANY_COLLECTION = "ANY_COLLECTION";
- private static final int ANY_COMMIT_WITHIN_TIME = -1;
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- configureCluster(1)
- .configure();
-
- final List<String> solrUrls = new ArrayList<>();
- }
-
- @Test
- public void testDeleteByIdReportsInvalidIdLists() throws Exception {
- try (SolrClient client = getCloudHttp2SolrClient(cluster)) {
- assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
- client.deleteById(ANY_COLLECTION, NULL_STR_LIST);
- });
- assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
- client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST);
- });
- assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "null"), () -> {
- client.deleteById(ANY_COLLECTION, NULL_STR_LIST, ANY_COMMIT_WITHIN_TIME);
- });
- assertExceptionThrownWithMessageContaining(IllegalArgumentException.class, Lists.newArrayList("ids", "empty"), () -> {
- client.deleteById(ANY_COLLECTION, EMPTY_STR_LIST, ANY_COMMIT_WITHIN_TIME);
- });
- }
- }
-
- private void assertExceptionThrownWithMessageContaining(Class expectedType, List<String> expectedStrings, LuceneTestCase.ThrowingRunnable runnable) {
- Throwable thrown = expectThrows(expectedType, runnable);
-
- if (expectedStrings != null) {
- for (String expectedString : expectedStrings) {
- assertThat(thrown.getMessage(), containsString(expectedString));
- }
- }
- }
-}
\ No newline at end of file
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
deleted file mode 100644
index 72d5e8c..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientBuilderTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientBuilderTest extends LuceneTestCase {
- private static final String ANY_CHROOT = "/ANY_CHROOT";
- private static final String ANY_ZK_HOST = "ANY_ZK_HOST";
- private static final String ANY_OTHER_ZK_HOST = "ANY_OTHER_ZK_HOST";
-
- @Test
- public void testSingleZkHostSpecified() throws IOException {
- try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT))
- .build()) {
- final String clientZkHost = createdClient.getZkHost();
-
- assertTrue(clientZkHost.contains(ANY_ZK_HOST));
- }
- }
-
- @Test
- public void testSeveralZkHostsSpecifiedSingly() throws IOException {
- final List<String> zkHostList = new ArrayList<>();
- zkHostList.add(ANY_ZK_HOST); zkHostList.add(ANY_OTHER_ZK_HOST);
- try (CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHostList, Optional.of(ANY_CHROOT))
- .build()) {
- final String clientZkHost = createdClient.getZkHost();
-
- assertTrue(clientZkHost.contains(ANY_ZK_HOST));
- assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
- }
- }
-
- @Test
- public void testSeveralZkHostsSpecifiedTogether() throws IOException {
- final ArrayList<String> zkHosts = new ArrayList<String>();
- zkHosts.add(ANY_ZK_HOST);
- zkHosts.add(ANY_OTHER_ZK_HOST);
- try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(zkHosts, Optional.of(ANY_CHROOT)).build()) {
- final String clientZkHost = createdClient.getZkHost();
-
- assertTrue(clientZkHost.contains(ANY_ZK_HOST));
- assertTrue(clientZkHost.contains(ANY_OTHER_ZK_HOST));
- }
- }
-
- @Test
- public void testByDefaultConfiguresClientToSendUpdatesOnlyToShardLeaders() throws IOException {
- try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
- assertTrue(createdClient.isUpdatesToLeaders() == true);
- }
- }
-
- @Test
- public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
- try(CloudHttp2SolrClient createdClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(ANY_ZK_HOST), Optional.of(ANY_CHROOT)).build()) {
- assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
- }
- }
-
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
deleted file mode 100644
index fa3425b..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientMultiConstructorTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TestUtil;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientMultiConstructorTest extends LuceneTestCase {
-
- /*
- * NOTE: If you only include one String argument, it will NOT use the
- * constructor with the variable argument list, which is the one that
- * we are testing here.
- */
- Collection<String> hosts;
-
- @Test
- // commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
- public void testZkConnectionStringConstructorWithValidChroot() throws IOException {
- boolean setOrList = random().nextBoolean();
- int numOfZKServers = TestUtil.nextInt(random(), 1, 5);
- boolean withChroot = random().nextBoolean();
-
- final String chroot = "/mychroot";
-
- StringBuilder sb = new StringBuilder();
-
- if(setOrList) {
- /*
- A LinkedHashSet is required here for testing, or we can't guarantee
- the order of entries in the final string.
- */
- hosts = new LinkedHashSet<>();
- } else {
- hosts = new ArrayList<>();
- }
-
- for(int i=0; i<numOfZKServers; i++) {
- String ZKString = "host" + i + ":2181";
- hosts.add(ZKString);
- sb.append(ZKString);
- if(i<numOfZKServers -1) sb.append(",");
- }
-
- String clientChroot = null;
- if (withChroot) {
- sb.append(chroot);
- clientChroot = "/mychroot";
- }
-
- try (CloudHttp2SolrClient client = new CloudHttp2SolrClient.Builder(new ArrayList<>(hosts), Optional.ofNullable(clientChroot)).build()) {
- assertEquals(sb.toString(), client.getZkHost());
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- // commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Sep-2018
- public void testBadChroot() {
- final List<String> zkHosts = new ArrayList<>();
- zkHosts.add("host1:2181");
- new CloudHttp2SolrClient.Builder(zkHosts, Optional.of("foo")).build();
- }
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
deleted file mode 100644
index 52a4b84..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientRetryTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.util.Collections;
-import java.util.Optional;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.util.TestInjection;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CloudHttp2SolrClientRetryTest extends SolrCloudTestCase {
- private static final int NODE_COUNT = 1;
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- configureCluster(NODE_COUNT)
- .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
- .configure();
- }
-
- @Test
- public void testRetry() throws Exception {
- String collectionName = "testRetry";
- try (CloudHttp2SolrClient solrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build()) {
- CollectionAdminRequest.createCollection(collectionName, 1, 1)
- .process(solrClient);
-
- solrClient.add(collectionName, new SolrInputDocument("id", "1"));
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.QT, "/admin/metrics");
- String updateRequestCountKey = "solr.core.testRetry.shard1.replica_n1:UPDATE./update.requestTimes:count";
- params.set("key", updateRequestCountKey);
- params.set("indent", "true");
-
- QueryResponse response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
- NamedList<Object> namedList = response.getResponse();
- System.out.println(namedList);
- NamedList metrics = (NamedList) namedList.get("metrics");
- assertEquals(1L, metrics.get(updateRequestCountKey));
-
- TestInjection.failUpdateRequests = "true:100";
- try {
- expectThrows(BaseCloudSolrClient.RouteException.class,
- "Expected an exception on the client when failure is injected during updates", () -> {
- solrClient.add(collectionName, new SolrInputDocument("id", "2"));
- });
- } finally {
- TestInjection.reset();
- }
-
- response = solrClient.query(collectionName, params, SolrRequest.METHOD.GET);
- namedList = response.getResponse();
- System.out.println(namedList);
- metrics = (NamedList) namedList.get("metrics");
- assertEquals(2L, metrics.get(updateRequestCountKey));
- }
- }
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
deleted file mode 100644
index de8c311..0000000
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ /dev/null
@@ -1,978 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.lucene.util.TestUtil;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.AbstractDistribZkTestBase;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.handler.admin.ConfigSetsHandler;
-import org.apache.solr.handler.admin.CoreAdminHandler;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.impl.BaseCloudSolrClient.*;
-
-
-/**
- * This test would be faster if we simulated the zk state instead.
- */
-@Slow
-public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private static final String COLLECTION = "collection1";
- private static final String COLLECTION2 = "2nd_collection";
-
- private static final String id = "id";
-
- private static final int TIMEOUT = 30;
- private static final int NODE_COUNT = 3;
-
- private static CloudHttp2SolrClient httpBasedCloudSolrClient = null;
- private static CloudHttp2SolrClient zkBasedCloudSolrClient = null;
-
- @Before
- public void setupCluster() throws Exception {
- configureCluster(NODE_COUNT)
- .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
- .configure();
-
- final List<String> solrUrls = new ArrayList<>();
- solrUrls.add(cluster.getJettySolrRunner(0).getBaseUrl().toString());
- httpBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(solrUrls).build();
- zkBasedCloudSolrClient = new CloudHttp2SolrClient.Builder(Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty()).build();
- }
-
-
- @After
- public void tearDown() throws Exception {
- if (httpBasedCloudSolrClient != null) {
- try {
- httpBasedCloudSolrClient.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- if (zkBasedCloudSolrClient != null) {
- try {
- zkBasedCloudSolrClient.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- shutdownCluster();
- super.tearDown();
- }
-
- /**
- * Randomly return the cluster's ZK based CSC, or HttpClusterProvider based CSC.
- */
- private CloudHttp2SolrClient getRandomClient() {
-// return random().nextBoolean()? zkBasedCloudSolrClient : httpBasedCloudSolrClient;
- return httpBasedCloudSolrClient;
- }
-
- @Test
- public void testParallelUpdateQTime() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION, 2, 2);
- UpdateRequest req = new UpdateRequest();
- for (int i=0; i<10; i++) {
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", String.valueOf(TestUtil.nextInt(random(), 1000, 1100)));
- req.add(doc);
- }
- UpdateResponse response = req.process(getRandomClient(), COLLECTION);
- // See SOLR-6547, we just need to ensure that no exception is thrown here
- assertTrue(response.getQTime() >= 0);
- }
-
- @Test
- public void testOverwriteOption() throws Exception {
-
- CollectionAdminRequest.createCollection("overwrite", "conf", 1, 1)
- .processAndWait(cluster.getSolrClient(), TIMEOUT);
- cluster.waitForActiveCollection("overwrite", 1, 1);
-
- new UpdateRequest()
- .add("id", "0", "a_t", "hello1")
- .add("id", "0", "a_t", "hello2")
- .commit(cluster.getSolrClient(), "overwrite");
-
- QueryResponse resp = cluster.getSolrClient().query("overwrite", new SolrQuery("*:*"));
- assertEquals("There should be one document because overwrite=true", 1, resp.getResults().getNumFound());
-
- new UpdateRequest()
- .add(new SolrInputDocument(id, "1", "a_t", "hello1"), /* overwrite = */ false)
- .add(new SolrInputDocument(id, "1", "a_t", "hello2"), false)
- .commit(cluster.getSolrClient(), "overwrite");
-
- resp = getRandomClient().query("overwrite", new SolrQuery("*:*"));
- assertEquals("There should be 3 documents because there should be two id=1 docs due to overwrite=false", 3, resp.getResults().getNumFound());
-
- }
-
- @Test
- public void testAliasHandling() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION, 2, 2);
-
- CollectionAdminRequest.createCollection(COLLECTION2, "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION2, 2, 2);
-
- CloudHttp2SolrClient client = getRandomClient();
- SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
- client.add(COLLECTION, doc);
- client.commit(COLLECTION);
- CollectionAdminRequest.createAlias("testalias", COLLECTION).process(cluster.getSolrClient());
-
- SolrInputDocument doc2 = new SolrInputDocument("id", "2", "title_s", "my doc too");
- client.add(COLLECTION2, doc2);
- client.commit(COLLECTION2);
- CollectionAdminRequest.createAlias("testalias2", COLLECTION2).process(cluster.getSolrClient());
-
- CollectionAdminRequest.createAlias("testaliascombined", COLLECTION + "," + COLLECTION2).process(cluster.getSolrClient());
-
- // ensure that the aliases have been registered
- Map<String, String> aliases = new CollectionAdminRequest.ListAliases().process(cluster.getSolrClient()).getAliases();
- assertEquals(COLLECTION, aliases.get("testalias"));
- assertEquals(COLLECTION2, aliases.get("testalias2"));
- assertEquals(COLLECTION + "," + COLLECTION2, aliases.get("testaliascombined"));
-
- assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
- assertEquals(1, client.query("testalias", params("q", "*:*")).getResults().getNumFound());
-
- assertEquals(1, client.query(COLLECTION2, params("q", "*:*")).getResults().getNumFound());
- assertEquals(1, client.query("testalias2", params("q", "*:*")).getResults().getNumFound());
-
- assertEquals(2, client.query("testaliascombined", params("q", "*:*")).getResults().getNumFound());
-
- ModifiableSolrParams paramsWithBothCollections = params("q", "*:*", "collection", COLLECTION + "," + COLLECTION2);
- assertEquals(2, client.query(null, paramsWithBothCollections).getResults().getNumFound());
-
- ModifiableSolrParams paramsWithBothAliases = params("q", "*:*", "collection", "testalias,testalias2");
- assertEquals(2, client.query(null, paramsWithBothAliases).getResults().getNumFound());
-
- ModifiableSolrParams paramsWithCombinedAlias = params("q", "*:*", "collection", "testaliascombined");
- assertEquals(2, client.query(null, paramsWithCombinedAlias).getResults().getNumFound());
-
- ModifiableSolrParams paramsWithMixedCollectionAndAlias = params("q", "*:*", "collection", "testalias," + COLLECTION2);
- assertEquals(2, client.query(null, paramsWithMixedCollectionAndAlias).getResults().getNumFound());
- }
-
- @Test
- public void testRouting() throws Exception {
- CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection("routing_collection", 2, 2);
-
- AbstractUpdateRequest request = new UpdateRequest()
- .add(id, "0", "a_t", "hello1")
- .add(id, "2", "a_t", "hello2")
- .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
-
- // Test single threaded routed updates for UpdateRequest
- NamedList<Object> response = getRandomClient().request(request, "routing_collection");
- if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
- checkSingleServer(response);
- }
- RouteResponse rr = (RouteResponse) response;
- Map<String,LBSolrClient.Req> routes = rr.getRoutes();
- Iterator<Map.Entry<String,LBSolrClient.Req>> it = routes.entrySet()
- .iterator();
- while (it.hasNext()) {
- Map.Entry<String,LBSolrClient.Req> entry = it.next();
- String url = entry.getKey();
- UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
- .getRequest();
- SolrInputDocument doc = updateRequest.getDocuments().get(0);
- String id = doc.getField("id").getValue().toString();
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.add("q", "id:" + id);
- params.add("distrib", "false");
- QueryRequest queryRequest = new QueryRequest(params);
- try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
- QueryResponse queryResponse = queryRequest.process(solrClient);
- SolrDocumentList docList = queryResponse.getResults();
- assertTrue(docList.getNumFound() == 1);
- }
- }
-
- // Test the deleteById routing for UpdateRequest
-
- final UpdateResponse uResponse = new UpdateRequest()
- .deleteById("0")
- .deleteById("2")
- .commit(cluster.getSolrClient(), "routing_collection");
- if (getRandomClient().isDirectUpdatesToLeadersOnly()) {
- checkSingleServer(uResponse.getResponse());
- }
-
- QueryResponse qResponse = getRandomClient().query("routing_collection", new SolrQuery("*:*"));
- SolrDocumentList docs = qResponse.getResults();
- assertEquals(0, docs.getNumFound());
-
- // Test Multi-Threaded routed updates for UpdateRequest
- try (CloudSolrClient threadedClient = new CloudSolrClientBuilder
- (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
- .withParallelUpdates(true)
- .build()) {
- threadedClient.setDefaultCollection("routing_collection");
- response = threadedClient.request(request);
- if (threadedClient.isDirectUpdatesToLeadersOnly()) {
- checkSingleServer(response);
- }
- rr = (RouteResponse) response;
- routes = rr.getRoutes();
- it = routes.entrySet()
- .iterator();
- while (it.hasNext()) {
- Map.Entry<String,LBSolrClient.Req> entry = it.next();
- String url = entry.getKey();
- UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
- .getRequest();
- SolrInputDocument doc = updateRequest.getDocuments().get(0);
- String id = doc.getField("id").getValue().toString();
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.add("q", "id:" + id);
- params.add("distrib", "false");
- QueryRequest queryRequest = new QueryRequest(params);
- try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
- QueryResponse queryResponse = queryRequest.process(solrClient);
- SolrDocumentList docList = queryResponse.getResults();
- assertTrue(docList.getNumFound() == 1);
- }
- }
- }
-
- // Test that queries with _route_ params are routed by the client
-
- // Track request counts on each node before query calls
- ClusterState clusterState = cluster.getSolrClient().getZkStateReader().getClusterState();
- DocCollection col = clusterState.getCollection("routing_collection");
- Map<String, Long> requestCountsMap = Maps.newHashMap();
- for (Slice slice : col.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
- requestCountsMap.put(baseURL, getNumRequests(baseURL, "routing_collection"));
- }
- }
-
- // Collect the base URLs of the replicas of shard that's expected to be hit
- DocRouter router = col.getRouter();
- Collection<Slice> expectedSlices = router.getSearchSlicesSingle("0", null, col);
- Set<String> expectedBaseURLs = Sets.newHashSet();
- for (Slice expectedSlice : expectedSlices) {
- for (Replica replica : expectedSlice.getReplicas()) {
- String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
- expectedBaseURLs.add(baseURL);
- }
- }
-
- assertTrue("expected urls is not fewer than all urls! expected=" + expectedBaseURLs
- + "; all=" + requestCountsMap.keySet(),
- expectedBaseURLs.size() < requestCountsMap.size());
-
- // Calculate a number of shard keys that route to the same shard.
- int n;
- if (TEST_NIGHTLY) {
- n = random().nextInt(999) + 2;
- } else {
- n = random().nextInt(9) + 2;
- }
-
- List<String> sameShardRoutes = Lists.newArrayList();
- sameShardRoutes.add("0");
- for (int i = 1; i < n; i++) {
- String shardKey = Integer.toString(i);
- Collection<Slice> slices = router.getSearchSlicesSingle(shardKey, null, col);
- log.info("Expected Slices {}", slices);
- if (expectedSlices.equals(slices)) {
- sameShardRoutes.add(shardKey);
- }
- }
-
- assertTrue(sameShardRoutes.size() > 1);
-
- // Do N queries with _route_ parameter to the same shard
- for (int i = 0; i < n; i++) {
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(CommonParams.Q, "*:*");
- solrParams.set(ShardParams._ROUTE_, sameShardRoutes.get(random().nextInt(sameShardRoutes.size())));
- log.info("output: {}", getRandomClient().query("routing_collection", solrParams));
- }
-
- // Request counts increase from expected nodes should aggregate to 1000, while there should be
- // no increase in unexpected nodes.
- int increaseFromExpectedUrls = 0;
- int increaseFromUnexpectedUrls = 0;
- Map<String, Long> numRequestsToUnexpectedUrls = Maps.newHashMap();
- for (Slice slice : col.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- String baseURL = (String) replica.get(ZkStateReader.BASE_URL_PROP);
-
- Long prevNumRequests = requestCountsMap.get(baseURL);
- Long curNumRequests = getNumRequests(baseURL, "routing_collection");
-
- long delta = curNumRequests - prevNumRequests;
- if (expectedBaseURLs.contains(baseURL)) {
- increaseFromExpectedUrls += delta;
- } else {
- increaseFromUnexpectedUrls += delta;
- numRequestsToUnexpectedUrls.put(baseURL, delta);
- }
- }
- }
-
- assertEquals("Unexpected number of requests to expected URLs", n, increaseFromExpectedUrls);
- assertEquals("Unexpected number of requests to unexpected URLs: " + numRequestsToUnexpectedUrls,
- 0, increaseFromUnexpectedUrls);
-
- }
-
- /**
- * Tests if the specification of 'preferLocalShards' in the query-params
- * limits the distributed query to locally hosted shards only
- */
- @Test
- // commented 4-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
- public void preferLocalShardsTest() throws Exception {
-
- String collectionName = "localShardsTestColl";
-
- int liveNodes = cluster.getJettySolrRunners().size();
-
- // For preferLocalShards to succeed in a test, every shard should have
- // all its cores on the same node.
- // Hence the below configuration for our collection
- CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, liveNodes)
- .setMaxShardsPerNode(liveNodes * liveNodes)
- .processAndWait(cluster.getSolrClient(), TIMEOUT);
- cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * liveNodes);
- // Add some new documents
- new UpdateRequest()
- .add(id, "0", "a_t", "hello1")
- .add(id, "2", "a_t", "hello2")
- .add(id, "3", "a_t", "hello2")
- .commit(getRandomClient(), collectionName);
-
- // Run the actual test for 'preferLocalShards'
- queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
- queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
- }
-
- @SuppressWarnings("deprecation")
- private void queryWithShardsPreferenceRules(CloudHttp2SolrClient cloudClient,
- boolean useShardsPreference,
- String collectionName)
- throws Exception
- {
- SolrQuery qRequest = new SolrQuery("*:*");
-
- ModifiableSolrParams qParams = new ModifiableSolrParams();
- if (useShardsPreference) {
- qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
- } else {
- qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
- }
- qParams.add(ShardParams.SHARDS_INFO, "true");
- qRequest.add(qParams);
-
- // CloudSolrClient sends the request to some node.
- // And since all the nodes are hosting cores from all shards, the
- // distributed query formed by this node will select cores from the
- // local shards only
- QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
-
- Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
- assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
-
- // Iterate over shards-info and check what cores responded
- SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
- Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
- List<String> shardAddresses = new ArrayList<String>();
- while (itr.hasNext()) {
- Map.Entry<String, ?> e = itr.next();
- assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
- String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
- assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
- shardAddresses.add(shardAddress);
- }
- log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
-
- // Make sure the distributed queries were directed to a single node only
- Set<Integer> ports = new HashSet<Integer>();
- for (String shardAddr: shardAddresses) {
- URL url = new URL (shardAddr);
- ports.add(url.getPort());
- }
-
- // This assertion would hold true as long as every shard has a core on each node
- assertTrue ("Response was not received from shards on a single node",
- shardAddresses.size() > 1 && ports.size()==1);
- }
-
- private Long getNumRequests(String baseUrl, String collectionName) throws
- SolrServerException, IOException {
- return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
- }
-
- private Long getNumRequests(String baseUrl, String collectionName, String category, String key, String scope, boolean returnNumErrors) throws
- SolrServerException, IOException {
-
- NamedList<Object> resp;
- try (HttpSolrClient client = getHttpSolrClient(baseUrl + "/"+ collectionName, 15000, 60000)) {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("qt", "/admin/mbeans");
- params.set("stats", "true");
- params.set("key", key);
- params.set("cat", category);
- // use generic request to avoid extra processing of queries
- QueryRequest req = new QueryRequest(params);
- resp = client.request(req);
- }
- String name;
- if (returnNumErrors) {
- name = category + "." + (scope != null ? scope : key) + ".errors";
- } else {
- name = category + "." + (scope != null ? scope : key) + ".requests";
- }
- Map<String,Object> map = (Map<String,Object>)resp.findRecursive("solr-mbeans", category, key, "stats");
- if (map == null) {
- return null;
- }
- if (scope != null) { // admin handler uses a meter instead of counter here
- return (Long)map.get(name + ".count");
- } else {
- return (Long) map.get(name);
- }
- }
-
- @Test
- public void testNonRetryableRequests() throws Exception {
- try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
- // important to have one replica on each node
- RequestStatusState state = CollectionAdminRequest.createCollection("foo", "conf", 1, NODE_COUNT).processAndWait(client, 60);
- if (state == RequestStatusState.COMPLETED) {
- cluster.waitForActiveCollection("foo", 1, NODE_COUNT);
- client.setDefaultCollection("foo");
-
- Map<String, String> adminPathToMbean = new HashMap<>(CommonParams.ADMIN_PATHS.size());
- adminPathToMbean.put(CommonParams.COLLECTIONS_HANDLER_PATH, CollectionsHandler.class.getName());
- adminPathToMbean.put(CommonParams.CORES_HANDLER_PATH, CoreAdminHandler.class.getName());
- adminPathToMbean.put(CommonParams.CONFIGSETS_HANDLER_PATH, ConfigSetsHandler.class.getName());
- // we do not add the authc/authz handlers because they do not currently expose any mbeans
-
- for (String adminPath : adminPathToMbean.keySet()) {
- long errorsBefore = 0;
- for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
- Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
- errorsBefore += numRequests;
- log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
- }
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("qt", adminPath);
- params.set("action", "foobar"); // this should cause an error
- QueryRequest req = new QueryRequest(params);
- try {
- NamedList<Object> resp = client.request(req);
- fail("call to foo for admin path " + adminPath + " should have failed");
- } catch (Exception e) {
- // expected
- }
- long errorsAfter = 0;
- for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
- Long numRequests = getNumRequests(runner.getBaseUrl().toString(), "foo", "ADMIN", adminPathToMbean.get(adminPath), adminPath, true);
- errorsAfter += numRequests;
- log.info("Found {} requests to {} on {}", numRequests, adminPath, runner.getBaseUrl());
- }
- assertEquals(errorsBefore + 1, errorsAfter);
- }
- } else {
- fail("Collection could not be created within 60 seconds");
- }
- }
- }
-
- @Test
- public void checkCollectionParameters() throws Exception {
-
- try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress())) {
-
- String async1 = CollectionAdminRequest.createCollection("multicollection1", "conf", 2, 1)
- .processAsync(client);
- String async2 = CollectionAdminRequest.createCollection("multicollection2", "conf", 2, 1)
- .processAsync(client);
-
- CollectionAdminRequest.waitForAsyncRequest(async1, client, TIMEOUT);
- CollectionAdminRequest.waitForAsyncRequest(async2, client, TIMEOUT);
- cluster.waitForActiveCollection("multicollection1", 2, 2);
- cluster.waitForActiveCollection("multicollection2", 2, 2);
- client.setDefaultCollection("multicollection1");
-
- List<SolrInputDocument> docs = new ArrayList<>(3);
- for (int i = 0; i < 3; i++) {
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField(id, Integer.toString(i));
- doc.addField("a_t", "hello");
- docs.add(doc);
- }
-
- client.add(docs); // default - will add them to multicollection1
- client.commit();
-
- ModifiableSolrParams queryParams = new ModifiableSolrParams();
- queryParams.add("q", "*:*");
- assertEquals(3, client.query(queryParams).getResults().size());
- assertEquals(0, client.query("multicollection2", queryParams).getResults().size());
-
- SolrQuery query = new SolrQuery("*:*");
- query.set("collection", "multicollection2");
- assertEquals(0, client.query(query).getResults().size());
-
- client.add("multicollection2", docs);
- client.commit("multicollection2");
-
- assertEquals(3, client.query("multicollection2", queryParams).getResults().size());
-
- }
-
- }
-
- @Test
- public void stateVersionParamTest() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION, 2, 2);
-
- DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
- Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
-
- SolrQuery q = new SolrQuery().setQuery("*:*");
- HttpSolrClient.RemoteSolrException sse = null;
-
- final String url = r.getStr(ZkStateReader.BASE_URL_PROP) + "/" + COLLECTION;
- try (HttpSolrClient solrClient = getHttpSolrClient(url)) {
-
- log.info("should work query, result {}", solrClient.query(q));
- //no problem
- q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + coll.getZNodeVersion());
- log.info("2nd query , result {}", solrClient.query(q));
- //no error yet good
-
- q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
-
- QueryResponse rsp = solrClient.query(q);
- Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
- assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
- assertNotNull(m.get(COLLECTION));
- }
-
- //now send the request to another node that does not serve the collection
-
- Set<String> allNodesOfColl = new HashSet<>();
- for (Slice slice : coll.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
- }
- }
- String theNode = null;
- Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
- for (String s : liveNodes) {
- String n = cluster.getSolrClient().getZkStateReader().getBaseUrlForNodeName(s);
- if(!allNodesOfColl.contains(n)){
- theNode = n;
- break;
- }
- }
- log.info("the node which does not serve this collection{} ",theNode);
- assertNotNull(theNode);
-
-
- final String solrClientUrl = theNode + "/" + COLLECTION;
- try (SolrClient solrClient = getHttpSolrClient(solrClientUrl)) {
-
- q.setParam(CloudSolrClient.STATE_VERSION, COLLECTION + ":" + (coll.getZNodeVersion()-1));
- try {
- QueryResponse rsp = solrClient.query(q);
- log.info("error was expected");
- } catch (HttpSolrClient.RemoteSolrException e) {
- sse = e;
- }
- assertNotNull(sse);
- assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
- }
-
- }
-
- @Test
- public void testShutdown() throws IOException {
- try (CloudSolrClient client = getCloudSolrClient("[ff01::114]:33332")) {
- client.setZkConnectTimeout(100);
- client.connect();
- fail("Expected exception");
- } catch (SolrException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
- }
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- @Test
- public void testWrongZkChrootTest() throws IOException {
-
- exception.expect(SolrException.class);
- exception.expectMessage("cluster not found/not ready");
-
- try (CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress() + "/xyz/foo")) {
- client.setZkClientTimeout(1000 * 60);
- client.connect();
- fail("Expected exception");
- }
- }
-
- @Test
- public void customHttpClientTest() throws IOException {
- CloseableHttpClient client = HttpClientUtil.createClient(null);
- try (CloudSolrClient solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress(), client)) {
-
- assertTrue(solrClient.getLbClient().getHttpClient() == client);
-
- } finally {
- HttpClientUtil.close(client);
- }
- }
-
- @Test
- public void testVersionsAreReturned() throws Exception {
- CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection("versions_collection", 2, 2);
-
- // assert that "adds" are returned
- UpdateRequest updateRequest = new UpdateRequest()
- .add("id", "1", "a_t", "hello1")
- .add("id", "2", "a_t", "hello2");
- updateRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
-
- NamedList<Object> response = updateRequest.commit(getRandomClient(), "versions_collection").getResponse();
- Object addsObject = response.get("adds");
-
- assertNotNull("There must be a adds parameter", addsObject);
- assertTrue(addsObject instanceof NamedList<?>);
- NamedList<?> adds = (NamedList<?>) addsObject;
- assertEquals("There must be 2 versions (one per doc)", 2, adds.size());
-
- Map<String, Long> versions = new HashMap<>();
- Object object = adds.get("1");
- assertNotNull("There must be a version for id 1", object);
- assertTrue("Version for id 1 must be a long", object instanceof Long);
- versions.put("1", (Long) object);
-
- object = adds.get("2");
- assertNotNull("There must be a version for id 2", object);
- assertTrue("Version for id 2 must be a long", object instanceof Long);
- versions.put("2", (Long) object);
-
- QueryResponse resp = getRandomClient().query("versions_collection", new SolrQuery("*:*"));
- assertEquals("There should be one document because overwrite=true", 2, resp.getResults().getNumFound());
-
- for (SolrDocument doc : resp.getResults()) {
- Long version = versions.get(doc.getFieldValue("id"));
- assertEquals("Version on add must match _version_ field", version, doc.getFieldValue("_version_"));
- }
-
- // assert that "deletes" are returned
- UpdateRequest deleteRequest = new UpdateRequest().deleteById("1");
- deleteRequest.setParam(UpdateParams.VERSIONS, Boolean.TRUE.toString());
- response = deleteRequest.commit(getRandomClient(), "versions_collection").getResponse();
- Object deletesObject = response.get("deletes");
- assertNotNull("There must be a deletes parameter", deletesObject);
- NamedList deletes = (NamedList) deletesObject;
- assertEquals("There must be 1 version", 1, deletes.size());
- }
-
- @Test
- public void testInitializationWithSolrUrls() throws Exception {
- CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1).process(cluster.getSolrClient());
- cluster.waitForActiveCollection(COLLECTION, 2, 2);
- CloudHttp2SolrClient client = httpBasedCloudSolrClient;
- SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
- client.add(COLLECTION, doc);
- client.commit(COLLECTION);
- assertEquals(1, client.query(COLLECTION, params("q", "*:*")).getResults().getNumFound());
- }
-
- @Test
- public void testCollectionDoesntExist() throws Exception {
- CloudHttp2SolrClient client = getRandomClient();
- SolrInputDocument doc = new SolrInputDocument("id", "1", "title_s", "my doc");
- try {
- client.add("boguscollectionname", doc);
- fail();
- } catch (SolrException ex) {
- if (ex.getMessage().equals("Collection not found: boguscollectionname")) {
- // pass
- } else {
- throw ex;
- }
- }
- }
-
- public void testRetryUpdatesWhenClusterStateIsStale() throws Exception {
- final String COL = "stale_state_test_col";
- assert cluster.getJettySolrRunners().size() >= 2;
-
- final JettySolrRunner old_leader_node = cluster.getJettySolrRunners().get(0);
- final JettySolrRunner new_leader_node = cluster.getJettySolrRunners().get(1);
-
- // start with exactly 1 shard/replica...
- assertEquals("Couldn't create collection", 0,
- CollectionAdminRequest.createCollection(COL, "conf", 1, 1)
- .setCreateNodeSet(old_leader_node.getNodeName())
- .process(cluster.getSolrClient()).getStatus());
- cluster.waitForActiveCollection(COL, 1, 1);
-
- // determine the coreNodeName of only current replica
- Collection<Slice> slices = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COL).getSlices();
- assertEquals(1, slices.size()); // sanity check
- Slice slice = slices.iterator().next();
- assertEquals(1, slice.getReplicas().size()); // sanity check
- final String old_leader_core_node_name = slice.getLeader().getName();
-
- // NOTE: creating our own CloudSolrClient whose settings we can muck with...
- try (CloudSolrClient stale_client = new CloudSolrClientBuilder
- (Collections.singletonList(cluster.getZkServer().getZkAddress()), Optional.empty())
- .sendDirectUpdatesToAnyShardReplica()
- .withParallelUpdates(true)
- .build()) {
- // don't let collection cache entries get expired, even on a slow machine...
- stale_client.setCollectionCacheTTl(Integer.MAX_VALUE);
- stale_client.setDefaultCollection(COL);
-
- // do a query to populate stale_client's cache...
- assertEquals(0, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-
- // add 1 replica on a diff node...
- assertEquals("Couldn't create collection", 0,
- CollectionAdminRequest.addReplicaToShard(COL, "shard1")
- .setNode(new_leader_node.getNodeName())
- // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
- .process(cluster.getSolrClient()).getStatus());
- AbstractDistribZkTestBase.waitForRecoveriesToFinish
- (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
-
- // ...and delete our original leader.
- assertEquals("Couldn't create collection", 0,
- CollectionAdminRequest.deleteReplica(COL, "shard1", old_leader_core_node_name)
- // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
- .process(cluster.getSolrClient()).getStatus());
- AbstractDistribZkTestBase.waitForRecoveriesToFinish
- (COL, cluster.getSolrClient().getZkStateReader(), true, true, 330);
-
- // stale_client's collection state cache should now only point at a leader that no longer exists.
-
- // attempt a (direct) update that should succeed in spite of cached cluster state
- // pointing solely to a node that's no longer part of our collection...
- assertEquals(0, (new UpdateRequest().add("id", "1").commit(stale_client, COL)).getStatus());
- assertEquals(1, stale_client.query(new SolrQuery("*:*")).getResults().getNumFound());
-
- }
- }
-
-
- private static void checkSingleServer(NamedList<Object> response) {
- final RouteResponse rr = (RouteResponse) response;
- final Map<String,LBSolrClient.Req> routes = rr.getRoutes();
- final Iterator<Map.Entry<String,LBSolrClient.Req>> it =
- routes.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String,LBSolrClient.Req> entry = it.next();
- assertEquals("wrong number of servers: "+entry.getValue().getServers(),
- 1, entry.getValue().getServers().size());
- }
- }
-
- /**
- * Tests if the specification of 'preferReplicaTypes` in the query-params
- * limits the distributed query to locally hosted shards only
- */
- @Test
- // commented 15-Sep-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 2-Aug-2018
- public void preferReplicaTypesTest() throws Exception {
-
- String collectionName = "replicaTypesTestColl";
-
- int liveNodes = cluster.getJettySolrRunners().size();
-
- // For these tests we need to have multiple replica types.
- // Hence the below configuration for our collection
- int pullReplicas = Math.max(1, liveNodes - 2);
- CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, pullReplicas)
- .setMaxShardsPerNode(liveNodes)
- .processAndWait(cluster.getSolrClient(), TIMEOUT);
- cluster.waitForActiveCollection(collectionName, liveNodes, liveNodes * (2 + pullReplicas));
-
- // Add some new documents
- new UpdateRequest()
- .add(id, "0", "a_t", "hello1")
- .add(id, "2", "a_t", "hello2")
- .add(id, "3", "a_t", "hello2")
- .commit(getRandomClient(), collectionName);
-
- // Run the actual tests for 'shards.preference=replica.type:*'
- queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
- // Test to verify that preferLocalShards=true doesn't break this
- queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
- queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
- }
-
- private void queryWithPreferReplicaTypes(CloudHttp2SolrClient cloudClient,
- String preferReplicaTypes,
- boolean preferLocalShards,
- String collectionName)
- throws Exception
- {
- SolrQuery qRequest = new SolrQuery("*:*");
- ModifiableSolrParams qParams = new ModifiableSolrParams();
-
- final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
- StringBuilder rule = new StringBuilder();
- preferredTypes.forEach(type -> {
- if (rule.length() != 0) {
- rule.append(',');
- }
- rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
- rule.append(':');
- rule.append(type);
- });
- if (preferLocalShards) {
- if (rule.length() != 0) {
- rule.append(',');
- }
- rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
- rule.append(":local");
- }
- qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());
- qParams.add(ShardParams.SHARDS_INFO, "true");
- qRequest.add(qParams);
-
- // CloudSolrClient sends the request to some node.
- // And since all the nodes are hosting cores from all shards, the
- // distributed query formed by this node will select cores from the
- // local shards only
- QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
-
- Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
- assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
-
- Map<String, String> replicaTypeMap = new HashMap<>();
- DocCollection collection = getCollectionState(collectionName);
- for (Slice slice : collection.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- String coreUrl = replica.getCoreUrl();
- // It seems replica reports its core URL with a trailing slash while shard
- // info returned from the query doesn't. Oh well.
- if (coreUrl.endsWith("/")) {
- coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
- }
- replicaTypeMap.put(coreUrl, replica.getType().toString());
- }
- }
-
- // Iterate over shards-info and check that replicas of correct type responded
- SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
- Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
- List<String> shardAddresses = new ArrayList<String>();
- while (itr.hasNext()) {
- Map.Entry<String, ?> e = itr.next();
- assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
- String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
- assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
- assertTrue(replicaTypeMap.containsKey(shardAddress));
- assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
- shardAddresses.add(shardAddress);
- }
- assertTrue("No responses", shardAddresses.size() > 0);
- log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
- }
-
-}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 92c5c62..f3920b0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -108,7 +108,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
private LBHttpSolrClient getMockLbHttpSolrClient(Map<String, Function> responses) throws Exception {
LBHttpSolrClient mockLbclient = mock(LBHttpSolrClient.class);
- when(mockLbclient.request(any(LBSolrClient.Req.class))).then(invocationOnMock -> {
+ when(mockLbclient.request(any(LBHttpSolrClient.Req.class))).then(invocationOnMock -> {
LBHttpSolrClient.Req req = invocationOnMock.getArgument(0);
Function f = responses.get("request");
if (f == null) return null;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 7b08c73..748721d 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -86,7 +86,6 @@ import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -2279,61 +2278,6 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
/**
- * A variant of {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} that will randomize
- * some internal settings.
- */
- public static class CloudHttp2SolrClientBuilder extends CloudHttp2SolrClient.Builder {
-
- public CloudHttp2SolrClientBuilder(List<String> zkHosts, Optional<String> zkChroot) {
- super(zkHosts, zkChroot);
- randomizeCloudSolrClient();
- }
-
- public CloudHttp2SolrClientBuilder(ClusterStateProvider stateProvider) {
- super(new ArrayList<>());
- this.stateProvider = stateProvider;
- randomizeCloudSolrClient();
- }
-
- public CloudHttp2SolrClientBuilder(MiniSolrCloudCluster cluster) {
- super(new ArrayList<>());
- if (random().nextBoolean()) {
- this.zkHosts.add(cluster.getZkServer().getZkAddress());
- } else {
- populateSolrUrls(cluster);
- }
-
- randomizeCloudSolrClient();
- }
-
- private void populateSolrUrls(MiniSolrCloudCluster cluster) {
- if (random().nextBoolean()) {
- final List<JettySolrRunner> solrNodes = cluster.getJettySolrRunners();
- for (JettySolrRunner node : solrNodes) {
- this.solrUrls.add(node.getBaseUrl().toString());
- }
- } else {
- this.solrUrls.add(cluster.getRandomJetty(random()).getBaseUrl().toString());
- }
- }
-
- private void randomizeCloudSolrClient() {
- this.directUpdatesToLeadersOnly = random().nextBoolean();
- this.shardLeadersOnly = random().nextBoolean();
- this.parallelUpdates = random().nextBoolean();
- }
- }
-
- /**
- * This method <i>may</i> randomize unspecified aspects of the resulting SolrClient.
- * Tests that do not wish to have any randomized behavior should use the
- * {@link org.apache.solr.client.solrj.impl.CloudHttp2SolrClient.Builder} class directly
- */
- public static CloudHttp2SolrClient getCloudHttp2SolrClient(MiniSolrCloudCluster cluster) {
- return new CloudHttp2SolrClientBuilder(cluster).build();
- }
-
- /**
* A variant of {@link org.apache.solr.client.solrj.impl.CloudSolrClient.Builder} that will randomize
* some internal settings.
*/