You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:05:43 UTC

[22/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support for Solr

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
deleted file mode 100644
index 8ec3c8b..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.HttpClient;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.util.TimeOut;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-
-class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
-
-  /** Number of bootstrap attempts after which a repeatedly failing bootstrap is abandoned. */
-  private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
-  /** Pause, in milliseconds, between bootstrap submissions and between bootstrap status polls. */
-  private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
-  /**
-   * Upper bound on the wait for a bootstrap to complete, in seconds (the value is handed to
-   * {@code TimeOut} together with {@link TimeUnit#SECONDS}): 6 hours x 3600 seconds per hour.
-   * 6 hours is hopefully long enough for most indexes.
-   */
-  private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L;
-
-  // One state per replication target configured for this core's collection (filled in the constructor).
-  private List<CdcrReplicatorState> replicatorStates;
-
-  // Runs the periodic replication tasks; started in stateUpdate(), stopped in stateUpdate()/shutdown().
-  private final CdcrReplicatorScheduler scheduler;
-  // Both managers are injected through their setters and consulted in stateUpdate().
-  private CdcrProcessStateManager processStateManager;
-  private CdcrLeaderStateManager leaderStateManager;
-
-  private SolrCore core;
-  // Request path set on the COLLECTIONCHECKPOINT request sent to the target collection.
-  private String path;
-
-  // Runs the BootstrapStatusRunnable tasks; created in stateUpdate() when at least one target exists.
-  private ExecutorService bootstrapExecutor;
-  // Most recently created bootstrap task: written in initLogReaders(), closed in stateUpdate()/shutdown().
-  private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * Builds one {@link CdcrReplicatorState} (with its own {@link CloudSolrClient}) per target configured
-   * for this core's collection, then creates the scheduler that will replicate to those targets.
-   *
-   * @param core the source core
-   * @param path the request path used for checkpoint requests sent to the targets
-   * @param replicatorConfiguration scheduler settings, forwarded as-is to {@link CdcrReplicatorScheduler}
-   * @param replicasConfiguration target definitions keyed by source collection name; collections
-   *                              without an entry get no replicator state
-   */
-  CdcrReplicatorManager(final SolrCore core, String path,
-                        SolrParams replicatorConfiguration,
-                        Map<String, List<SolrParams>> replicasConfiguration) {
-    this.core = core;
-    this.path = path;
-
-    // the states must exist before the scheduler is built: it reads them through getReplicatorStates()
-    this.replicatorStates = new ArrayList<>();
-    final String sourceCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    final List<SolrParams> configuredTargets = replicasConfiguration.get(sourceCollection);
-    final List<SolrParams> targetConfigs =
-        configuredTargets == null ? Collections.<SolrParams>emptyList() : configuredTargets;
-
-    for (SolrParams targetConfig : targetConfigs) {
-      final String targetZkHost = targetConfig.get(CdcrParams.ZK_HOST_PARAM);
-      final String targetName = targetConfig.get(CdcrParams.TARGET_COLLECTION_PARAM);
-
-      final Builder clientBuilder = new Builder(Collections.singletonList(targetZkHost), Optional.empty());
-      final CloudSolrClient targetClient = clientBuilder.sendUpdatesOnlyToShardLeaders().build();
-      targetClient.setDefaultCollection(targetName);
-
-      this.replicatorStates.add(new CdcrReplicatorState(targetName, targetZkHost, targetClient));
-    }
-
-    this.scheduler = new CdcrReplicatorScheduler(this, replicatorConfiguration);
-  }
-
-  /**
-   * Stores the process state manager, then registers this manager with it as an observer.
-   */
-  void setProcessStateManager(final CdcrProcessStateManager processStateManager) {
-    this.processStateManager = processStateManager;
-    // same object as the field just assigned; registration happens after the assignment
-    processStateManager.register(this);
-  }
-
-  /**
-   * Stores the leader state manager, then registers this manager with it as an observer.
-   */
-  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
-    this.leaderStateManager = leaderStateManager;
-    // same object as the field just assigned; registration happens after the assignment
-    leaderStateManager.register(this);
-  }
-
-  /**
-   * <p>
-   * Reacts to a change of the leader state or of the process state.
-   * </p>
-   * <p>
-   * When this node is the leader and the process state is STARTED, a bootstrap executor is created
-   * (if there is at least one target), the log readers are initialised and the scheduler is started.
-   * In every other case the scheduler is shut down, the bootstrap task and its executor are closed,
-   * the log readers are closed and any bootstrap callable held by the core state is closed.
-   * </p>
-   * <p>
-   * The method is synchronised because both the leaderStateManager and the processStateManager call it.
-   * </p>
-   */
-  @Override
-  public synchronized void stateUpdate() {
-    final boolean leaderAndStarted = leaderStateManager.amILeader()
-        && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED);
-
-    if (leaderAndStarted) {
-      if (!replicatorStates.isEmpty()) {
-        this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
-            new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
-      }
-      initLogReaders();
-      scheduler.start();
-    } else {
-      scheduler.shutdown();
-      if (bootstrapExecutor != null) {
-        IOUtils.closeQuietly(bootstrapStatusRunnable);
-        ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
-      }
-      closeLogReaders();
-
-      // a bootstrap triggered on this core (as a target) is closed as well
-      Callable pendingBootstrap = core.getSolrCoreState().getCdcrBootstrapCallable();
-      if (pendingBootstrap != null) {
-        IOUtils.closeQuietly((CdcrRequestHandler.BootstrapCallable) pendingBootstrap);
-      }
-    }
-  }
-
-  /**
-   * Returns the manager's own list of replicator states (no copy is made).
-   */
-  List<CdcrReplicatorState> getReplicatorStates() {
-    return this.replicatorStates;
-  }
-
-  /**
-   * Gives every replicator state a fresh log reader positioned at the checkpoint reported by its
-   * target. When the reader cannot be positioned at that checkpoint, the state is flagged as
-   * bootstrapping and a {@link BootstrapStatusRunnable} is submitted to the bootstrap executor.
-   * A failure on one target is logged and does not prevent the remaining targets from being set up.
-   */
-  private void initLogReaders() {
-    final String sourceCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    final String shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-    final CdcrUpdateLog updateLog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-
-    for (CdcrReplicatorState target : replicatorStates) {
-      target.closeLogReader();
-      try {
-        final long checkpoint = getCheckpoint(target);
-        log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", target.getTargetCollection(),
-            checkpoint, sourceCollection, shardId);
-        final CdcrUpdateLog.CdcrLogReader logReader = updateLog.newLogReader();
-        final boolean positioned = logReader.seek(checkpoint);
-        target.init(logReader);
-        if (positioned) {
-          continue;
-        }
-
-        // The checkpoint is lower than the oldest known entry, which probably means there is a gap
-        // in the updates log. The best we can do is to bootstrap the target leader by replicating
-        // the full index.
-        final String targetName = target.getTargetCollection();
-        target.setBootstrapInProgress(true);
-        log.info("Attempting to bootstrap target collection: {}, shard: {}", targetName, shardId);
-        bootstrapStatusRunnable = new BootstrapStatusRunnable(core, target);
-        log.info("Submitting bootstrap task to executor");
-        try {
-          bootstrapExecutor.submit(bootstrapStatusRunnable);
-        } catch (Exception e) {
-          log.error("Unable to submit bootstrap call to executor", e);
-        }
-      } catch (IOException | SolrServerException | SolrException e) {
-        log.warn("Unable to instantiate the log reader for target collection " + target.getTargetCollection(), e);
-      } catch (InterruptedException e) {
-        log.warn("Thread interrupted while instantiate the log reader for target collection " + target.getTargetCollection(), e);
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  /**
-   * Sends a COLLECTIONCHECKPOINT request to the target collection of {@code state}, on this
-   * manager's request path, and returns the checkpoint found in the response.
-   *
-   * @throws IOException if the request fails at the I/O level
-   * @throws SolrServerException if the target reports an error
-   */
-  private long getCheckpoint(CdcrReplicatorState state) throws IOException, SolrServerException {
-    final ModifiableSolrParams checkpointParams = new ModifiableSolrParams();
-    checkpointParams.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
-
-    final SolrRequest checkpointRequest = new QueryRequest(checkpointParams);
-    checkpointRequest.setPath(path);
-
-    final NamedList reply = state.getClient().request(checkpointRequest);
-    final Long checkpoint = (Long) reply.get(CdcrParams.CHECKPOINT);
-    return checkpoint; // unboxed here, exactly as a direct cast-and-return would be
-  }
-
-  /**
-   * Closes the log reader of every replicator state.
-   */
-  void closeLogReaders() {
-    replicatorStates.forEach(CdcrReplicatorState::closeLogReader);
-  }
-
-  /**
-   * Stops the scheduler, closes the current bootstrap task and waits for the bootstrap executor to
-   * terminate (when one was created), then shuts down every
-   * {@link org.apache.solr.handler.CdcrReplicatorState} and empties the list of states.
-   */
-  void shutdown() {
-    scheduler.shutdown();
-
-    final boolean bootstrapExecutorCreated = bootstrapExecutor != null;
-    if (bootstrapExecutorCreated) {
-      IOUtils.closeQuietly(bootstrapStatusRunnable);
-      ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
-    }
-
-    replicatorStates.forEach(CdcrReplicatorState::shutdown);
-    replicatorStates.clear();
-  }
-
-  private class BootstrapStatusRunnable implements Runnable, Closeable {
-    // Replicator state of the target being bootstrapped; also provides the client used to reach it.
-    private final CdcrReplicatorState state;
-    // Name of the target collection, taken from the state.
-    private final String targetCollection;
-    // Shard id of the source core; the same shard name is looked up on the target.
-    private final String shard;
-    // Name of the source collection (used in log messages).
-    private final String collectionName;
-    private final CdcrUpdateLog ulog;
-    // URL of the source core, sent with the BOOTSTRAP command as the master URL.
-    private final String myCoreUrl;
-
-    // Set by close(); checked by run() to stop submitting and polling.
-    private volatile boolean closed = false;
-
-    /**
-     * Captures what the task needs from the source core (collection name, shard id, update log and
-     * core URL) and from the replicator state (target collection).
-     */
-    BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
-      collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-      shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-      ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-      this.state = state;
-      targetCollection = state.getTargetCollection();
-      myCoreUrl = ZkCoreNodeProps.getCoreUrl(
-          core.getCoreContainer().getZkController().getBaseUrl(), core.getName());
-    }
-
-    /**
-     * Marks this task as closed, so that {@link #run()} stops submitting and polling, and asks the
-     * leader of the target shard to cancel its bootstrap.
-     * <p>
-     * A {@link SolrServerException} raised by the cancel request is logged (with its stack trace) and
-     * not rethrown. An interruption while looking up the leader is logged and the interrupt status of
-     * the calling thread is restored.
-     * </p>
-     *
-     * @throws IOException if the cancel request, or closing the temporary client, fails at the I/O level
-     */
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      try {
-        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
-        String leaderCoreUrl = leader.getCoreUrl();
-        // reuse the HTTP client of the target's cloud client; only the thin per-leader wrapper is closed
-        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
-        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
-          sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
-        } catch (SolrServerException e) {
-          // the exception goes last, after the placeholder arguments, so SLF4J logs its stack trace
-          log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
-              targetCollection, shard, leaderCoreUrl, e);
-        }
-      } catch (InterruptedException e) {
-        log.error("Interrupted while closing BootstrapStatusRunnable", e);
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    /**
-     * Submits a bootstrap request to the leader of the target shard and polls its status until the
-     * bootstrap completes, the task is closed, the retries are exhausted or the timeout expires.
-     * <ul>
-     * <li>RUNNING: sleep and poll again.</li>
-     * <li>COMPLETED: fetch the new checkpoint, ask the followers of the target collection to recover,
-     * and clear the bootstrap-in-progress flag so that the replicator may proceed.</li>
-     * <li>FAILED: resubmit, at most {@code MAX_BOOTSTRAP_ATTEMPTS} attempts in a row.</li>
-     * <li>NOTFOUND / CANCELLED: resubmit and start counting attempts again.</li>
-     * <li>UNKNOWN / SUBMITTED: sleep and poll again.</li>
-     * </ul>
-     * The timeout is restarted after every resubmission. Errors are reported on the replicator state.
-     */
-    @Override
-    public void run() {
-      int retries = 1;
-      boolean success = false;
-      try {
-        submitBootstrapUntilAccepted();
-        TimeOut timeOut = newBootstrapTimeOut();
-        while (!timeOut.hasTimedOut()) {
-          if (closed) {
-            log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
-            state.setBootstrapInProgress(false);
-            break;
-          }
-          BootstrapStatus status = getBoostrapStatus();
-          if (status == BootstrapStatus.RUNNING) {
-            try {
-              log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
-                  elapsedSeconds(timeOut), BOOTSTRAP_RETRY_DELAY_MS);
-              timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
-            } catch (InterruptedException e) {
-              // keep polling, but leave the interrupt status set for whoever checks it next
-              Thread.currentThread().interrupt();
-            }
-          } else if (status == BootstrapStatus.COMPLETED) {
-            log.info("CDCR bootstrap successful in {} seconds", elapsedSeconds(timeOut));
-            long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
-            log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
-                checkpoint, collectionName, shard);
-            // NOTE(review): this reader is positioned but never handed to the state nor closed here;
-            // confirm whether state.init(reader1) was intended.
-            CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
-            reader1.seek(checkpoint);
-            // issue asynchronous request_recovery to the follower nodes of the shards of target collection
-            sendRequestRecoveryToFollowers(state);
-            success = true;
-            break;
-          } else if (status == BootstrapStatus.FAILED) {
-            log.warn("CDCR bootstrap failed in {} seconds", elapsedSeconds(timeOut));
-            // let's retry a fixed number of times before giving up
-            if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
-              log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
-              break;
-            } else {
-              log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
-              submitBootstrapUntilAccepted();
-              timeOut = newBootstrapTimeOut(); // reset the timer
-              retries++;
-            }
-          } else if (status == BootstrapStatus.NOTFOUND || status == BootstrapStatus.CANCELLED) {
-            log.info("CDCR bootstrap {} in {} seconds",
-                status == BootstrapStatus.NOTFOUND ? "not found" : "cancelled", elapsedSeconds(timeOut));
-            // the leader of the target shard may have changed and therefore there is no record of the
-            // bootstrap process so we must retry the operation
-            submitBootstrapUntilAccepted();
-            retries = 1;
-            timeOut = newBootstrapTimeOut(); // reset the timer
-          } else if (status == BootstrapStatus.UNKNOWN || status == BootstrapStatus.SUBMITTED) {
-            log.info("CDCR bootstrap is {} after {} seconds",
-                status == BootstrapStatus.UNKNOWN ? "unknown" : "submitted", elapsedSeconds(timeOut));
-            // we were not able to query the status on the remote end
-            // so just sleep for a bit and try again
-            timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
-          }
-        }
-      } catch (InterruptedException e) {
-        log.info("Bootstrap thread interrupted");
-        state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
-        Thread.currentThread().interrupt();
-      } catch (IOException | SolrServerException | SolrException e) {
-        log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
-        state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
-      } finally {
-        if (success) {
-          log.info("Bootstrap successful, giving the go-ahead to replicator");
-          state.setBootstrapInProgress(false);
-        }
-      }
-    }
-
-    /** Sends the BOOTSTRAP command every BOOTSTRAP_RETRY_DELAY_MS until it is accepted or this task is closed. */
-    private void submitBootstrapUntilAccepted() throws InterruptedException {
-      while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED)  {
-        Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
-      }
-    }
-
-    /** Creates a timer that expires BOOTSTRAP_TIMEOUT_SECONDS from now. */
-    private TimeOut newBootstrapTimeOut() {
-      return new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    }
-
-    /** Returns the number of seconds elapsed since {@code timeOut} was created by newBootstrapTimeOut(). */
-    private long elapsedSeconds(TimeOut timeOut) {
-      return BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS);
-    }
-
-    /**
-     * Sends the BOOTSTRAP command, with this core's URL as master URL, to the leader of the target
-     * shard and returns the status found in the response.
-     *
-     * @return the reported status, or UNKNOWN when the request fails, the response cannot be
-     *         interpreted, or closing the temporary client fails
-     * @throws InterruptedException if interrupted while looking up the target shard leader
-     */
-    private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
-      final Replica targetLeader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
-      final String targetLeaderUrl = targetLeader.getCoreUrl();
-      final HttpClient sharedHttpClient = state.getClient().getLbClient().getHttpClient();
-
-      BootstrapStatus outcome;
-      try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(targetLeaderUrl).withHttpClient(sharedHttpClient).build()) {
-        log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, targetLeaderUrl);
-        try {
-          final NamedList reply = sendCdcrCommand(leaderClient, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
-          log.debug("CDCR Bootstrap response: {}", reply);
-          final String reported = reply.get(RESPONSE_STATUS).toString();
-          outcome = BootstrapStatus.valueOf(reported.toUpperCase(Locale.ROOT));
-        } catch (Exception e) {
-          log.error("Exception submitting bootstrap request", e);
-          outcome = BootstrapStatus.UNKNOWN;
-        }
-      } catch (IOException e) {
-        log.error("There shouldn't be an IOException while closing but there was!", e);
-        // a failure while closing the client discards whatever status was obtained
-        outcome = BootstrapStatus.UNKNOWN;
-      }
-      return outcome;
-    }
-
-    /**
-     * Asks the leader of the target shard for the status of the bootstrap (BOOTSTRAP_STATUS command).
-     *
-     * @return RUNNING, COMPLETED, FAILED, NOTFOUND or CANCELLED as reported by the target; UNKNOWN when
-     *         the request fails or the target reports any other status (the cause is logged)
-     * @throws InterruptedException if interrupted while looking up the target shard leader; the
-     *         interruption is propagated instead of being reported as UNKNOWN
-     */
-    private BootstrapStatus getBoostrapStatus() throws InterruptedException {
-      try {
-        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
-        String leaderCoreUrl = leader.getCoreUrl();
-        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
-        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
-          NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
-          String status = (String) response.get(RESPONSE_STATUS);
-          BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
-          switch (bootstrapStatus) {
-            case RUNNING:
-            case COMPLETED:
-            case FAILED:
-            case CANCELLED:
-              return bootstrapStatus;
-            case NOTFOUND:
-              log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
-              return bootstrapStatus;
-            default:
-              // SUBMITTED and UNKNOWN are not valid answers to a status request
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
-          }
-        }
-      } catch (InterruptedException e) {
-        // must not be swallowed by the generic handler below: the caller handles interruption itself
-        throw e;
-      } catch (Exception e) {
-        log.error("Exception during bootstrap status request", e);
-        return BootstrapStatus.UNKNOWN;
-      }
-    }
-  }
-
-  /**
-   * Sends a CDCR command to the {@code /cdcr} handler through {@code client}.
-   *
-   * @param client the client used to send the request
-   * @param action the CDCR action to perform
-   * @param params extra request parameters as alternating names and values; a trailing name without
-   *               a value is ignored
-   * @return the response of the request
-   */
-  private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
-    final ModifiableSolrParams commandParams = new ModifiableSolrParams();
-    commandParams.set(CommonParams.QT, "/cdcr");
-    commandParams.set(CommonParams.ACTION, action.toString());
-
-    int nameIdx = 0;
-    while (nameIdx + 1 < params.length) {
-      commandParams.set(params[nameIdx], params[nameIdx + 1]);
-      nameIdx += 2;
-    }
-
-    final SolrRequest command = new QueryRequest(commandParams);
-    return client.request(command);
-  }
-
-  /**
-   * Sends a REQUESTRECOVERY command for every replica of every active slice of the target
-   * collection, except the slice leaders.
-   *
-   * @param state the replicator state whose client and target collection are used
-   * @throws SolrServerException if a recovery request is rejected by the target
-   * @throws IOException if a recovery request fails at the I/O level
-   */
-  private void sendRequestRecoveryToFollowers(CdcrReplicatorState state) throws SolrServerException, IOException {
-    Collection<Slice> slices = state.getClient().getZkStateReader().getClusterState().getCollection(state.getTargetCollection()).getActiveSlices();
-    for (Slice slice : slices) {
-      Collection<Replica> replicas = slice.getReplicas();
-      for (Replica replica : replicas) {
-        if (slice.getLeader().getCoreName().equals(replica.getCoreName())) {
-          continue; // no need to request recovery for leader
-        }
-        sendRequestRecoveryToFollower(state.getClient(), replica.getCoreName());
-        // parameterized message; a space now separates the shard name from "for target"
-        log.info("RequestRecovery cmd is issued by core: {} of shard: {} for target: {}",
-            replica.getCoreName(), slice.getName(), state.getTargetCollection());
-      }
-    }
-  }
-
-  /**
-   * Sends a core admin REQUESTRECOVERY command for the core named {@code coreName} through
-   * {@code client} and returns the response.
-   */
-  private NamedList sendRequestRecoveryToFollower(SolrClient client, String coreName) throws SolrServerException, IOException {
-    final CoreAdminRequest.RequestRecovery recoveryRequest = new CoreAdminRequest.RequestRecovery();
-    recoveryRequest.setAction(CoreAdminParams.CoreAdminAction.REQUESTRECOVERY);
-    recoveryRequest.setCoreName(coreName);
-    final NamedList recoveryResponse = client.request(recoveryRequest);
-    return recoveryResponse;
-  }
-
-
-  /**
-   * Bootstrap statuses. The names are matched (upper-cased) against the status string returned by
-   * the target for the BOOTSTRAP and BOOTSTRAP_STATUS commands.
-   */
-  private enum BootstrapStatus  {
-    // expected answer to a BOOTSTRAP command; the command is resent until this is returned
-    SUBMITTED,
-    // the bootstrap is still in progress; the status is polled again after a pause
-    RUNNING,
-    // the bootstrap finished; replication can resume from the new checkpoint
-    COMPLETED,
-    // the bootstrap failed; it is resubmitted a limited number of times
-    FAILED,
-    // the target leader has no record of the bootstrap; it is resubmitted
-    NOTFOUND,
-    // the bootstrap was cancelled on the target; it is resubmitted
-    CANCELLED,
-    // used locally when the request failed or the answer could not be interpreted
-    UNKNOWN
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
deleted file mode 100644
index 62abeab..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.*;
-
-/**
- * Schedule the execution of the {@link org.apache.solr.handler.CdcrReplicator} threads at
- * regular time interval. It relies on a queue of {@link org.apache.solr.handler.CdcrReplicatorState} in
- * order to avoid that one {@link org.apache.solr.handler.CdcrReplicatorState} is used by two threads at the same
- * time.
- */
-class CdcrReplicatorScheduler {
-
-  // True between a start() that created the executors and the following shutdown().
-  private boolean isStarted = false;
-
-  // Single-threaded timer that periodically submits replication tasks to replicatorsPool.
-  private ScheduledExecutorService scheduler;
-  // Fixed-size pool (poolSize threads) that runs the replication tasks.
-  private ExecutorService replicatorsPool;
-
-  private final CdcrReplicatorManager replicatorManager;
-  // States currently available for replication: a task polls one state and offers it back when done.
-  private final ConcurrentLinkedQueue<CdcrReplicatorState> statesQueue;
-
-  // Effective settings; the constructor overrides them from the replicator configuration when present.
-  private int poolSize = DEFAULT_POOL_SIZE;
-  // Delay between two scheduler runs; passed to scheduleWithFixedDelay with TimeUnit.MILLISECONDS.
-  private int timeSchedule = DEFAULT_TIME_SCHEDULE;
-  // Passed to every CdcrReplicator created by the replication tasks.
-  private int batchSize = DEFAULT_BATCH_SIZE;
-
-  private static final int DEFAULT_POOL_SIZE = 2;
-  private static final int DEFAULT_TIME_SCHEDULE = 10;
-  private static final int DEFAULT_BATCH_SIZE = 128;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * Creates a scheduler over the replicator states currently known to the manager.
-   *
-   * @param replicatorStatesManager the manager whose replicator states are queued for replication
-   * @param replicatorConfiguration optional settings (thread pool size, schedule, batch size); when
-   *                                {@code null} the defaults are kept
-   */
-  CdcrReplicatorScheduler(final CdcrReplicatorManager replicatorStatesManager, final SolrParams replicatorConfiguration) {
-    this.replicatorManager = replicatorStatesManager;
-    this.statesQueue = new ConcurrentLinkedQueue<>(replicatorStatesManager.getReplicatorStates());
-
-    if (replicatorConfiguration == null) {
-      return; // nothing configured: keep the defaults
-    }
-    this.poolSize = replicatorConfiguration.getInt(CdcrParams.THREAD_POOL_SIZE_PARAM, DEFAULT_POOL_SIZE);
-    this.timeSchedule = replicatorConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
-    this.batchSize = replicatorConfiguration.getInt(CdcrParams.BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
-  }
-
-  /**
-   * Creates the scheduler thread and the replicator pool and starts the periodic submission of
-   * replication tasks. Does nothing when the scheduler is already started.
-   */
-  void start() {
-    if (isStarted) {
-      return;
-    }
-
-    scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-scheduler"));
-    replicatorsPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
-
-    // a task that polls one state from the queue, executes the replication for it, and pushes the
-    // state back in the queue when done
-    final Runnable replicationTask = () -> {
-      CdcrReplicatorState state = statesQueue.poll();
-      assert state != null; // Should never happen
-      try {
-        if (state.isBootstrapInProgress()) {
-          log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
-        } else {
-          new CdcrReplicator(state, batchSize).run();
-        }
-      } finally {
-        statesQueue.offer(state);
-      }
-    };
-
-    // the scheduler thread runs with a fixed delay of timeSchedule milliseconds and submits one
-    // replication task per state available in the queue at that moment
-    final Runnable submitAvailableStates = () -> {
-      for (int remaining = statesQueue.size(); remaining > 0; remaining--) {
-        replicatorsPool.execute(replicationTask);
-      }
-    };
-
-    scheduler.scheduleWithFixedDelay(submitAvailableStates, 0, timeSchedule, TimeUnit.MILLISECONDS);
-    isStarted = true;
-  }
-
-  /**
-   * Stops the replicator pool, waiting up to 60 seconds for running tasks, then stops the scheduler
-   * thread. Does nothing when the scheduler is not started.
-   */
-  void shutdown() {
-    if (!isStarted) {
-      return;
-    }
-
-    replicatorsPool.shutdown();
-    try {
-      replicatorsPool.awaitTermination(60, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      log.warn("Thread interrupted while waiting for CDCR replicator threadpool close.");
-      Thread.currentThread().interrupt();
-    } finally {
-      // interrupts are often dangerous in Lucene / Solr code, but the
-      // test for this will leak threads without
-      scheduler.shutdownNow();
-      isStarted = false;
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
deleted file mode 100644
index bf80608..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.UpdateLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The state of the replication with a target cluster.
- */
-class CdcrReplicatorState {
-
-  private final String targetCollection;
-  private final String zkHost;
-  private final CloudSolrClient targetClient;
-
-  private CdcrUpdateLog.CdcrLogReader logReader;
-
-  private long consecutiveErrors = 0;
-  private final Map<ErrorType, Long> errorCounters = new HashMap<>();
-  private final FixedQueue<ErrorQueueEntry> errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
-
-  private BenchmarkTimer benchmarkTimer;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
-  private final AtomicInteger numBootstraps = new AtomicInteger();
-
-  CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
-    this.targetCollection = targetCollection;
-    this.targetClient = targetClient;
-    this.zkHost = zkHost;
-    this.benchmarkTimer = new BenchmarkTimer();
-  }
-
-  /**
-   * Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
-   * that is positioned at the last target cluster checkpoint.
-   */
-  void init(final CdcrUpdateLog.CdcrLogReader logReader) {
-    this.logReader = logReader;
-  }
-
-  void closeLogReader() {
-    if (logReader != null) {
-      logReader.close();
-      logReader = null;
-    }
-  }
-
-  CdcrUpdateLog.CdcrLogReader getLogReader() {
-    return logReader;
-  }
-
-  String getTargetCollection() {
-    return targetCollection;
-  }
-
-  String getZkHost() {
-    return zkHost;
-  }
-
-  CloudSolrClient getClient() {
-    return targetClient;
-  }
-
-  void shutdown() {
-    try {
-      targetClient.close();
-    } catch (IOException ioe) {
-      log.warn("Caught exception trying to close server: ", ioe.getMessage());
-    }
-    this.closeLogReader();
-  }
-
-  void reportError(ErrorType error) {
-    if (!errorCounters.containsKey(error)) {
-      errorCounters.put(error, 0l);
-    }
-    errorCounters.put(error, errorCounters.get(error) + 1);
-    errorsQueue.add(new ErrorQueueEntry(error, new Date()));
-    consecutiveErrors++;
-  }
-
-  void resetConsecutiveErrors() {
-    consecutiveErrors = 0;
-  }
-
-  /**
-   * Returns the number of consecutive errors encountered while trying to forward updates to the target.
-   */
-  long getConsecutiveErrors() {
-    return consecutiveErrors;
-  }
-
-  /**
-   * Gets the number of errors of a particular type.
-   */
-  long getErrorCount(ErrorType type) {
-    if (errorCounters.containsKey(type)) {
-      return errorCounters.get(type);
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * Gets the last errors ordered by timestamp (most recent first)
-   */
-  List<String[]> getLastErrors() {
-    List<String[]> lastErrors = new ArrayList<>();
-    synchronized (errorsQueue) {
-      Iterator<ErrorQueueEntry> it = errorsQueue.iterator();
-      while (it.hasNext()) {
-        ErrorQueueEntry entry = it.next();
-        lastErrors.add(new String[]{entry.timestamp.toInstant().toString(), entry.type.toLower()});
-      }
-    }
-    return lastErrors;
-  }
-
-  /**
-   * Return the timestamp of the last processed operations
-   */
-  String getTimestampOfLastProcessedOperation() {
-    if (logReader != null && logReader.getLastVersion() != -1) {
-      // Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
-      return Instant.ofEpochMilli(logReader.getLastVersion() >> 20).toString();
-    }
-    return "";
-  }
-
-  /**
-   * Gets the benchmark timer.
-   */
-  BenchmarkTimer getBenchmarkTimer() {
-    return this.benchmarkTimer;
-  }
-
-  /**
-   * @return true if a bootstrap operation is in progress, false otherwise
-   */
-  boolean isBootstrapInProgress() {
-    return bootstrapInProgress.get();
-  }
-
-  void setBootstrapInProgress(boolean inProgress) {
-    if (bootstrapInProgress.compareAndSet(true, false)) {
-      numBootstraps.incrementAndGet();
-    }
-    bootstrapInProgress.set(inProgress);
-  }
-
-  public int getNumBootstraps() {
-    return numBootstraps.get();
-  }
-
-  enum ErrorType {
-    INTERNAL,
-    BAD_REQUEST;
-
-    public String toLower() {
-      return toString().toLowerCase(Locale.ROOT);
-    }
-
-  }
-
-  static class BenchmarkTimer {
-
-    private long startTime;
-    private long runTime = 0;
-    private Map<Integer, Long> opCounters = new HashMap<>();
-
-    /**
-     * Start recording time.
-     */
-    void start() {
-      startTime = System.nanoTime();
-    }
-
-    /**
-     * Stop recording time.
-     */
-    void stop() {
-      runTime += System.nanoTime() - startTime;
-      startTime = -1;
-    }
-
-    void incrementCounter(final int operationType) {
-      switch (operationType) {
-        case UpdateLog.ADD:
-        case UpdateLog.DELETE:
-        case UpdateLog.DELETE_BY_QUERY: {
-          if (!opCounters.containsKey(operationType)) {
-            opCounters.put(operationType, 0l);
-          }
-          opCounters.put(operationType, opCounters.get(operationType) + 1);
-          return;
-        }
-
-        default:
-      }
-    }
-
-    long getRunTime() {
-      long totalRunTime = runTime;
-      if (startTime != -1) { // we are currently recording the time
-        totalRunTime += System.nanoTime() - startTime;
-      }
-      return totalRunTime;
-    }
-
-    double getOperationsPerSecond() {
-      long total = 0;
-      for (long counter : opCounters.values()) {
-        total += counter;
-      }
-      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
-      return total / elapsedTimeInSeconds;
-    }
-
-    double getAddsPerSecond() {
-      long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
-      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
-      return total / elapsedTimeInSeconds;
-    }
-
-    double getDeletesPerSecond() {
-      long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
-      total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
-      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
-      return total / elapsedTimeInSeconds;
-    }
-
-  }
-
-  private static class ErrorQueueEntry {
-
-    private ErrorType type;
-    private Date timestamp;
-
-    private ErrorQueueEntry(ErrorType type, Date timestamp) {
-      this.type = type;
-      this.timestamp = timestamp;
-    }
-  }
-
-  private static class FixedQueue<E> extends LinkedList<E> {
-
-    private int maxSize;
-
-    public FixedQueue(int maxSize) {
-      this.maxSize = maxSize;
-    }
-
-    @Override
-    public synchronized boolean add(E e) {
-      super.addFirst(e);
-      if (size() > maxSize) {
-        removeLast();
-      }
-      return true;
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
deleted file mode 100644
index 1453841..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
+++ /dev/null
@@ -1,861 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CloseHook;
-import org.apache.solr.core.PluginBag;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.SolrCoreState;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.plugin.SolrCoreAware;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
-
-/**
- * <p>
- * This request handler implements the CDCR API and is responsible for the execution of the
- * {@link CdcrReplicator} threads.
- * </p>
- * <p>
- * It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
- * {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
- * to synchronise the state of the CDCR across all the nodes.
- * </p>
- * <p>
- * The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the
- * actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is leader and the process
- * state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
- * start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
- * {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
- * </p>
- * <p>
- * The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
- * files. Such a mode can be enabled or disabled using the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
- * </p>
- * <p>
- * Known limitations: The source and target clusters must have the same topology. Replication between clusters
- * with a different number of shards will likely result in an inconsistent index.
- * </p>
- */
-public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private SolrCore core;
-  private String collection;
-  private String shard;
-  private String path;
-
-  private SolrParams updateLogSynchronizerConfiguration;
-  private SolrParams replicatorConfiguration;
-  private SolrParams bufferConfiguration;
-  private Map<String, List<SolrParams>> replicasConfiguration;
-
-  private CdcrProcessStateManager processStateManager;
-  private CdcrBufferStateManager bufferStateManager;
-  private CdcrReplicatorManager replicatorManager;
-  private CdcrLeaderStateManager leaderStateManager;
-  private CdcrUpdateLogSynchronizer updateLogSynchronizer;
-  private CdcrBufferManager bufferManager;
-
-  /**
-   * Reads the configuration sections of the handler: update log synchronizer, replicator, buffer
-   * and replicas. A section is only taken into account when it is a {@link NamedList}; the replica
-   * sections are grouped by the value of their source collection parameter.
-   *
-   * @param args the handler arguments, may be null in which case nothing is configured here
-   */
-  @Override
-  public void init(NamedList args) {
-    super.init(args);
-
-    if (args == null) {
-      return;
-    }
-
-    // Configuration of the Update Log Synchronizer (instanceof is false for null)
-    Object synchronizerSection = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
-    if (synchronizerSection instanceof NamedList) {
-      updateLogSynchronizerConfiguration = ((NamedList) synchronizerSection).toSolrParams();
-    }
-
-    // Configuration of the Replicator
-    Object replicatorSection = args.get(CdcrParams.REPLICATOR_PARAM);
-    if (replicatorSection instanceof NamedList) {
-      replicatorConfiguration = ((NamedList) replicatorSection).toSolrParams();
-    }
-
-    // Configuration of the Buffer
-    Object bufferSection = args.get(CdcrParams.BUFFER_PARAM);
-    if (bufferSection instanceof NamedList) {
-      bufferConfiguration = ((NamedList) bufferSection).toSolrParams();
-    }
-
-    // Configuration of the Replicas, grouped by source collection
-    replicasConfiguration = new HashMap<>();
-    for (Object replicaSection : args.getAll(CdcrParams.REPLICA_PARAM)) {
-      if (!(replicaSection instanceof NamedList)) {
-        continue;
-      }
-      SolrParams replicaParams = ((NamedList) replicaSection).toSolrParams();
-      String sourceCollection = replicaParams.get(CdcrParams.SOURCE_COLLECTION_PARAM);
-      replicasConfiguration.computeIfAbsent(sourceCollection, key -> new ArrayList<>()).add(replicaParams);
-    }
-  }
-
-  @Override
-  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
-    // Pick the action
-    SolrParams params = req.getParams();
-    CdcrParams.CdcrAction action = null;
-    String a = params.get(CommonParams.ACTION);
-    if (a != null) {
-      action = CdcrParams.CdcrAction.get(a);
-    }
-    if (action == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
-    }
-
-    switch (action) {
-      case START: {
-        this.handleStartAction(req, rsp);
-        break;
-      }
-      case STOP: {
-        this.handleStopAction(req, rsp);
-        break;
-      }
-      case STATUS: {
-        this.handleStatusAction(req, rsp);
-        break;
-      }
-      case COLLECTIONCHECKPOINT: {
-        this.handleCollectionCheckpointAction(req, rsp);
-        break;
-      }
-      case SHARDCHECKPOINT: {
-        this.handleShardCheckpointAction(req, rsp);
-        break;
-      }
-      case ENABLEBUFFER: {
-        this.handleEnableBufferAction(req, rsp);
-        break;
-      }
-      case DISABLEBUFFER: {
-        this.handleDisableBufferAction(req, rsp);
-        break;
-      }
-      case LASTPROCESSEDVERSION: {
-        this.handleLastProcessedVersionAction(req, rsp);
-        break;
-      }
-      case QUEUES: {
-        this.handleQueuesAction(req, rsp);
-        break;
-      }
-      case OPS: {
-        this.handleOpsAction(req, rsp);
-        break;
-      }
-      case ERRORS: {
-        this.handleErrorsAction(req, rsp);
-        break;
-      }
-      case BOOTSTRAP: {
-        this.handleBootstrapAction(req, rsp);
-        break;
-      }
-      case BOOTSTRAP_STATUS:  {
-        this.handleBootstrapStatus(req, rsp);
-        break;
-      }
-      case CANCEL_BOOTSTRAP:  {
-        this.handleCancelBootstrap(req, rsp);
-        break;
-      }
-      default: {
-        throw new RuntimeException("Unknown action: " + action);
-      }
-    }
-
-    rsp.setHttpCaching(false);
-  }
-
-  /**
-   * Binds the handler to its core: validates the environment (SolrCloud mode, CDCR update log,
-   * handler registered under a path), then creates the state managers, the replicator manager, the
-   * update log synchronizer and the buffer manager, and finally registers the close hook.
-   *
-   * @param core the core this handler belongs to
-   * @throws SolrException with a server error code if one of the validations fails
-   */
-  @Override
-  public void inform(SolrCore core) {
-    this.core = core;
-
-    // Make sure that the core is ZKAware. This is checked before the cloud descriptor is read:
-    // NOTE(review): outside SolrCloud mode the cloud descriptor is presumably null, in which case
-    // reading it first would fail with a NullPointerException instead of the message below.
-    if (!core.getCoreContainer().isZooKeeperAware()) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Solr instance is not running in SolrCloud mode.");
-    }
-
-    collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
-    // Make sure that the core is using the CdcrUpdateLog implementation
-    if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Solr instance is not configured with the cdcr update log.");
-    }
-
-    // Find the registered path of the handler
-    path = null;
-    for (Map.Entry<String, PluginBag.PluginHolder<SolrRequestHandler>> entry : core.getRequestHandlers().getRegistry().entrySet()) {
-      if (core.getRequestHandlers().isLoaded(entry.getKey()) && entry.getValue().get() == this) {
-        path = entry.getKey();
-        break;
-      }
-    }
-    if (path == null) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "The CdcrRequestHandler is not registered with the current core.");
-    }
-    if (!path.startsWith("/")) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
-    }
-
-    // Initialisation phase
-    // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
-    // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
-    // If the Solr cloud has already been initialised, and the core is reloaded (i.e., because a node died or a new node
-    // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
-    // in zookeeper.
-
-    // Initialise the buffer state manager
-    bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
-    // Initialise the process state manager
-    processStateManager = new CdcrProcessStateManager(core);
-    // Initialise the leader state manager
-    leaderStateManager = new CdcrLeaderStateManager(core);
-
-    // Initialise the replicator states manager
-    replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
-    replicatorManager.setProcessStateManager(processStateManager);
-    replicatorManager.setLeaderStateManager(leaderStateManager);
-    // we need to inform it of a state event since the process and leader state
-    // may have been synchronised during the initialisation
-    replicatorManager.stateUpdate();
-
-    // Initialise the update log synchronizer
-    updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
-    updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
-    // we need to inform it of a state event since the leader state
-    // may have been synchronised during the initialisation
-    updateLogSynchronizer.stateUpdate();
-
-    // Initialise the buffer manager
-    bufferManager = new CdcrBufferManager(core);
-    bufferManager.setLeaderStateManager(leaderStateManager);
-    bufferManager.setBufferStateManager(bufferStateManager);
-    // we need to inform it of a state event since the leader state
-    // may have been synchronised during the initialisation
-    bufferManager.stateUpdate();
-
-    // register the close hook
-    this.registerCloseHook(core);
-  }
-
-  /**
-   * register a close hook to properly shutdown the state manager and scheduler
-   */
-  private void registerCloseHook(SolrCore core) {
-    core.addCloseHook(new CloseHook() {
-
-      @Override
-      public void preClose(SolrCore core) {
-        log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collection, shard);
-
-        updateLogSynchronizer.shutdown();
-        replicatorManager.shutdown();
-        bufferStateManager.shutdown();
-        processStateManager.shutdown();
-        leaderStateManager.shutdown();
-      }
-
-      @Override
-      public void postClose(SolrCore core) {
-      }
-
-    });
-  }
-
-  /**
-   * <p>
-   * Update and synchronize the process state.
-   * </p>
-   * <p>
-   * The process state manager must notify the replicator states manager of the change of state.
-   * </p>
-   */
-  private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    if (processStateManager.getState() == CdcrParams.ProcessState.STOPPED) {
-      processStateManager.setState(CdcrParams.ProcessState.STARTED);
-      processStateManager.synchronize();
-    }
-
-    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
-  }
-
-  private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    if (processStateManager.getState() == CdcrParams.ProcessState.STARTED) {
-      processStateManager.setState(CdcrParams.ProcessState.STOPPED);
-      processStateManager.synchronize();
-    }
-
-    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
-  }
-
-  private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
-  }
-
-  private NamedList getStatus() {
-    NamedList status = new NamedList();
-    status.add(CdcrParams.ProcessState.getParam(), processStateManager.getState().toLower());
-    status.add(CdcrParams.BufferState.getParam(), bufferStateManager.getState().toLower());
-    return status;
-  }
-
-  /**
-   * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
-   * This checkpoint is used on the source cluster to setup the
-   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader. <br/>
-   * This method will execute in parallel one
-   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
-   * then pick the lowest version number as checkpoint. Picking the lowest amongst all shards will ensure that we do not
-   * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
-   * updates to the target cluster while we are currently instantiating the
-   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
-   * This solution only works in scenarios where the topology of the source and target clusters are identical.
-   *
-   * @throws SolrException with a server error code if the collection is not part of the cluster state,
-   *                       or if the checkpoint of a shard cannot be retrieved
-   */
-  private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
-      throws IOException, SolrServerException {
-    ZkController zkController = core.getCoreContainer().getZkController();
-    try {
-      zkController.getZkStateReader().forceUpdateCollection(collection);
-    } catch (Exception e) {
-      // best effort: carry on with the cluster state currently known
-      log.warn("Error when updating cluster state", e);
-    }
-    ClusterState cstate = zkController.getClusterState();
-    DocCollection docCollection = cstate.getCollectionOrNull(collection);
-    if (docCollection == null) {
-      // fail with an explicit message rather than a NullPointerException when iterating the shards
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Collection '" + collection + "' not found in the cluster state");
-    }
-    Collection<Slice> shards = docCollection.getActiveSlices();
-
-    // created only once the request is known to be serviceable
-    ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("parallelCdcrExecutor"));
-
-    long checkpoint = Long.MAX_VALUE;
-    try {
-      List<Callable<Long>> callables = new ArrayList<>();
-      for (Slice slice : shards) {
-        ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, slice.getName());
-        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-        callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
-      }
-
-      for (final Future<Long> future : parallelExecutor.invokeAll(callables)) {
-        // we must take the lowest checkpoint from all the shards
-        checkpoint = Math.min(checkpoint, future.get());
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Error while requesting shard's checkpoints", e);
-    } catch (ExecutionException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Error while requesting shard's checkpoints", e);
-    } finally {
-      parallelExecutor.shutdown();
-    }
-
-    rsp.add(CdcrParams.CHECKPOINT, checkpoint);
-  }
-
-  /**
-   * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}.
-   * <p>
-   * The checkpoint added to the response is the highest of the max version found in the recent
-   * updates and the max version found in the index, or -1 when that value is 0.
-   * </p>
-   *
-   * @throws SolrException with a bad request code if this replica is not the leader, with a server
-   *                       error code if the max version cannot be read
-   */
-  private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    if (!leaderStateManager.amILeader()) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
-          "' sent to non-leader replica");
-    }
-
-    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-    VersionInfo versionInfo = ulog.getVersionInfo();
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
-      long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
-      log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
-      // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
-      long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
-      if (maxVersion == 0L) {
-        maxVersion = -1;
-      }
-      rsp.add(CdcrParams.CHECKPOINT, maxVersion);
-    } catch (IOException e) {
-      // keep the cause so that the underlying I/O failure remains diagnosable
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
-          "' could not read max version", e);
-    }
-  }
-
-  private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    if (bufferStateManager.getState() == CdcrParams.BufferState.DISABLED) {
-      bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
-      bufferStateManager.synchronize();
-    }
-
-    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
-  }
-
-  private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    if (bufferStateManager.getState() == CdcrParams.BufferState.ENABLED) {
-      bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
-      bufferStateManager.synchronize();
-    }
-
-    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
-  }
-
-  /**
-   * Adds to the response the lowest last processed version across the log readers of this shard leader.
-   * <p>
-   * We have to take care of four cases:
-   * </p>
-   * <ul>
-   * <li>Replication &amp; Buffering</li>
-   * <li>Replication &amp; No Buffering</li>
-   * <li>No Replication &amp; Buffering</li>
-   * <li>No Replication &amp; No Buffering</li>
-   * </ul>
-   * <p>
-   * In the first three cases, at least one log reader should have been initialised. We should take the lowest
-   * last processed version across all the initialised readers. In the last case, there isn't a log reader
-   * initialised. We should instantiate one and get the version of the first entries.
-   * </p>
-   *
-   * @throws SolrException with a bad request code if this replica is not the leader, with a server
-   *                       error code if the version cannot be read from a new log reader
-   */
-  private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
-    if (!leaderStateManager.amILeader()) {
-      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
-          " sent to non-leader replica");
-    }
-
-    // take care of the first three cases
-    // first check the log readers from the replicator states
-    long lastProcessedVersion = Long.MAX_VALUE;
-    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
-      // fetch the reader once: the state may reset it to null (CdcrReplicatorState#closeLogReader)
-      // between a null check and a second call to the getter
-      CdcrUpdateLog.CdcrLogReader stateLogReader = state.getLogReader();
-      if (stateLogReader != null) {
-        lastProcessedVersion = Math.min(lastProcessedVersion, stateLogReader.getLastVersion());
-      }
-    }
-
-    // next check the log reader of the buffer
-    CdcrUpdateLog.CdcrLogReader bufferLogReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).getBufferToggle();
-    if (bufferLogReader != null) {
-      lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
-    }
-
-    // the fourth case: no cdc replication, no buffering: all readers were null
-    if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
-        bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
-      CdcrUpdateLog.CdcrLogReader logReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).newLogReader();
-      try {
-        // let the reader initialize lastVersion
-        logReader.next();
-        lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Error while fetching the last processed version", e);
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Error while fetching the last processed version", e);
-      } finally {
-        // the reader was created for this request only
-        logReader.close();
-      }
-    }
-
-    log.debug("Returning the lowest last processed version {}  @ {}:{}", lastProcessedVersion, collectionName, shard);
-    rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
-  }
-
-  private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    NamedList hosts = new NamedList();
-
-    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
-      NamedList queueStats = new NamedList();
-
-      CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
-      if (logReader == null) {
-        String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
-        String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
-        log.warn("The log reader for target collection {} is not initialised @ {}:{}",
-            state.getTargetCollection(), collectionName, shard);
-        queueStats.add(CdcrParams.QUEUE_SIZE, -1l);
-      } else {
-        queueStats.add(CdcrParams.QUEUE_SIZE, logReader.getNumberOfRemainingRecords());
-      }
-      queueStats.add(CdcrParams.LAST_TIMESTAMP, state.getTimestampOfLastProcessedOperation());
-
-      if (hosts.get(state.getZkHost()) == null) {
-        hosts.add(state.getZkHost(), new NamedList());
-      }
-      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), queueStats);
-    }
-
-    rsp.add(CdcrParams.QUEUES, hosts);
-    UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
-    rsp.add(CdcrParams.TLOG_TOTAL_SIZE, updateLog.getTotalLogsSize());
-    rsp.add(CdcrParams.TLOG_TOTAL_COUNT, updateLog.getTotalLogsNumber());
-    rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER,
-        updateLogSynchronizer.isStarted() ? CdcrParams.ProcessState.STARTED.toLower() : CdcrParams.ProcessState.STOPPED.toLower());
-  }
-
-  private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    NamedList hosts = new NamedList();
-
-    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
-      NamedList ops = new NamedList();
-      ops.add(CdcrParams.COUNTER_ALL, state.getBenchmarkTimer().getOperationsPerSecond());
-      ops.add(CdcrParams.COUNTER_ADDS, state.getBenchmarkTimer().getAddsPerSecond());
-      ops.add(CdcrParams.COUNTER_DELETES, state.getBenchmarkTimer().getDeletesPerSecond());
-
-      if (hosts.get(state.getZkHost()) == null) {
-        hosts.add(state.getZkHost(), new NamedList());
-      }
-      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), ops);
-    }
-
-    rsp.add(CdcrParams.OPERATIONS_PER_SECOND, hosts);
-  }
-
-  private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
-    NamedList hosts = new NamedList();
-
-    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
-      NamedList errors = new NamedList();
-
-      errors.add(CdcrParams.CONSECUTIVE_ERRORS, state.getConsecutiveErrors());
-      errors.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
-      errors.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));
-
-      NamedList lastErrors = new NamedList();
-      for (String[] lastError : state.getLastErrors()) {
-        lastErrors.add(lastError[0], lastError[1]);
-      }
-      errors.add(CdcrParams.LAST, lastErrors);
-
-      if (hosts.get(state.getZkHost()) == null) {
-        hosts.add(state.getZkHost(), new NamedList());
-      }
-      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), errors);
-    }
-
-    rsp.add(CdcrParams.ERRORS, hosts);
-  }
-
-  private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
-    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-    if (!leaderStateManager.amILeader()) {
-      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
-          " sent to non-leader replica");
-    }
-    CountDownLatch latch = new CountDownLatch(1); // latch to make sure BOOTSTRAP_STATUS gives correct response
-
-    Runnable runnable = () -> {
-      Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
-      boolean locked = recoveryLock.tryLock();
-      SolrCoreState coreState = core.getSolrCoreState();
-      try {
-        if (!locked)  {
-          handleCancelBootstrap(req, rsp);
-        } else if (leaderStateManager.amILeader())  {
-          coreState.setCdcrBootstrapRunning(true);
-          latch.countDown(); // free the latch as current bootstrap is executing
-          //running.set(true);
-          String masterUrl = req.getParams().get(ReplicationHandler.MASTER_URL);
-          BootstrapCallable bootstrapCallable = new BootstrapCallable(masterUrl, core);
-          coreState.setCdcrBootstrapCallable(bootstrapCallable);
-          Future<Boolean> bootstrapFuture = core.getCoreContainer().getUpdateShardHandler().getRecoveryExecutor()
-              .submit(bootstrapCallable);
-          coreState.setCdcrBootstrapFuture(bootstrapFuture);
-          try {
-            bootstrapFuture.get();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.warn("Bootstrap was interrupted", e);
-          } catch (ExecutionException e) {
-            log.error("Bootstrap operation failed", e);
-          }
-        } else  {
-          log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
-        }
-      } finally {
-        if (locked) {
-          coreState.setCdcrBootstrapRunning(false);
-          recoveryLock.unlock();
-        } else {
-          latch.countDown(); // free the latch as current bootstrap is executing
-        }
-      }
-    };
-
-    try {
-      core.getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
-      rsp.add(RESPONSE_STATUS, "submitted");
-      latch.await(10000, TimeUnit.MILLISECONDS); // put the latch for current bootstrap command
-    } catch (RejectedExecutionException ree)  {
-      // no problem, we're probably shutting down
-      rsp.add(RESPONSE_STATUS, "failed");
-    }
-  }
-
-  private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
-    BootstrapCallable callable = (BootstrapCallable)core.getSolrCoreState().getCdcrBootstrapCallable();
-    IOUtils.closeQuietly(callable);
-    rsp.add(RESPONSE_STATUS, "cancelled");
-  }
-
-  private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
-    SolrCoreState coreState = core.getSolrCoreState();
-    if (coreState.getCdcrBootstrapRunning()) {
-      rsp.add(RESPONSE_STATUS, RUNNING);
-      return;
-    }
-
-    Future<Boolean> future = coreState.getCdcrBootstrapFuture();
-    BootstrapCallable callable = (BootstrapCallable)coreState.getCdcrBootstrapCallable();
-    if (future == null) {
-      rsp.add(RESPONSE_STATUS, "notfound");
-      rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
-    } else if (future.isCancelled() || callable.isClosed()) {
-      rsp.add(RESPONSE_STATUS, "cancelled");
-    } else if (future.isDone()) {
-      // could be a normal termination or an exception
-      try {
-        Boolean result = future.get();
-        if (result) {
-          rsp.add(RESPONSE_STATUS, COMPLETED);
-        } else {
-          rsp.add(RESPONSE_STATUS, FAILED);
-        }
-      } catch (InterruptedException e) {
-        // should not happen?
-      } catch (ExecutionException e) {
-        rsp.add(RESPONSE_STATUS, FAILED);
-        rsp.add(RESPONSE, e);
-      } catch (CancellationException ce) {
-        rsp.add(RESPONSE_STATUS, FAILED);
-        rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
-      }
-    } else {
-      rsp.add(RESPONSE_STATUS, RUNNING);
-    }
-  }
-
-  static class BootstrapCallable implements Callable<Boolean>, Closeable {
-    private final String masterUrl;
-    private final SolrCore core;
-    private volatile boolean closed = false;
-
-    BootstrapCallable(String masterUrl, SolrCore core) {
-      this.masterUrl = masterUrl;
-      this.core = core;
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-      ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-      replicationHandler.abortFetch();
-    }
-
-    public boolean isClosed() {
-      return closed;
-    }
-
-    @Override
-    public Boolean call() throws Exception {
-      boolean success = false;
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      // we start buffering updates as a safeguard however we do not expect
-      // to receive any updates from the source during bootstrap
-      ulog.bufferUpdates();
-      try {
-        commitOnLeader(masterUrl);
-        // use rep handler directly, so we can do this sync rather than async
-        SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-        ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
-        if (replicationHandler == null) {
-          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-              "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
-        }
-
-        ModifiableSolrParams solrParams = new ModifiableSolrParams();
-        solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
-        // we do not want the raw tlog files from the source
-        solrParams.set(ReplicationHandler.TLOG_FILES, false);
-
-        success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-
-        // this is required because this callable can race with HttpSolrCall#destroy
-        // which clears the request info.
-        // Applying buffered updates fails without the following line because LogReplayer
-        // also tries to set request info and fails with AssertionError
-        SolrRequestInfo.clearRequestInfo();
-
-        Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
-        if (future == null) {
-          // no replay needed
-          log.info("No replay needed.");
-        } else {
-          log.info("Replaying buffered documents.");
-          // wait for replay
-          UpdateLog.RecoveryInfo report = future.get();
-          if (report.failed) {
-            SolrException.log(log, "Replay failed");
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
-          }
-        }
-        return success;
-      } finally {
-        if (closed || !success) {
-          // we cannot apply the buffer in this case because it will introduce newer versions in the
-          // update log and then the source cluster will get those versions via collectioncheckpoint
-          // causing the versions in between to be completely missed
-          boolean dropped = ulog.dropBufferedUpdates();
-          assert dropped;
-        }
-      }
-    }
-
-    private void commitOnLeader(String leaderUrl) throws SolrServerException,
-        IOException {
-      try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
-          .withConnectionTimeout(30000)
-          .build()) {
-        UpdateRequest ureq = new UpdateRequest();
-        ureq.setParams(new ModifiableSolrParams());
-        ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-        ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
-        ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
-            client);
-      }
-    }
-  }
-
-  @Override
-  public String getDescription() {
-    return "Manage Cross Data Center Replication";
-  }
-
-  @Override
-  public Category getCategory() {
-    return Category.REPLICATION;
-  }
-
-  /**
-   * A thread subclass for executing a single
-   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} action.
-   */
-  private static final class SliceCheckpointCallable implements Callable<Long> {
-
-    final String baseUrl;
-    final String cdcrPath;
-
-    SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
-      this.baseUrl = baseUrl;
-      this.cdcrPath = cdcrPath;
-    }
-
-    @Override
-    public Long call() throws Exception {
-      try (HttpSolrClient server = new HttpSolrClient.Builder(baseUrl)
-          .withConnectionTimeout(15000)
-          .withSocketTimeout(60000)
-          .build()) {
-
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());
-
-        SolrRequest request = new QueryRequest(params);
-        request.setPath(cdcrPath);
-
-        NamedList response = server.request(request);
-        return (Long) response.get(CdcrParams.CHECKPOINT);
-      }
-    }
-
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
deleted file mode 100644
index 151615e..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A state manager which implements an observer pattern to notify observers
- * of a state change.
- */
-abstract class CdcrStateManager {
-
-  private List<CdcrStateObserver> observers = new ArrayList<>();
-
-  void register(CdcrStateObserver observer) {
-    this.observers.add(observer);
-  }
-
-  void callback() {
-    for (CdcrStateObserver observer : observers) {
-      observer.stateUpdate();
-    }
-  }
-
-  interface CdcrStateObserver {
-
-    void stateUpdate();
-
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java b/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
deleted file mode 100644
index 80f27ce..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * Synchronize periodically the update log of non-leader nodes with their leaders.
- * </p>
- * <p>
- * Non-leader nodes must always buffer updates in case of leader failures. They have to periodically
- * synchronize their update logs with their leader to remove old transaction logs that will never be used anymore.
- * This is performed by a background thread that is scheduled with a fixed delay. The background thread is sending
- * the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
- * the lowest last version number processed. This version is then used to move forward the buffer log reader.
- * </p>
- */
-class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
-
-  private CdcrLeaderStateManager leaderStateManager;
-  private ScheduledExecutorService scheduler;
-
-  private final SolrCore core;
-  private final String collection;
-  private final String shardId;
-  private final String path;
-
-  private int timeSchedule = DEFAULT_TIME_SCHEDULE;
-
-  private static final int DEFAULT_TIME_SCHEDULE = 60000;  // by default, every minute
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchonizerConfiguration) {
-    this.core = core;
-    this.path = path;
-    this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-    if (updateLogSynchonizerConfiguration != null) {
-      this.timeSchedule = updateLogSynchonizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
-    }
-  }
-
-  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
-    this.leaderStateManager = leaderStateManager;
-    this.leaderStateManager.register(this);
-  }
-
-  @Override
-  public void stateUpdate() {
-    // If I am not the leader, I need to synchronise periodically my update log with my leader.
-    if (!leaderStateManager.amILeader()) {
-      scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-update-log-synchronizer"));
-      scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
-      return;
-    }
-
-    this.shutdown();
-  }
-
-  boolean isStarted() {
-    return scheduler != null;
-  }
-
-  void shutdown() {
-    if (scheduler != null) {
-      // interrupts are often dangerous in Lucene / Solr code, but the
-      // test for this will leak threads without
-      scheduler.shutdownNow();
-      scheduler = null;
-    }
-  }
-
-  private class UpdateLogSynchronisation implements Runnable {
-
-    private String getLeaderUrl() {
-      ZkController zkController = core.getCoreContainer().getZkController();
-      ClusterState cstate = zkController.getClusterState();
-      DocCollection docCollection = cstate.getCollection(collection);
-      ZkNodeProps leaderProps = docCollection.getLeader(shardId);
-      if (leaderProps == null) { // we might not have a leader yet, returns null
-        return null;
-      }
-      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-      return nodeProps.getCoreUrl();
-    }
-
-    @Override
-    public void run() {
-      try {
-        String leaderUrl = getLeaderUrl();
-        if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
-          return;
-        }
-
-        HttpSolrClient server = new HttpSolrClient.Builder(leaderUrl)
-            .withConnectionTimeout(15000)
-            .withSocketTimeout(60000)
-            .build();
-
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
-
-        SolrRequest request = new QueryRequest(params);
-        request.setPath(path);
-
-        long lastVersion;
-        try {
-          NamedList response = server.request(request);
-          lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
-          log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
-              core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
-        } catch (IOException | SolrServerException e) {
-          log.warn("Couldn't get last processed version from leader {}: {}", leaderUrl, e.getMessage());
-          return;
-        } finally {
-          try {
-            server.close();
-          } catch (IOException ioe) {
-            log.warn("Caught exception trying to close server: ", leaderUrl, ioe.getMessage());
-          }
-        }
-
-        // if we received -1, it means that the log reader on the leader has not yet started to read log entries
-        // do nothing
-        if (lastVersion == -1) {
-          return;
-        }
-
-        try {
-          CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-          if (ulog.isBuffering()) {
-            log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
-            ulog.getBufferToggle().seek(lastVersion);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
-        } catch (IOException e) {
-          log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
-        }
-      } catch (Throwable e) {
-        log.warn("Caught unexpected exception", e);
-        throw e;
-      }
-    }
-  }
-
-}
-