You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by jm...@apache.org on 2023/04/12 14:35:38 UTC

svn commit: r1909097 [3/5] - in /manifoldcf/branches/CONNECTORS-1740: ./ connectors/solr/ connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ connectors/solr/connector/src/test/java/org/apache/manifoldcf/agents/output/solr...

Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudHttp2SolrClient.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudHttp2SolrClient.java?rev=1909097&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudHttp2SolrClient.java (added)
+++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudHttp2SolrClient.java Wed Apr 12 14:35:38 2023
@@ -0,0 +1,239 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+
+public class ModifiedCloudHttp2SolrClient extends ModifiedCloudSolrClient {
+
+  private static final long serialVersionUID = -7543846119917075693L;
+  private final ClusterStateProvider stateProvider;
+  private final ModifiedLBHttp2SolrClient lbClient;
+  private final ModifiedHttp2SolrClient myClient;
+  private final boolean clientIsInternal;
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware of the SolrCloud state. If there is a fully redundant Zookeeper quorum and SolrCloud has enough replicas for every shard
+   * in a collection, there is no single point of failure. Updates will be sent to shard leaders by default.
+   *
+   * @param builder a {@link ModifiedHttp2SolrClient.Builder} with the options used to create the client.
+   */
+  protected ModifiedCloudHttp2SolrClient(final Builder builder) {
+    super(builder.shardLeadersOnly, builder.parallelUpdates, builder.directUpdatesToLeadersOnly);
+    if (builder.httpClient == null) {
+      this.clientIsInternal = true;
+      if (builder.internalClientBuilder == null) {
+        this.myClient = new ModifiedHttp2SolrClient.Builder().build();
+      } else {
+        this.myClient = builder.internalClientBuilder.build();
+      }
+    } else {
+      this.clientIsInternal = false;
+      this.myClient = builder.httpClient;
+    }
+    if (builder.stateProvider == null) {
+      if (builder.zkHosts != null && builder.solrUrls != null) {
+        throw new IllegalArgumentException("Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
+      }
+      if (builder.zkHosts != null) {
+        this.stateProvider = ClusterStateProvider.newZkClusterStateProvider(builder.zkHosts, builder.zkChroot);
+      } else if (builder.solrUrls != null && !builder.solrUrls.isEmpty()) {
+        try {
+          this.stateProvider = new ModifiedHttp2ClusterStateProvider(builder.solrUrls, builder.httpClient);
+        } catch (final Exception e) {
+          throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the " + "Solr server(s), " + builder.solrUrls + ", down?)", e);
+        }
+      } else {
+        throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
+      }
+    } else {
+      this.stateProvider = builder.stateProvider;
+    }
+    this.lbClient = new ModifiedLBHttp2SolrClient(myClient);
+  }
+
+  @Override
+  public void close() throws IOException {
+    stateProvider.close();
+    lbClient.close();
+
+    if (clientIsInternal && myClient != null) {
+      myClient.close();
+    }
+
+    super.close();
+  }
+
+  @Override
+  public ModifiedLBHttp2SolrClient getLbClient() {
+    return lbClient;
+  }
+
+  @Override
+  public ClusterStateProvider getClusterStateProvider() {
+    return stateProvider;
+  }
+
+  public ModifiedHttp2SolrClient getHttpClient() {
+    return myClient;
+  }
+
+  @Override
+  protected boolean wasCommError(final Throwable rootCause) {
+    return false;
+  }
+
+  /** Constructs {@link CloudHttp2SolrClient} instances from provided configuration. */
+  public static class Builder {
+    protected Collection<String> zkHosts = new ArrayList<>();
+    protected List<String> solrUrls = new ArrayList<>();
+    protected String zkChroot;
+    protected ModifiedHttp2SolrClient httpClient;
+    protected boolean shardLeadersOnly = true;
+    protected boolean directUpdatesToLeadersOnly = false;
+    protected boolean parallelUpdates = true;
+    protected ClusterStateProvider stateProvider;
+    protected ModifiedHttp2SolrClient.Builder internalClientBuilder;
+
+    /**
+     * Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient} instances. The solr client will use these urls to understand the cluster topology, which solr nodes are
+     * active etc.
+     *
+     * <p>
+     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not include any collections, cores, or other path components.
+     *
+     * <p>
+     * Usage example:
+     *
+     * <pre>
+     * final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
+     * solrBaseUrls.add("http://solr1:8983/solr");
+     * solrBaseUrls.add("http://solr2:8983/solr");
+     * solrBaseUrls.add("http://solr3:8983/solr");
+     * final SolrClient client = new CloudHttp2SolrClient.Builder(solrBaseUrls).build();
+     * </pre>
+     */
+    public Builder(final List<String> solrUrls) {
+      this.solrUrls = solrUrls;
+    }
+
+    /**
+     * Provide a series of ZK hosts which will be used when configuring {@link CloudHttp2SolrClient} instances.
+     *
+     * <p>
+     * Usage example when Solr stores data at the ZooKeeper root ('/'):
+     *
+     * <pre>
+     * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     * zkServers.add("zookeeper1:2181");
+     * zkServers.add("zookeeper2:2181");
+     * zkServers.add("zookeeper3:2181");
+     * final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.empty()).build();
+     * </pre>
+     *
+     * Usage example when Solr data is stored in a ZooKeeper chroot:
+     *
+     * <pre>
+     * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     * zkServers.add("zookeeper1:2181");
+     * zkServers.add("zookeeper2:2181");
+     * zkServers.add("zookeeper3:2181");
+     * final SolrClient client = new CloudHttp2SolrClient.Builder(zkServers, Optional.of("/solr")).build();
+     * </pre>
+     *
+     * @param zkHosts  a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
+     * @param zkChroot the path to the root ZooKeeper node containing Solr data. Provide {@code
+     *     java.util.Optional.empty()} if no ZK chroot is used.
+     */
+    public Builder(final List<String> zkHosts, final Optional<String> zkChroot) {
+      this.zkHosts = zkHosts;
+      if (zkChroot.isPresent())
+        this.zkChroot = zkChroot.get();
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} that created clients should send direct updates to shard leaders only.
+     *
+     * <p>
+     * UpdateRequests whose leaders cannot be found will "fail fast" on the client side with a {@link SolrException}
+     */
+    public Builder sendDirectUpdatesToShardLeadersOnly() {
+      directUpdatesToLeadersOnly = true;
+      return this;
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} that created clients can send updates to any shard replica (shard leaders and non-leaders).
+     *
+     * <p>
+     * Shard leaders are still preferred, but the created clients will fallback to using other replicas if a leader cannot be found.
+     */
+    public Builder sendDirectUpdatesToAnyShardReplica() {
+      directUpdatesToLeadersOnly = false;
+      return this;
+    }
+
+    /**
+     * Tells {@link CloudHttp2SolrClient.Builder} whether created clients should send shard updates serially or in parallel
+     *
+     * <p>
+     * When an {@link UpdateRequest} affects multiple shards, {@link CloudHttp2SolrClient} splits it up and sends a request to each affected shard. This setting chooses whether those sub-requests are
+     * sent serially or in parallel.
+     *
+     * <p>
+     * If not set, this defaults to 'true' and sends sub-requests in parallel.
+     */
+    public Builder withParallelUpdates(final boolean parallelUpdates) {
+      this.parallelUpdates = parallelUpdates;
+      return this;
+    }
+
+    public Builder withHttpClient(final ModifiedHttp2SolrClient httpClient) {
+      if (this.internalClientBuilder != null) {
+        throw new IllegalStateException("The builder can't accept an httpClient AND an internalClientBuilder, only one of those can be provided");
+      }
+      this.httpClient = httpClient;
+      return this;
+    }
+
+    /**
+     * If provided, the CloudHttp2SolrClient will build it's internal ModifiedHttp2SolrClient using this builder (instead of the empty default one). Providing this builder allows users to configure
+     * the internal clients (authentication, timeouts, etc).
+     *
+     * @param internalClientBuilder the builder to use for creating the internal http client.
+     * @return this
+     */
+    public Builder withInternalClientBuilder(final ModifiedHttp2SolrClient.Builder internalClientBuilder) {
+      if (this.httpClient != null) {
+        throw new IllegalStateException("The builder can't accept an httpClient AND an internalClientBuilder, only one of those can be provided");
+      }
+      this.internalClientBuilder = internalClientBuilder;
+      return this;
+    }
+
+    /** Create a {@link CloudHttp2SolrClient} based on the provided configuration. */
+    public ModifiedCloudHttp2SolrClient build() {
+      if (stateProvider == null) {
+        if (!zkHosts.isEmpty()) {
+          stateProvider = ClusterStateProvider.newZkClusterStateProvider(zkHosts, Builder.this.zkChroot);
+        } else if (!this.solrUrls.isEmpty()) {
+          try {
+            stateProvider = new ModifiedHttp2ClusterStateProvider(solrUrls, httpClient);
+          } catch (final Exception e) {
+            throw new RuntimeException("Couldn't initialize a HttpClusterStateProvider (is/are the " + "Solr server(s), " + solrUrls + ", down?)", e);
+          }
+        } else {
+          throw new IllegalArgumentException("Both zkHosts and solrUrl cannot be null.");
+        }
+      }
+      return new ModifiedCloudHttp2SolrClient(this);
+    }
+  }
+}

Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudSolrClient.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudSolrClient.java?rev=1909097&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudSolrClient.java (added)
+++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedCloudSolrClient.java Wed Apr 12 14:35:38 2023
@@ -0,0 +1,1229 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+import static org.apache.solr.common.params.CommonParams.ID;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.V2RequestSupport;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.ToleratedUpdateError;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Hash;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+public abstract class ModifiedCloudSolrClient extends SolrClient {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private volatile String defaultCollection;
+  // no of times collection state to be reloaded if stale state error is received
+  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
+  private final Random rand = new Random();
+
+  private final boolean updatesToLeaders;
+  private final boolean directUpdatesToLeadersOnly;
+  private final RequestReplicaListTransformerGenerator requestRLTGenerator;
+  boolean parallelUpdates; // TODO final
+  private ExecutorService threadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("CloudSolrClient ThreadPool"));
+
+  public static final String STATE_VERSION = "_stateVer_";
+  private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS); // 3 seconds or 3 million nanos
+  private final Set<String> NON_ROUTABLE_PARAMS;
+
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+
+  private volatile List<Object> locks = objectList(3);
+
+  /** Constructs {@link CloudSolrClient} instances from provided configuration. */
+  public static class Builder extends ModifiedCloudHttp2SolrClient.Builder {
+
+    /**
+     * Provide a series of Solr URLs to be used when configuring {@link CloudSolrClient} instances. The solr client will use these urls to understand the cluster topology, which solr nodes are active
+     * etc.
+     *
+     * <p>
+     * Provided Solr URLs are expected to point to the root Solr path ("http://hostname:8983/solr"); they should not include any collections, cores, or other path components.
+     *
+     * <p>
+     * Usage example:
+     *
+     * <pre>
+     * final List&lt;String&gt; solrBaseUrls = new ArrayList&lt;String&gt;();
+     * solrBaseUrls.add("http://solr1:8983/solr");
+     * solrBaseUrls.add("http://solr2:8983/solr");
+     * solrBaseUrls.add("http://solr3:8983/solr");
+     * final SolrClient client = new CloudSolrClient.Builder(solrBaseUrls).build();
+     * </pre>
+     */
+    public Builder(final List<String> solrUrls) {
+      super(solrUrls);
+    }
+
+    /**
+     * Provide a series of ZK hosts which will be used when configuring {@link CloudSolrClient} instances. This requires a dependency on {@code solr-solrj-zookeeper} which transitively depends on more
+     * JARs. The ZooKeeper based connection is the most reliable and performant means for CloudSolrClient to work. On the other hand, it means exposing ZooKeeper more broadly than to Solr nodes, which
+     * is a security risk.
+     *
+     * <p>
+     * Usage example when Solr stores data at the ZooKeeper root ('/'):
+     *
+     * <pre>
+     * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     * zkServers.add("zookeeper1:2181");
+     * zkServers.add("zookeeper2:2181");
+     * zkServers.add("zookeeper3:2181");
+     * final SolrClient client = new CloudSolrClient.Builder(zkServers, Optional.empty()).build();
+     * </pre>
+     *
+     * Usage example when Solr data is stored in a ZooKeeper chroot:
+     *
+     * <pre>
+     * final List&lt;String&gt; zkServers = new ArrayList&lt;String&gt;();
+     * zkServers.add("zookeeper1:2181");
+     * zkServers.add("zookeeper2:2181");
+     * zkServers.add("zookeeper3:2181");
+     * final SolrClient client = new CloudSolrClient.Builder(zkServers, Optional.of("/solr")).build();
+     * </pre>
+     *
+     * @param zkHosts  a List of at least one ZooKeeper host and port (e.g. "zookeeper1:2181")
+     * @param zkChroot the path to the root ZooKeeper node containing Solr data. Provide {@code
+     *     java.util.Optional.empty()} if no ZK chroot is used.
+     */
+    public Builder(final List<String> zkHosts, final Optional<String> zkChroot) {
+      super(zkHosts, zkChroot);
+    }
+  }
+
+  static class StateCache extends ConcurrentHashMap<String, ExpiringCachedDocCollection> {
+    final AtomicLong puts = new AtomicLong();
+    final AtomicLong hits = new AtomicLong();
+    final Lock evictLock = new ReentrantLock(true);
+    protected volatile long timeToLive = 60 * 1000L;
+
+    @Override
+    public ExpiringCachedDocCollection get(final Object key) {
+      final ExpiringCachedDocCollection val = super.get(key);
+      if (val == null) {
+        // a new collection is likely to be added now.
+        // check if there are stale items and remove them
+        evictStale();
+        return null;
+      }
+      if (val.isExpired(timeToLive)) {
+        super.remove(key);
+        return null;
+      }
+      hits.incrementAndGet();
+      return val;
+    }
+
+    @Override
+    public ExpiringCachedDocCollection put(final String key, final ExpiringCachedDocCollection value) {
+      puts.incrementAndGet();
+      return super.put(key, value);
+    }
+
+    void evictStale() {
+      if (!evictLock.tryLock())
+        return;
+      try {
+        for (final Entry<String, ExpiringCachedDocCollection> e : entrySet()) {
+          if (e.getValue().isExpired(timeToLive)) {
+            super.remove(e.getKey());
+          }
+        }
+      } finally {
+        evictLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * This is the time to wait to refetch the state after getting the same state version from ZK
+   *
+   * <p>
+   * secs
+   */
+  public void setRetryExpiryTime(final int secs) {
+    this.retryExpiryTime = TimeUnit.NANOSECONDS.convert(secs, TimeUnit.SECONDS);
+  }
+
+  protected final StateCache collectionStateCache = new StateCache();
+
+  class ExpiringCachedDocCollection {
+    final DocCollection cached;
+    final long cachedAt;
+    // This is the time at which the collection is retried and got the same old version
+    volatile long retriedAt = -1;
+    // flag that suggests that this is potentially to be rechecked
+    volatile boolean maybeStale = false;
+
+    ExpiringCachedDocCollection(final DocCollection cached) {
+      this.cached = cached;
+      this.cachedAt = System.nanoTime();
+    }
+
+    boolean isExpired(final long timeToLiveMs) {
+      return (System.nanoTime() - cachedAt) > TimeUnit.NANOSECONDS.convert(timeToLiveMs, TimeUnit.MILLISECONDS);
+    }
+
+    boolean shouldRetry() {
+      if (maybeStale) { // we are not sure if it is stale so check with retry time
+        if ((retriedAt == -1 || (System.nanoTime() - retriedAt) > retryExpiryTime)) {
+          return true; // we retried a while back. and we could not get anything new.
+          // it's likely that it is not going to be available now also.
+        }
+      }
+      return false;
+    }
+
+    void setRetriedAt() {
+      retriedAt = System.nanoTime();
+    }
+  }
+
+  protected ModifiedCloudSolrClient(final boolean updatesToLeaders, final boolean parallelUpdates, final boolean directUpdatesToLeadersOnly) {
+    this.updatesToLeaders = updatesToLeaders;
+    this.parallelUpdates = parallelUpdates;
+    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+    this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
+  }
+
+  /**
+   * Sets the cache ttl for DocCollection Objects cached.
+   *
+   * @param seconds ttl value in seconds
+   */
+  public void setCollectionCacheTTl(final int seconds) {
+    assert seconds > 0;
+    this.collectionStateCache.timeToLive = seconds * 1000L;
+  }
+
+  protected abstract ModifiedLBSolrClient getLbClient();
+
+  public abstract ClusterStateProvider getClusterStateProvider();
+
+  public ClusterState getClusterState() {
+    return getClusterStateProvider().getClusterState();
+  }
+
+  protected abstract boolean wasCommError(Throwable t);
+
+  @Override
+  public void close() throws IOException {
+    if (this.threadPool != null && !this.threadPool.isShutdown()) {
+      ExecutorUtil.shutdownAndAwaitTermination(this.threadPool);
+      this.threadPool = null;
+    }
+  }
+
+  public ResponseParser getParser() {
+    return getLbClient().getParser();
+  }
+
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   *
+   * @param processor Default Response Parser chosen to parse the response if the parser were not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(final ResponseParser processor) {
+    getLbClient().setParser(processor);
+  }
+
+  public RequestWriter getRequestWriter() {
+    return getLbClient().getRequestWriter();
+  }
+
+  public void setRequestWriter(final RequestWriter requestWriter) {
+    getLbClient().setRequestWriter(requestWriter);
+  }
+
+  /** Sets the default collection for request */
+  public void setDefaultCollection(final String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Gets the default collection for request */
+  public String getDefaultCollection() {
+    return defaultCollection;
+  }
+
+  /** Gets whether direct updates are sent in parallel */
+  public boolean isParallelUpdates() {
+    return parallelUpdates;
+  }
+
+  /**
+   * Connect to the zookeeper ensemble. This is an optional method that may be used to force a connect before any other requests are sent.
+   */
+  public void connect() {
+    getClusterStateProvider().connect();
+  }
+
+  /**
+   * Connect to a cluster. If the cluster is not ready, retry connection up to a given timeout.
+   *
+   * @param duration the timeout
+   * @param timeUnit the units of the timeout
+   * @throws TimeoutException     if the cluster is not ready after the timeout
+   * @throws InterruptedException if the wait is interrupted
+   */
+  public void connect(final long duration, final TimeUnit timeUnit) throws TimeoutException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("Waiting for {} {} for cluster at {} to be ready", duration, timeUnit, getClusterStateProvider());
+    }
+    final long timeout = System.nanoTime() + timeUnit.toNanos(duration);
+    while (System.nanoTime() < timeout) {
+      try {
+        connect();
+        if (log.isInfoEnabled()) {
+          log.info("Cluster at {} ready", getClusterStateProvider());
+        }
+        return;
+      } catch (final RuntimeException e) {
+        // not ready yet, then...
+      }
+      TimeUnit.MILLISECONDS.sleep(250);
+    }
+    throw new TimeoutException("Timed out waiting for cluster");
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  private NamedList<Object> directUpdate(final AbstractUpdateRequest request, String collection) throws SolrServerException {
+    final ModifiedUpdateRequest updateRequest = (ModifiedUpdateRequest) request;
+    SolrParams params = request.getParams();
+    final ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    final ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    if (params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for (final String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    } else {
+      params = new ModifiableSolrParams();
+    }
+
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+    // Check to see if the collection is an alias. Updates to multi-collection aliases are ok as
+    // long as they are routed aliases
+    final List<String> aliasedCollections = getClusterStateProvider().resolveAlias(collection);
+    if (getClusterStateProvider().isRoutedAlias(collection) || aliasedCollections.size() == 1) {
+      collection = aliasedCollections.get(0); // pick 1st (consistent with HttpSolrCall behavior)
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update request to non-routed multi-collection alias not supported: " + collection + " -> " + aliasedCollections);
+    }
+
+    final DocCollection col = getDocCollection(collection, null);
+
+    final DocRouter router = col.getRouter();
+
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    final ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);
+
+    // Create the URL map, which is keyed on slice name.
+    // The value is a list of URLs for each replica in the slice.
+    // The first value in the list is the leader for the slice.
+    final Map<String, List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
+    final String routeField = (col.getRouter().getRouteField(col) == null) ? ID : col.getRouter().getRouteField(col);
+    final Map<String, ? extends ModifiedLBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, routeField);
+    if (routes == null) {
+      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, routeField)) {
+        // we have info (documents with ids and/or ids to delete) with
+        // which to find the leaders but we could not find (all of) them
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "directUpdatesToLeadersOnly==true but could not find leader(s)");
+      } else {
+        // we could not find a leader or routes yet - use unoptimized general path
+        return null;
+      }
+    }
+
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size() + 1); // +1 for deleteQuery
+
+    final long start = System.nanoTime();
+
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
+      for (final Map.Entry<String, ? extends ModifiedLBSolrClient.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final ModifiedLBSolrClient.Req lbRequest = entry.getValue();
+        try {
+          MDC.put("CloudSolrClient.url", url);
+          responseFutures.put(url, threadPool.submit(() -> {
+            return getLbClient().request(lbRequest).getResponse();
+          }));
+        } finally {
+          MDC.remove("CloudSolrClient.url");
+        }
+      }
+
+      for (final Map.Entry<String, Future<NamedList<?>>> entry : responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (final InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+          exceptions.add(url, e.getCause());
+        }
+      }
+
+      if (exceptions.size() > 0) {
+        final Throwable firstException = exceptions.getVal(0);
+        if (firstException instanceof SolrException) {
+          final SolrException e = (SolrException) firstException;
+          throw getRouteException(SolrException.ErrorCode.getErrorCode(e.code()), exceptions, routes);
+        } else {
+          throw getRouteException(SolrException.ErrorCode.SERVER_ERROR, exceptions, routes);
+        }
+      }
+    } else {
+      for (final Map.Entry<String, ? extends ModifiedLBSolrClient.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final ModifiedLBSolrClient.Req lbRequest = entry.getValue();
+        try {
+          final NamedList<Object> rsp = getLbClient().request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (final Exception e) {
+          if (e instanceof SolrException) {
+            throw (SolrException) e;
+          } else {
+            throw new SolrServerException(e);
+          }
+        }
+      }
+    }
+
+    ModifiedUpdateRequest nonRoutableRequest = null;
+    final List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      final ModifiedUpdateRequest deleteQueryRequest = new ModifiedUpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+
+    final Set<String> paramNames = nonRoutableParams.getParameterNames();
+
+    final Set<String> intersection = new HashSet<>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new ModifiedUpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      nonRoutableRequest.setBasicAuthCredentials(request.getBasicAuthUser(), request.getBasicAuthPassword());
+      final List<String> urlList = new ArrayList<>(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      final ModifiedLBSolrClient.Req req = new ModifiedLBSolrClient.Req(nonRoutableRequest, urlList);
+      try {
+        final ModifiedLBSolrClient.Rsp rsp = getLbClient().request(req);
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (final Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    final long end = System.nanoTime();
+
+    @SuppressWarnings({ "rawtypes" })
+    final RouteResponse rr = condenseResponse(shardResponses, (int) TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  protected RouteException getRouteException(final SolrException.ErrorCode serverError, final NamedList<Throwable> exceptions, final Map<String, ? extends ModifiedLBSolrClient.Req> routes) {
+    return new RouteException(serverError, exceptions, routes);
+  }
+
+  protected Map<String, ? extends ModifiedLBSolrClient.Req> createRoutes(final ModifiedUpdateRequest updateRequest, final ModifiableSolrParams routableParams, final DocCollection col,
+      final DocRouter router, final Map<String, List<String>> urlMap, final String routeField) {
+    return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, routeField);
+  }
+
+  private Map<String, List<String>> buildUrlMap(final DocCollection col, final ReplicaListTransformer replicaListTransformer) {
+    final Map<String, List<String>> urlMap = new HashMap<>();
+    final Slice[] slices = col.getActiveSlicesArr();
+    for (final Slice slice : slices) {
+      final String name = slice.getName();
+      final List<Replica> sortedReplicas = new ArrayList<>();
+      Replica leader = slice.getLeader();
+      if (directUpdatesToLeadersOnly && leader == null) {
+        for (final Replica replica : slice.getReplicas(replica -> replica.isActive(getClusterStateProvider().getLiveNodes()) && replica.getType() == Replica.Type.NRT)) {
+          leader = replica;
+          break;
+        }
+      }
+      if (leader == null) {
+        if (directUpdatesToLeadersOnly) {
+          continue;
+        }
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
+      }
+
+      if (!directUpdatesToLeadersOnly) {
+        for (final Replica replica : slice.getReplicas()) {
+          if (!replica.equals(leader)) {
+            sortedReplicas.add(replica);
+          }
+        }
+      }
+
+      // Sort the non-leader replicas according to the request parameters
+      replicaListTransformer.transform(sortedReplicas);
+
+      // put the leaderUrl first.
+      sortedReplicas.add(0, leader);
+
+      urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
+    }
+    return urlMap;
+  }
+
+  protected <T extends RouteResponse<?>> T condenseResponse(final NamedList<?> response, final int timeMillis, final Supplier<T> supplier) {
+    final T condensed = supplier.get();
+    int status = 0;
+    Integer rf = null;
+
+    // TolerantUpdateProcessor
+    List<SimpleOrderedMap<String>> toleratedErrors = null;
+    int maxToleratedErrors = Integer.MAX_VALUE;
+
+    // For "adds", "deletes", "deleteByQuery" etc.
+    final Map<String, NamedList<Object>> versions = new HashMap<>();
+
+    for (int i = 0; i < response.size(); i++) {
+      final NamedList<?> shardResponse = (NamedList<?>) response.getVal(i);
+      final NamedList<?> header = (NamedList<?>) shardResponse.get("responseHeader");
+      final Integer shardStatus = (Integer) header.get("status");
+      final int s = shardStatus.intValue();
+      if (s > 0) {
+        status = s;
+      }
+      final Object rfObj = header.get(ModifiedUpdateRequest.REPFACT);
+      if (rfObj != null && rfObj instanceof Integer) {
+        final Integer routeRf = (Integer) rfObj;
+        if (rf == null || routeRf < rf)
+          rf = routeRf;
+      }
+
+      @SuppressWarnings("unchecked")
+      final List<SimpleOrderedMap<String>> shardTolerantErrors = (List<SimpleOrderedMap<String>>) header.get("errors");
+      if (null != shardTolerantErrors) {
+        final Integer shardMaxToleratedErrors = (Integer) header.get("maxErrors");
+        assert null != shardMaxToleratedErrors : "TolerantUpdateProcessor reported errors but not maxErrors";
+        // if we get into some weird state where the nodes disagree about the effective maxErrors,
+        // assume the min value seen to decide if we should fail.
+        maxToleratedErrors = Math.min(maxToleratedErrors, ToleratedUpdateError.getEffectiveMaxErrors(shardMaxToleratedErrors.intValue()));
+
+        if (null == toleratedErrors) {
+          toleratedErrors = new ArrayList<SimpleOrderedMap<String>>(shardTolerantErrors.size());
+        }
+        for (final SimpleOrderedMap<String> err : shardTolerantErrors) {
+          toleratedErrors.add(err);
+        }
+      }
+      for (final String updateType : Arrays.asList("adds", "deletes", "deleteByQuery")) {
+        final Object obj = shardResponse.get(updateType);
+        if (obj instanceof NamedList) {
+          final NamedList<Object> versionsList = versions.containsKey(updateType) ? versions.get(updateType) : new NamedList<>();
+          final NamedList<?> nl = (NamedList<?>) obj;
+          versionsList.addAll(nl);
+          versions.put(updateType, versionsList);
+        }
+      }
+    }
+
+    final NamedList<Object> cheader = new NamedList<>();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    if (rf != null)
+      cheader.add(ModifiedUpdateRequest.REPFACT, rf);
+    if (null != toleratedErrors) {
+      cheader.add("maxErrors", ToleratedUpdateError.getUserFriendlyMaxErrors(maxToleratedErrors));
+      cheader.add("errors", toleratedErrors);
+      if (maxToleratedErrors < toleratedErrors.size()) {
+        // cumulative errors are too high, we need to throw a client exception w/correct metadata
+
+        // NOTE: it shouldn't be possible for 1 == toleratedErrors.size(), because if that were the
+        // case then at least one shard should have thrown a real error before this, so we don't
+        // worry about having a more "singular" exception msg for that situation
+        final StringBuilder msgBuf = new StringBuilder().append(toleratedErrors.size()).append(" Async failures during distributed update: ");
+
+        final NamedList<String> metadata = new NamedList<>();
+        for (final SimpleOrderedMap<String> err : toleratedErrors) {
+          final ToleratedUpdateError te = ToleratedUpdateError.parseMap(err);
+          metadata.add(te.getMetadataKey(), te.getMetadataValue());
+
+          msgBuf.append("\n").append(te.getMessage());
+        }
+
+        final SolrException toThrow = new SolrException(SolrException.ErrorCode.BAD_REQUEST, msgBuf.toString());
+        toThrow.setMetadata(metadata);
+        throw toThrow;
+      }
+    }
+    for (final Map.Entry<String, NamedList<Object>> entry : versions.entrySet()) {
+      condensed.add(entry.getKey(), entry.getValue());
+    }
+    condensed.add("responseHeader", cheader);
+    return condensed;
+  }
+
+  @SuppressWarnings({ "rawtypes" })
+  public RouteResponse condenseResponse(final NamedList<?> response, final int timeMillis) {
+    return condenseResponse(response, timeMillis, RouteResponse::new);
+  }
+
+  @SuppressWarnings({ "rawtypes" })
+  public static class RouteResponse<T extends ModifiedLBSolrClient.Req> extends NamedList<Object> {
+    private NamedList<NamedList<?>> routeResponses;
+    private Map<String, T> routes;
+
+    public void setRouteResponses(final NamedList<NamedList<?>> routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    public NamedList<NamedList<?>> getRouteResponses() {
+      return routeResponses;
+    }
+
+    public void setRoutes(final Map<String, T> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, T> getRoutes() {
+      return routes;
+    }
+  }
+
+  public static class RouteException extends SolrException {
+
+    private final NamedList<Throwable> throwables;
+    private final Map<String, ? extends ModifiedLBSolrClient.Req> routes;
+
+    public RouteException(final ErrorCode errorCode, final NamedList<Throwable> throwables, final Map<String, ? extends ModifiedLBSolrClient.Req> routes) {
+      super(errorCode, throwables.getVal(0).getMessage(), throwables.getVal(0));
+      this.throwables = throwables;
+      this.routes = routes;
+
+      // create a merged copy of the metadata from all wrapped exceptions
+      final NamedList<String> metadata = new NamedList<String>();
+      for (int i = 0; i < throwables.size(); i++) {
+        final Throwable t = throwables.getVal(i);
+        if (t instanceof SolrException) {
+          final SolrException e = (SolrException) t;
+          final NamedList<String> eMeta = e.getMetadata();
+          if (null != eMeta) {
+            metadata.addAll(eMeta);
+          }
+        }
+      }
+      if (0 < metadata.size()) {
+        this.setMetadata(metadata);
+      }
+    }
+
+    public NamedList<Throwable> getThrowables() {
+      return throwables;
+    }
+
+    public Map<String, ? extends ModifiedLBSolrClient.Req> getRoutes() {
+      return this.routes;
+    }
+  }
+
+  @Override
+  public NamedList<Object> request(final SolrRequest<?> request, String collection) throws SolrServerException, IOException {
+    // the collection parameter of the request overrides that of the parameter to this method
+    final String requestCollection = request.getCollection();
+    if (requestCollection != null) {
+      collection = requestCollection;
+    } else if (collection == null) {
+      collection = defaultCollection;
+    }
+    final List<String> inputCollections = collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
+    return requestWithRetryOnStaleState(request, 0, inputCollections);
+  }
+
+  /**
+   * As this class doesn't watch external collections on the client side, there's a chance that the request will fail due to cached stale state, which means the state must be refreshed from ZK and
+   * retried.
+   */
+  protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest<?> request, final int retryCount, final List<String> inputCollections) throws SolrServerException, IOException {
+    connect(); // important to call this before you start working with the ZkStateReader
+
+    // build up a _stateVer_ param to pass to the server containing all of the
+    // external collection state versions involved in this request, which allows
+    // the server to notify us that our cached state for one or more of the external
+    // collections is stale and needs to be refreshed ... this code has no impact on internal
+    // collections
+    String stateVerParam = null;
+    List<DocCollection> requestedCollections = null;
+    boolean isCollectionRequestOfV2 = false;
+    if (request instanceof V2RequestSupport) {
+      request = ((V2RequestSupport) request).getV2Request();
+    }
+    if (request instanceof V2Request) {
+      isCollectionRequestOfV2 = ((V2Request) request).isPerCollectionRequest();
+    }
+    final boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
+    final boolean isUpdate = (request instanceof IsUpdateRequest) && (request instanceof ModifiedUpdateRequest);
+    if (!inputCollections.isEmpty() && !isAdmin && !isCollectionRequestOfV2) { // don't do _stateVer_ checking for admin, v2 api requests
+      final Set<String> requestedCollectionNames = resolveAliases(inputCollections, isUpdate);
+
+      StringBuilder stateVerParamBuilder = null;
+      for (final String requestedCollection : requestedCollectionNames) {
+        // track the version of state we're using on the client side using the _stateVer_ param
+        final DocCollection coll = getDocCollection(requestedCollection, null);
+        if (coll == null) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
+        }
+        final int collVer = coll.getZNodeVersion();
+        if (requestedCollections == null)
+          requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+        requestedCollections.add(coll);
+
+        if (stateVerParamBuilder == null) {
+          stateVerParamBuilder = new StringBuilder();
+        } else {
+          stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+        }
+
+        stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+      }
+
+      if (stateVerParamBuilder != null) {
+        stateVerParam = stateVerParamBuilder.toString();
+      }
+    }
+
+    if (request.getParams() instanceof ModifiableSolrParams) {
+      final ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+      if (stateVerParam != null) {
+        params.set(STATE_VERSION, stateVerParam);
+      } else {
+        params.remove(STATE_VERSION);
+      }
+    } // else: ??? how to set this ???
+
+    NamedList<Object> resp = null;
+    try {
+      resp = sendRequest(request, inputCollections);
+      // to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from
+      // there
+      final Object o = resp == null || resp.size() == 0 ? null : resp.get(STATE_VERSION, resp.size() - 1);
+      if (o != null && o instanceof Map) {
+        // remove this because no one else needs this and tests would fail if they are comparing
+        // responses
+        resp.remove(resp.size() - 1);
+        final Map<?, ?> invalidStates = (Map<?, ?>) o;
+        for (final Map.Entry<?, ?> e : invalidStates.entrySet()) {
+          getDocCollection((String) e.getKey(), (Integer) e.getValue());
+        }
+      }
+    } catch (final Exception exc) {
+
+      final Throwable rootCause = SolrException.getRootCause(exc);
+      // don't do retry support for admin requests
+      // or if the request doesn't have a collection specified
+      // or request is v2 api and its method is not GET
+      if (inputCollections.isEmpty() || isAdmin || (request instanceof V2Request && request.getMethod() != SolrRequest.METHOD.GET)) {
+        if (exc instanceof SolrServerException) {
+          throw (SolrServerException) exc;
+        } else if (exc instanceof IOException) {
+          throw (IOException) exc;
+        } else if (exc instanceof RuntimeException) {
+          throw (RuntimeException) exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+
+      final int errorCode = (rootCause instanceof SolrException) ? ((SolrException) rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+      final boolean wasCommError = (rootCause instanceof ConnectException || rootCause instanceof SocketException || wasCommError(rootCause));
+
+      if (wasCommError || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
+      // TODO there are other reasons for 404. We need to change the solr response format from HTML
+      // to structured data to know that
+      ) {
+        // it was a communication error. it is likely that
+        // the node to which the request to be sent is down . So , expire the state
+        // so that the next attempt would fetch the fresh state
+        // just re-read state for all of them, if it has not been retried
+        // in retryExpiryTime time
+        if (requestedCollections != null) {
+          for (final DocCollection ext : requestedCollections) {
+            final ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(ext.getName());
+            if (cacheEntry == null)
+              continue;
+            cacheEntry.maybeStale = true;
+          }
+        }
+        if (retryCount < MAX_STALE_RETRIES) { // if it is a communication error , we must try again
+          // may be, we have a stale version of the collection state
+          // and we could not get any information from the server
+          // it is probably not worth trying again and again because
+          // the state would not have been updated
+          log.info("Request to collection {} failed due to ({}) {}, retry={} maxRetries={} commError={} errorCode={} - retrying", inputCollections, errorCode, rootCause, retryCount, MAX_STALE_RETRIES,
+              wasCommError, errorCode);
+          return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+        }
+      } else {
+        log.info("request was not communication error it seems");
+      }
+      log.info("Request to collection {} failed due to ({}) {}, retry={} maxRetries={} commError={} errorCode={} ", inputCollections, errorCode, rootCause, retryCount, MAX_STALE_RETRIES, wasCommError,
+          errorCode);
+
+      boolean stateWasStale = false;
+      if (retryCount < MAX_STALE_RETRIES && requestedCollections != null && !requestedCollections.isEmpty()
+          && (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404)) {
+        // cached state for one or more external collections was stale
+        // re-issue request using updated state
+        stateWasStale = true;
+
+        // just re-read state for all of them, which is a little heavy handed but hopefully a rare
+        // occurrence
+        for (final DocCollection ext : requestedCollections) {
+          collectionStateCache.remove(ext.getName());
+        }
+      }
+
+      // if we experienced a communication error, it's worth checking the state
+      // with ZK just to make sure the node we're trying to hit is still part of the collection
+      if (retryCount < MAX_STALE_RETRIES && !stateWasStale && requestedCollections != null && !requestedCollections.isEmpty() && wasCommError) {
+        for (final DocCollection ext : requestedCollections) {
+          final DocCollection latestStateFromZk = getDocCollection(ext.getName(), null);
+          if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+            // looks like we couldn't reach the server because the state was stale == retry
+            stateWasStale = true;
+            // we just pulled state from ZK, so update the cache so that the retry uses it
+            collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+          }
+        }
+      }
+
+      if (requestedCollections != null) {
+        requestedCollections.clear(); // done with this
+      }
+
+      // if the state was stale, then we retry the request once with new state pulled from Zk
+      if (stateWasStale) {
+        log.warn("Re-trying request to collection(s) {} after stale state error from server.", inputCollections);
+        resp = requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
+      } else {
+        if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
+          throw exc;
+        } else {
+          throw new SolrServerException(rootCause);
+        }
+      }
+    }
+
+    return resp;
+  }
+
+  protected NamedList<Object> sendRequest(final SolrRequest<?> request, final List<String> inputCollections) throws SolrServerException, IOException {
+    connect();
+
+    boolean sendToLeaders = false;
+    boolean isUpdate = false;
+
+    if (request instanceof IsUpdateRequest) {
+      if (request instanceof ModifiedUpdateRequest) {
+        isUpdate = true;
+        if (inputCollections.size() > 1) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update request must be sent to a single collection " + "or an alias: " + inputCollections);
+        }
+        final String collection = inputCollections.isEmpty() ? null : inputCollections.get(0); // getting first mimics HttpSolrCall
+        final NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection);
+        if (response != null) {
+          return response;
+        }
+      }
+      sendToLeaders = true;
+    }
+
+    SolrParams reqParams = request.getParams();
+    if (reqParams == null) { // TODO fix getParams to never return null!
+      reqParams = new ModifiableSolrParams();
+    }
+
+    final ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);
+
+    final ClusterStateProvider provider = getClusterStateProvider();
+    final String urlScheme = provider.getClusterProperty(ClusterState.URL_SCHEME, "http");
+    final Set<String> liveNodes = provider.getLiveNodes();
+
+    final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
+
+    if (request instanceof V2Request) {
+      if (!liveNodes.isEmpty()) {
+        final List<String> liveNodesList = new ArrayList<>(liveNodes);
+        Collections.shuffle(liveNodesList, rand);
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0), urlScheme));
+      }
+
+    } else if (ADMIN_PATHS.contains(request.getPath())) {
+      for (final String liveNode : liveNodes) {
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNode, urlScheme));
+      }
+
+    } else { // Typical...
+      final Set<String> collectionNames = resolveAliases(inputCollections, isUpdate);
+      if (collectionNames.isEmpty()) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No collection param specified on request and no default collection has been set: " + inputCollections);
+      }
+
+      final List<String> preferredNodes = request.getPreferredNodes();
+      if (preferredNodes != null && !preferredNodes.isEmpty()) {
+        final String joinedInputCollections = StrUtils.join(inputCollections, ',');
+        final List<String> urlList = new ArrayList<>(preferredNodes.size());
+        for (final String nodeName : preferredNodes) {
+          urlList.add(Utils.getBaseUrlForNodeName(nodeName, urlScheme) + "/" + joinedInputCollections);
+        }
+        if (!urlList.isEmpty()) {
+          final ModifiedLBSolrClient.Req req = new ModifiedLBSolrClient.Req(request, urlList);
+          final ModifiedLBSolrClient.Rsp rsp = getLbClient().request(req);
+          return rsp.getResponse();
+        }
+      }
+
+      // TODO: not a big deal because of the caching, but we could avoid looking
+      // at every shard when getting leaders if we tweaked some things
+
+      // Retrieve slices from the cloud state and, for each collection specified, add it to the Map
+      // of slices.
+      final Map<String, Slice> slices = new HashMap<>();
+      final String shardKeys = reqParams.get(ShardParams._ROUTE_);
+      for (final String collectionName : collectionNames) {
+        final DocCollection col = getDocCollection(collectionName, null);
+        if (col == null) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+        }
+        final Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams, col);
+        ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+      }
+
+      // Gather URLs, grouped by leader or replica
+      final List<Replica> sortedReplicas = new ArrayList<>();
+      final List<Replica> replicas = new ArrayList<>();
+      for (final Slice slice : slices.values()) {
+        final Replica leader = slice.getLeader();
+        for (final Replica replica : slice.getReplicas()) {
+          final String node = replica.getNodeName();
+          if (!liveNodes.contains(node) // Must be a live node to continue
+              || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+            continue;
+          if (sendToLeaders && replica.equals(leader)) {
+            sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+          } else {
+            replicas.add(replica); // replicas here
+          }
+        }
+      }
+
+      // Sort the leader replicas, if any, according to the request preferences (none if
+      // !sendToLeaders)
+      replicaListTransformer.transform(sortedReplicas);
+
+      // Sort the replicas, if any, according to the request preferences and append to our list
+      replicaListTransformer.transform(replicas);
+
+      sortedReplicas.addAll(replicas);
+
+      final String joinedInputCollections = StrUtils.join(inputCollections, ',');
+      final Set<String> seenNodes = new HashSet<>();
+      sortedReplicas.forEach(replica -> {
+        if (seenNodes.add(replica.getNodeName())) {
+          theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
+        }
+      });
+
+      if (theUrlList.isEmpty()) {
+        collectionStateCache.keySet().removeAll(collectionNames);
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Could not find a healthy node to handle the request.");
+      }
+    }
+
+    final ModifiedLBSolrClient.Req req = new ModifiedLBSolrClient.Req(request, theUrlList);
+    final ModifiedLBSolrClient.Rsp rsp = getLbClient().request(req);
+    return rsp.getResponse();
+  }
+
+  /**
+   * Resolves the input collections to their possible aliased collections. Doesn't validate collection existence.
+   */
+  private Set<String> resolveAliases(final List<String> inputCollections, final boolean isUpdate) {
+    if (inputCollections.isEmpty()) {
+      return Collections.emptySet();
+    }
+    final LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(); // consistent ordering
+    for (final String collectionName : inputCollections) {
+      if (getClusterStateProvider().getState(collectionName) == null) {
+        // perhaps it's an alias
+        uniqueNames.addAll(getClusterStateProvider().resolveAlias(collectionName));
+      } else {
+        uniqueNames.add(collectionName); // it's a collection
+      }
+    }
+    return uniqueNames;
+  }
+
+  public boolean isUpdatesToLeaders() {
+    return updatesToLeaders;
+  }
+
+  /**
+   * @return true if direct updates are sent to shard leaders only
+   */
+  public boolean isDirectUpdatesToLeadersOnly() {
+    return directUpdatesToLeadersOnly;
+  }
+
+  /**
+   * If caches are expired they are refreshed after acquiring a lock. use this to set the number of locks
+   */
+  public void setParallelCacheRefreshes(final int n) {
+    locks = objectList(n);
+  }
+
+  protected static ArrayList<Object> objectList(final int n) {
+    final ArrayList<Object> l = new ArrayList<>(n);
+    for (int i = 0; i < n; i++)
+      l.add(new Object());
+    return l;
+  }
+
+  protected DocCollection getDocCollection(final String collection, Integer expectedVersion) throws SolrException {
+    if (expectedVersion == null)
+      expectedVersion = -1;
+    if (collection == null)
+      return null;
+    ExpiringCachedDocCollection cacheEntry = collectionStateCache.get(collection);
+    DocCollection col = cacheEntry == null ? null : cacheEntry.cached;
+    if (col != null) {
+      if (expectedVersion <= col.getZNodeVersion() && !cacheEntry.shouldRetry())
+        return col;
+    }
+
+    final ClusterState.CollectionRef ref = getCollectionRef(collection);
+    if (ref == null) {
+      // no such collection exists
+      return null;
+    }
+    if (!ref.isLazilyLoaded()) {
+      // it is readily available just return it
+      return ref.get();
+    }
+    final List<Object> locks = this.locks;
+    final Object lock = locks.get(Math.abs(Hash.murmurhash3_x86_32(collection, 0, collection.length(), 0) % locks.size()));
+    DocCollection fetchedCol = null;
+    synchronized (lock) {
+      /* we have waited for sometime just check once again */
+      cacheEntry = collectionStateCache.get(collection);
+      col = cacheEntry == null ? null : cacheEntry.cached;
+      if (col != null) {
+        if (expectedVersion <= col.getZNodeVersion() && !cacheEntry.shouldRetry())
+          return col;
+      }
+      // We are going to fetch a new version
+      // we MUST try to get a new version
+      fetchedCol = ref.get(); // this is a call to ZK
+      if (fetchedCol == null)
+        return null; // this collection no more exists
+      if (col != null && fetchedCol.getZNodeVersion() == col.getZNodeVersion()) {
+        cacheEntry.setRetriedAt(); // we retried and found that it is the same version
+        cacheEntry.maybeStale = false;
+      } else {
+        collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
+      }
+      return fetchedCol;
+    }
+  }
+
+  ClusterState.CollectionRef getCollectionRef(final String collection) {
+    return getClusterStateProvider().getState(collection);
+  }
+
+  /**
+   * Useful for determining the minimum achieved replication factor across all shards involved in processing an update request, typically useful for gauging the replication factor of a batch.
+   */
+  public int getMinAchievedReplicationFactor(final String collection, final NamedList<?> resp) {
+    // it's probably already on the top-level header set by condense
+    final NamedList<?> header = (NamedList<?>) resp.get("responseHeader");
+    Integer achRf = (Integer) header.get(ModifiedUpdateRequest.REPFACT);
+    if (achRf != null)
+      return achRf.intValue();
+
+    // not on the top-level header, walk the shard route tree
+    final Map<String, Integer> shardRf = getShardReplicationFactor(collection, resp);
+    for (final Integer rf : shardRf.values()) {
+      if (achRf == null || rf < achRf) {
+        achRf = rf;
+      }
+    }
+    return (achRf != null) ? achRf.intValue() : -1;
+  }
+
+  /**
+   * Walks the NamedList response after performing an update request looking for the replication factor that was achieved in each shard involved in the request. For single doc updates, there will be
+   * only one shard in the return value.
+   */
+  public Map<String, Integer> getShardReplicationFactor(final String collection, final NamedList<?> resp) {
+    connect();
+
+    final Map<String, Integer> results = new HashMap<>();
+    if (resp instanceof RouteResponse) {
+      final NamedList<NamedList<?>> routes = ((RouteResponse<?>) resp).getRouteResponses();
+      final DocCollection coll = getDocCollection(collection, null);
+      final Map<String, String> leaders = new HashMap<>();
+      for (final Slice slice : coll.getActiveSlicesArr()) {
+        final Replica leader = slice.getLeader();
+        if (leader != null) {
+          final ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+          final String leaderUrl = zkProps.getBaseUrl() + "/" + zkProps.getCoreName();
+          leaders.put(leaderUrl, slice.getName());
+          final String altLeaderUrl = zkProps.getBaseUrl() + "/" + collection;
+          leaders.put(altLeaderUrl, slice.getName());
+        }
+      }
+
+      final Iterator<Map.Entry<String, NamedList<?>>> routeIter = routes.iterator();
+      while (routeIter.hasNext()) {
+        final Map.Entry<String, NamedList<?>> next = routeIter.next();
+        final String host = next.getKey();
+        final NamedList<?> hostResp = next.getValue();
+        final Integer rf = (Integer) ((NamedList<?>) hostResp.get("responseHeader")).get(ModifiedUpdateRequest.REPFACT);
+        if (rf != null) {
+          String shard = leaders.get(host);
+          if (shard == null) {
+            if (host.endsWith("/"))
+              shard = leaders.get(host.substring(0, host.length() - 1));
+            if (shard == null) {
+              shard = host;
+            }
+          }
+          results.put(shard, rf);
+        }
+      }
+    }
+    return results;
+  }
+
+  private static boolean hasInfoToFindLeaders(final ModifiedUpdateRequest updateRequest, final String idField) {
+    final Map<SolrInputDocument, Map<String, Object>> documents = updateRequest.getDocumentsMap();
+    final Map<String, Map<String, Object>> deleteById = updateRequest.getDeleteByIdMap();
+
+    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
+    final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
+    if (hasNoDocuments && hasNoDeleteById) {
+      // no documents and no delete-by-id, so no info to find leader(s)
+      return false;
+    }
+
+    if (documents != null) {
+      for (final Map.Entry<SolrInputDocument, Map<String, Object>> entry : documents.entrySet()) {
+        final SolrInputDocument doc = entry.getKey();
+        final Object fieldValue = doc.getFieldValue(idField);
+        if (fieldValue == null) {
+          // a document with no id field value, so can't find leader for it
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+}

Added: manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedHttp2ClusterStateProvider.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedHttp2ClusterStateProvider.java?rev=1909097&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedHttp2ClusterStateProvider.java (added)
+++ manifoldcf/branches/CONNECTORS-1740/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedHttp2ClusterStateProvider.java Wed Apr 12 14:35:38 2023
@@ -0,0 +1,30 @@
+package org.apache.manifoldcf.agents.output.solr;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.BaseHttpClusterStateProvider;
+
+public class ModifiedHttp2ClusterStateProvider extends BaseHttpClusterStateProvider {
+  final ModifiedHttp2SolrClient httpClient;
+  final boolean closeClient;
+
+  public ModifiedHttp2ClusterStateProvider(final List<String> solrUrls, final ModifiedHttp2SolrClient httpClient) throws Exception {
+    this.httpClient = httpClient == null ? new ModifiedHttp2SolrClient.Builder().build() : httpClient;
+    this.closeClient = httpClient == null;
+    init(solrUrls);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.closeClient && this.httpClient != null) {
+      httpClient.close();
+    }
+  }
+
+  @Override
+  protected SolrClient getSolrClient(final String baseUrl) {
+    return new ModifiedHttp2SolrClient.Builder(baseUrl).withHttpClient(httpClient).build();
+  }
+}