You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/08/08 05:36:10 UTC

[2/2] lucene-solr:master: SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient

SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/153c2700
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/153c2700
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/153c2700

Branch: refs/heads/master
Commit: 153c2700450af1e1c4bd063d7d8b65cc4a726438
Parents: bfee229
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Mon Aug 8 11:06:00 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Mon Aug 8 11:06:00 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/handler/CdcrParams.java     |  10 +-
 .../org/apache/solr/handler/CdcrReplicator.java |   8 +-
 .../solr/handler/CdcrReplicatorManager.java     | 242 +++++++++++-
 .../solr/handler/CdcrReplicatorScheduler.java   |   6 +-
 .../solr/handler/CdcrReplicatorState.java       |  23 ++
 .../apache/solr/handler/CdcrRequestHandler.java | 233 ++++++++++-
 .../org/apache/solr/handler/IndexFetcher.java   |  10 +-
 .../apache/solr/handler/ReplicationHandler.java |  17 +-
 .../org/apache/solr/update/CdcrUpdateLog.java   |   7 +-
 .../solr/update/DefaultSolrCoreState.java       |   6 +-
 .../org/apache/solr/update/SolrCoreState.java   |   2 +
 .../update/processor/CdcrUpdateProcessor.java   |  10 +-
 .../processor/DistributedUpdateProcessor.java   |   2 +-
 .../configsets/cdcr-source-disabled/schema.xml  |  29 ++
 .../cdcr-source-disabled/solrconfig.xml         |  60 +++
 .../solr/configsets/cdcr-source/schema.xml      |  29 ++
 .../solr/configsets/cdcr-source/solrconfig.xml  |  76 ++++
 .../solr/configsets/cdcr-target/schema.xml      |  29 ++
 .../solr/configsets/cdcr-target/solrconfig.xml  |  63 +++
 .../solr/cloud/BaseCdcrDistributedZkTest.java   |  16 +
 .../apache/solr/cloud/CdcrBootstrapTest.java    | 396 +++++++++++++++++++
 .../cloud/CdcrReplicationDistributedZkTest.java |  31 ++
 23 files changed, 1280 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b497a53..3d597b7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -116,6 +116,9 @@ New Features
 
 * SOLR-9252: Feature selection and logistic regression on text (Cao Manh Dat, Joel Bernstein)
 
+* SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient.
+  (Noble Paul, Renaud Delbru, shalin)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java b/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
index aa1d5bf..3f65b90 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
@@ -122,6 +122,11 @@ public class CdcrParams {
   public final static String COUNTER_DELETES = "deletes";
 
   /**
+   * Counter for Bootstrap operations *
+   */
+  public final static String COUNTER_BOOTSTRAP = "bootstraps";
+
+  /**
    * A list of errors per target collection *
    */
   public final static String ERRORS = "errors";
@@ -165,7 +170,10 @@ public class CdcrParams {
     LASTPROCESSEDVERSION,
     QUEUES,
     OPS,
-    ERRORS;
+    ERRORS,
+    BOOTSTRAP, // sent to a target shard leader: bootstrap this core from the core given by ReplicationHandler.MASTER_URL
+    BOOTSTRAP_STATUS, // sent to a target shard leader: poll the status of a bootstrap started via BOOTSTRAP
+    CANCEL_BOOTSTRAP; // sent to a target shard leader when the source stops waiting for its bootstrap
 
     public static CdcrAction get(String p) {
       if (p != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
index 66ce096..8519815 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
@@ -119,7 +119,7 @@ public class CdcrReplicator implements Runnable {
       // we might have read a single commit operation and reached the end of the update logs
       logReader.forwardSeek(subReader);
 
-      log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
+      log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
     } catch (Exception e) {
       // report error and update error stats
       this.handleException(e);
@@ -150,13 +150,13 @@ public class CdcrReplicator implements Runnable {
     if (e instanceof CdcrReplicatorException) {
       UpdateRequest req = ((CdcrReplicatorException) e).req;
       UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
-      log.warn("Failed to forward update request {}. Got response {}", req, rsp);
+      log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
       state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
     } else if (e instanceof CloudSolrClient.RouteException) {
-      log.warn("Failed to forward update request", e);
+      log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e); // trailing Throwable is logged with its stack trace
       state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
     } else {
-      log.warn("Failed to forward update request", e);
+      log.warn("Failed to forward update request to target: {}", state.getTargetCollection(), e); // trailing Throwable is logged with its stack trace
       state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
index af0161e..528e0b7 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
@@ -16,29 +16,49 @@
  */
 package org.apache.solr.handler;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.util.TimeOut;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
+
 class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
 
+  private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
+  private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
+  // 6 hours is hopefully long enough for most indexes
+  private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L; // 6 h * 3600 s/h (was 6*3600*3600 s, ~2.5 years)
+
   private List<CdcrReplicatorState> replicatorStates;
 
   private final CdcrReplicatorScheduler scheduler;
@@ -48,6 +68,9 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
   private SolrCore core;
   private String path;
 
+  private ExecutorService bootstrapExecutor;
+  private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
+
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   CdcrReplicatorManager(final SolrCore core, String path,
@@ -104,12 +127,20 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
   @Override
   public synchronized void stateUpdate() {
     if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
+      if (replicatorStates.size() > 0)  {
+        this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
+            new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
+      }
       this.initLogReaders();
       this.scheduler.start();
       return;
     }
 
     this.scheduler.shutdown();
+    if (bootstrapExecutor != null)  {
+      IOUtils.closeQuietly(bootstrapStatusRunnable);
+      ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
+    }
     this.closeLogReaders();
   }
 
@@ -117,7 +148,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
     return replicatorStates;
   }
 
-  void initLogReaders() {
+  private void initLogReaders() {
     String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
     String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
     CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
@@ -129,8 +160,23 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
         log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
             checkpoint, collectionName, shard);
         CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
-        reader.seek(checkpoint);
+        boolean seek = reader.seek(checkpoint);
         state.init(reader);
+        if (!seek) {
+          // targetVersion is lower than the oldest known entry.
+          // In this scenario, it probably means that there is a gap in the updates log.
+          // the best we can do here is to bootstrap the target leader by replicating the full index
+          final String targetCollection = state.getTargetCollection();
+          state.setBootstrapInProgress(true);
+          log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
+          bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
+          log.info("Submitting bootstrap task to executor");
+          try {
+            bootstrapExecutor.submit(bootstrapStatusRunnable);
+          } catch (Exception e) {
+            log.error("Unable to submit bootstrap call to executor", e);
+          }
+        }
       } catch (IOException | SolrServerException | SolrException e) {
         log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
       } catch (InterruptedException e) {
@@ -164,11 +210,203 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
    */
   void shutdown() {
     this.scheduler.shutdown();
+    if (bootstrapExecutor != null)  {
+      IOUtils.closeQuietly(bootstrapStatusRunnable);
+      ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
+    }
     for (CdcrReplicatorState state : replicatorStates) {
       state.shutdown();
     }
     replicatorStates.clear();
   }
 
+  private class BootstrapStatusRunnable implements Runnable, Closeable {
+    private final CdcrReplicatorState state;
+    private final String targetCollection;
+    private final String shard;
+    private final String collectionName;
+    private final CdcrUpdateLog ulog;
+    private final String myCoreUrl;
+
+    private volatile boolean closed = false;
+
+    BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
+      this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+      this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+      this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+      this.state = state;
+      this.targetCollection = state.getTargetCollection();
+      String baseUrl = core.getCoreDescriptor().getCoreContainer().getZkController().getBaseUrl();
+      this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      try {
+        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+        String leaderCoreUrl = leader.getCoreUrl();
+        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+          sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
+        } catch (SolrServerException e) {
+          log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
+              targetCollection, shard, leaderCoreUrl, e);
+        }
+      } catch (InterruptedException e) {
+        log.error("Interrupted while closing BootstrapStatusRunnable", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void run() {
+      int retries = 1;
+      boolean success = false;
+      try {
+        while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED)  {
+          Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+        }
+        TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        while (!timeOut.hasTimedOut()) {
+          if (closed) {
+            log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
+            state.setBootstrapInProgress(false);
+            break;
+          }
+          BootstrapStatus status = getBootstrapStatus();
+          if (status == BootstrapStatus.RUNNING) {
+            log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
+                BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
+            // An interrupt must end this task: InterruptedException propagates to the outer handler,
+            // which reports the error and restores the interrupt flag. Swallowing it here (and only
+            // re-setting the flag) would make every later sleep fail immediately and turn this loop
+            // into a busy poll of the target leader.
+            Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+          } else if (status == BootstrapStatus.COMPLETED) {
+            log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
+            long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
+            log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
+                checkpoint, collectionName, shard);
+            CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
+            reader1.seek(checkpoint); // NOTE(review): reader1 is neither handed to state nor closed here - confirm intent
+            success = true;
+            break;
+          } else if (status == BootstrapStatus.FAILED || status == BootstrapStatus.CANCELLED) {
+            log.warn("CDCR bootstrap {} in {} seconds", status.toString().toLowerCase(Locale.ROOT), BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
+            // a failed or cancelled (e.g. by the target) bootstrap is retried a fixed number of times before giving up
+            if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
+              log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
+              break;
+            } else {
+              log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
+              while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED)  {
+                Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+              }
+              timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
+              retries++;
+            }
+          } else if (status == BootstrapStatus.NOTFOUND) {
+            // the leader of the target shard may have changed and therefore there is no record of the
+            // bootstrap process so we must retry the operation
+            while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED)  {
+              Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+            }
+            retries = 1;
+            timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
+          } else if (status == BootstrapStatus.UNKNOWN) {
+            // we were not able to query the status on the remote end
+            // so just sleep for a bit and try again
+            Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+          }
+        }
+      } catch (InterruptedException e) {
+        log.info("Bootstrap thread interrupted");
+        state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
+        Thread.currentThread().interrupt();
+      } catch (IOException | SolrServerException | SolrException e) {
+        log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
+        state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+      } finally {
+        if (success) {
+          log.info("Bootstrap successful, giving the go-ahead to replicator");
+          state.setBootstrapInProgress(false);
+        }
+      }
+    }
+
+    private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
+      Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+      String leaderCoreUrl = leader.getCoreUrl();
+      HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+      try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+        log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
+        try {
+          NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
+          log.debug("CDCR Bootstrap response: {}", response);
+          String status = response.get(RESPONSE_STATUS).toString();
+          return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
+        } catch (Exception e) {
+          log.error("Exception submitting bootstrap request", e);
+          return BootstrapStatus.UNKNOWN;
+        }
+      } catch (IOException e) {
+        log.error("There shouldn't be an IOException while closing but there was!", e);
+      }
+      return BootstrapStatus.UNKNOWN;
+    }
+
+    private BootstrapStatus getBootstrapStatus() throws InterruptedException {
+      try {
+        Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+        String leaderCoreUrl = leader.getCoreUrl();
+        HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+        try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+          NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
+          String status = (String) response.get(RESPONSE_STATUS);
+          BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
+          switch (bootstrapStatus) {
+            case RUNNING:
+            case COMPLETED:
+            case FAILED:
+            case CANCELLED:
+              return bootstrapStatus;
+            case NOTFOUND:
+              log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
+              return BootstrapStatus.NOTFOUND;
+            default: // SUBMITTED and UNKNOWN are not valid answers to a status query
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                  "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
+          }
+        }
+      } catch (InterruptedException e) {
+        throw e; // let interrupts reach run() instead of being reported as an UNKNOWN status
+      } catch (Exception e) {
+        log.error("Exception during bootstrap status request", e);
+        return BootstrapStatus.UNKNOWN;
+      }
+    }
+  }
+
+  private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.set(CommonParams.QT, "/cdcr");
+    solrParams.set(CommonParams.ACTION, action.toString());
+    for (int i = 0; i < params.length - 1; i+=2) {
+      solrParams.set(params[i], params[i + 1]);
+    }
+    SolrRequest request = new QueryRequest(solrParams);
+    return client.request(request);
+  }
+
+  private enum BootstrapStatus  {
+    SUBMITTED,
+    RUNNING,
+    COMPLETED,
+    FAILED,
+    NOTFOUND,
+    CANCELLED,
+    UNKNOWN
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
index bb817e5..62abeab 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
@@ -77,7 +77,11 @@ class CdcrReplicatorScheduler {
             CdcrReplicatorState state = statesQueue.poll();
             assert state != null; // Should never happen
             try {
-              new CdcrReplicator(state, batchSize).run();
+              if (state.isBootstrapInProgress()) { // a bootstrap owns this target: skip this round, state is re-queued below
+                log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
+              } else {
+                new CdcrReplicator(state, batchSize).run();
+              }
             } finally {
               statesQueue.offer(state);
             }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
index 9e01f11..2ca0d80 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.update.CdcrUpdateLog;
@@ -53,6 +55,9 @@ class CdcrReplicatorState {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
+  private final AtomicInteger numBootstraps = new AtomicInteger();
+
   CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
     this.targetCollection = targetCollection;
     this.targetClient = targetClient;
@@ -164,6 +169,24 @@ class CdcrReplicatorState {
     return this.benchmarkTimer;
   }
 
+  /**
+   * @return true if a bootstrap operation is in progress, false otherwise
+   */
+  boolean isBootstrapInProgress() {
+    return bootstrapInProgress.get();
+  }
+
+  void setBootstrapInProgress(boolean inProgress) {
+    // single atomic swap: concurrent readers never observe a transient 'false' while a bootstrap is still running
+    if (bootstrapInProgress.getAndSet(inProgress)) {
+      numBootstraps.incrementAndGet(); // counted whenever a previously in-progress flag is overwritten
+    }
+  }
+
+  public int getNumBootstraps() { // number of times an in-progress bootstrap flag was ended or replaced
+    return numBootstraps.get();
+  }
+
   enum ErrorType {
     INTERNAL,
     BAD_REQUEST;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
index f60e562..f706637 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.handler;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -24,14 +25,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -41,21 +48,33 @@ import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.PluginBag;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.CdcrUpdateLog;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
+
 /**
  * <p>
  * This request handler implements the CDCR API and is responsible of the execution of the
@@ -199,6 +218,18 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
         this.handleErrorsAction(req, rsp);
         break;
       }
+      case BOOTSTRAP: {
+        this.handleBootstrapAction(req, rsp);
+        break;
+      }
+      case BOOTSTRAP_STATUS:  {
+        this.handleBootstrapStatus(req, rsp);
+        break;
+      }
+      case CANCEL_BOOTSTRAP:  {
+        this.handleCancelBootstrap(req, rsp);
+        break;
+      }
       default: {
         throw new RuntimeException("Unknown action: " + action);
       }
@@ -409,10 +440,20 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
     }
 
     UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    VersionInfo versionInfo = ulog.getVersionInfo();
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      List<Long> versions = recentUpdates.getVersions(1);
-      long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
-      rsp.add(CdcrParams.CHECKPOINT, lastVersion);
+      long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
+      long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
+      log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
+      // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
+      long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
+      if (maxVersion == 0L) {
+        maxVersion = -1;
+      }
+      rsp.add(CdcrParams.CHECKPOINT, maxVersion);
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
+          "' could not read max version", e); // keep the IOException as the cause
     }
   }
 
@@ -574,6 +615,192 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
     rsp.add(CdcrParams.ERRORS, hosts);
   }
 
+  private AtomicBoolean running = new AtomicBoolean();
+  private volatile Future<Boolean> bootstrapFuture;
+  private volatile BootstrapCallable bootstrapCallable;
+
+  /**
+   * Handles the BOOTSTRAP action: asynchronously replaces this shard leader's index with a full
+   * copy fetched from the URL given by the {@link ReplicationHandler#MASTER_URL} request parameter.
+   * <p>
+   * The work runs on the update executor. This method only validates the request and reports
+   * "submitted" (or "failed" if the executor rejects the task). The background task tries to take
+   * the core's recovery lock. If the lock is already held, the most recent bootstrap (if any) is
+   * cancelled instead. If this replica is no longer the leader, nothing is done. Use
+   * BOOTSTRAP_STATUS to poll the outcome.
+   * <p>
+   * NOTE(review): between submission and the task taking the lock, BOOTSTRAP_STATUS can still
+   * report the previous bootstrap's outcome (or "notfound").
+   *
+   * @throws SolrException BAD_REQUEST if this replica is not the shard leader, or if the master
+   *                       URL parameter is missing
+   */
+  private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
+    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+    if (!leaderStateManager.amILeader()) {
+      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
+          " sent to non-leader replica");
+    }
+    // Read everything needed from the request up front. The task below runs after this request
+    // has completed, so it must not touch req or rsp. A missing URL is rejected here rather than
+    // letting the background bootstrap fail later without one.
+    String masterUrl = req.getParams().required().get(ReplicationHandler.MASTER_URL);
+
+    Runnable runnable = () -> {
+      Lock recoveryLock = core.getSolrCoreState().getRecoveryLock();
+      boolean locked = recoveryLock.tryLock();
+      try {
+        if (!locked) {
+          // The recovery lock is held elsewhere (e.g. by a bootstrap that is already running):
+          // cancel the current bootstrap, if any. This request's response may already have been
+          // written, so rsp is deliberately not modified from this thread.
+          IOUtils.closeQuietly(bootstrapCallable);
+        } else if (leaderStateManager.amILeader()) {
+          running.set(true);
+          bootstrapCallable = new BootstrapCallable(masterUrl, core);
+          bootstrapFuture = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getRecoveryExecutor().submit(bootstrapCallable);
+          try {
+            bootstrapFuture.get();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Bootstrap was interrupted", e);
+          } catch (ExecutionException e) {
+            log.error("Bootstrap operation failed", e);
+          }
+        } else {
+          log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
+        }
+      } finally {
+        if (locked) {
+          running.set(false);
+          recoveryLock.unlock();
+        }
+      }
+    };
+
+    try {
+      core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
+      rsp.add(RESPONSE_STATUS, "submitted");
+    } catch (RejectedExecutionException ree) {
+      // no problem, we're probably shutting down
+      rsp.add(RESPONSE_STATUS, "failed");
+    }
+  }
+
+  /**
+   * Handles the CANCEL_BOOTSTRAP action. Closing the most recent bootstrap callable (a null
+   * callable is ignored) aborts its index fetch. The response always reports "cancelled".
+   */
+  private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
+    IOUtils.closeQuietly(this.bootstrapCallable);
+    rsp.add(RESPONSE_STATUS, "cancelled");
+  }
+
+  /**
+   * Handles the BOOTSTRAP_STATUS action by reporting the state of the most recent bootstrap as
+   * one of: running, notfound, cancelled, completed or failed.
+   */
+  private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
+    if (running.get()) {
+      rsp.add(RESPONSE_STATUS, RUNNING);
+      return;
+    }
+
+    // The callable is published before the future, so a non-null future implies a non-null callable.
+    Future<Boolean> future = bootstrapFuture;
+    BootstrapCallable callable = this.bootstrapCallable;
+    if (future == null) {
+      rsp.add(RESPONSE_STATUS, "notfound");
+      rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
+    } else if (future.isCancelled() || callable.isClosed()) {
+      rsp.add(RESPONSE_STATUS, "cancelled");
+    } else if (future.isDone()) {
+      // could be a normal termination or an exception
+      try {
+        Boolean result = future.get();
+        rsp.add(RESPONSE_STATUS, Boolean.TRUE.equals(result) ? COMPLETED : FAILED);
+      } catch (InterruptedException e) {
+        // Not expected since the future is already done. Restore the interrupt flag and still
+        // put a status in the response instead of leaving it out.
+        Thread.currentThread().interrupt();
+        rsp.add(RESPONSE_STATUS, FAILED);
+        rsp.add(RESPONSE_MESSAGE, "Interrupted while reading bootstrap result");
+      } catch (ExecutionException e) {
+        rsp.add(RESPONSE_STATUS, FAILED);
+        rsp.add(RESPONSE, e);
+      } catch (CancellationException ce) {
+        // cancelled after the isCancelled() check above: report it like any other cancellation
+        rsp.add(RESPONSE_STATUS, "cancelled");
+        rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
+      }
+    } else {
+      rsp.add(RESPONSE_STATUS, RUNNING);
+    }
+  }
+
+  private static class BootstrapCallable implements Callable<Boolean>, Closeable {
+    private final String masterUrl;
+    private final SolrCore core;
+    private volatile boolean closed = false;
+
+    BootstrapCallable(String masterUrl, SolrCore core) {
+      this.masterUrl = masterUrl;
+      this.core = core;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+      ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+      replicationHandler.abortFetch();
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      boolean success = false;
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      // we start buffering updates as a safeguard however we do not expect
+      // to receive any updates from the source during bootstrap
+      ulog.bufferUpdates();
+      try {
+        commitOnLeader(masterUrl);
+        // use rep handler directly, so we can do this sync rather than async
+        SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+        ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+
+        if (replicationHandler == null) {
+          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+              "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+        }
+
+        ModifiableSolrParams solrParams = new ModifiableSolrParams();
+        solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
+        // we do not want the raw tlog files from the source
+        solrParams.set(ReplicationHandler.TLOG_FILES, false);
+
+        success = replicationHandler.doFetch(solrParams, false);
+
+        // this is required because this callable can race with HttpSolrCall#destroy
+        // which clears the request info.
+        // Applying buffered updates fails without the following line because LogReplayer
+        // also tries to set request info and fails with AssertionError
+        SolrRequestInfo.clearRequestInfo();
+
+        Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
+        if (future == null) {
+          // no replay needed
+          log.info("No replay needed.");
+        } else {
+          log.info("Replaying buffered documents.");
+          // wait for replay
+          UpdateLog.RecoveryInfo report = future.get();
+          if (report.failed) {
+            SolrException.log(log, "Replay failed");
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
+          }
+        }
+        return success;
+      } finally {
+        if (closed || !success) {
+          // we cannot apply the buffer in this case because it will introduce newer versions in the
+          // update log and then the source cluster will get those versions via collectioncheckpoint
+          // causing the versions in between to be completely missed
+          boolean dropped = ulog.dropBufferedUpdates();
+          assert dropped;
+        }
+      }
+    }
+
+    private void commitOnLeader(String leaderUrl) throws SolrServerException,
+        IOException {
+      try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
+        client.setConnectionTimeout(30000);
+        UpdateRequest ureq = new UpdateRequest();
+        ureq.setParams(new ModifiableSolrParams());
+        ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+        ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+        ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
+            client);
+      }
+    }
+  }
+
   @Override
   public String getDescription() {
     return "Manage Cross Data Center Replication";

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 77624c9..080cf9f 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -750,14 +750,14 @@ public class IndexFetcher {
   }
 
   private void openNewSearcherAndUpdateCommitPoint() throws IOException {
-    SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
-        new ModifiableSolrParams());
-
     RefCounted<SolrIndexSearcher> searcher = null;
     IndexCommit commitPoint;
+    // must get the latest solrCore object because the one we have might be closed because of a reload
+    // todo stop keeping solrCore around
+    SolrCore core = solrCore.getCoreDescriptor().getCoreContainer().getCore(solrCore.getName());
     try {
       Future[] waitSearcher = new Future[1];
-      searcher = solrCore.getSearcher(true, true, waitSearcher, true);
+      searcher = core.getSearcher(true, true, waitSearcher, true);
       if (waitSearcher[0] != null) {
         try {
           waitSearcher[0].get();
@@ -767,10 +767,10 @@ public class IndexFetcher {
       }
       commitPoint = searcher.get().getIndexReader().getIndexCommit();
     } finally {
-      req.close();
       if (searcher != null) {
         searcher.decref();
       }
+      core.close();
     }
 
     // update the commit point in replication handler

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 0870e35..aee3b97 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -300,9 +300,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
         rsp.add("message","No slave configured");
       }
     } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
-      IndexFetcher fetcher = currentIndexFetcher;
-      if (fetcher != null){
-        fetcher.abortFetch();
+      if (abortFetch()){
         rsp.add(STATUS, OK_STATUS);
       } else {
         rsp.add(STATUS,ERR_STATUS);
@@ -321,6 +319,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     }
   }
 
+  /**
+   * Asks the currently registered index fetcher, if there is one, to abort its fetch.
+   *
+   * @return {@code true} if a fetcher was registered and was told to abort,
+   *         {@code false} if no fetcher was registered
+   */
+  public boolean abortFetch() {
+    IndexFetcher active = currentIndexFetcher;
+    if (active == null) {
+      return false;
+    }
+    active.abortFetch();
+    return true;
+  }
+
   private void deleteSnapshot(ModifiableSolrParams params) {
     String name = params.get(NAME);
     if(name == null) {
@@ -658,7 +666,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     rsp.add(CMD_GET_FILE_LIST, result);
 
     // fetch list of tlog files only if cdcr is activated
-    if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+    if (solrParams.getBool(TLOG_FILES, true) && core.getUpdateHandler().getUpdateLog() != null
+        && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
       try {
         List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
         LOG.info("Adding tlog files to list: " + tlogfiles);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
index 3afc66c..6b20204 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
@@ -151,7 +151,12 @@ public class CdcrUpdateLog extends UpdateLog {
     if (id != -1) return id;
     if (tlogFiles.length == 0) return -1;
     String last = tlogFiles[tlogFiles.length - 1];
-    return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
+    if (TLOG_NAME.length() + 1 > last.lastIndexOf('.'))  {
+      // old tlog created by default UpdateLog impl
+      return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
+    } else  {
+      return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index a29d57d..c57ee75 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -395,5 +395,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   public void setLastReplicateIndexSuccess(boolean success) {
     this.lastReplicationSuccess = success;
   }
-  
+
+  /**
+   * Exposes this core state's recovery lock to other components. For example, the CDCR
+   * bootstrap takes it with {@code tryLock()}.
+   */
+  @Override
+  public Lock getRecoveryLock() {
+    return this.recoveryLock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 89e286a..873a697 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -163,4 +163,6 @@ public abstract class SolrCoreState {
       super(s);
     }
   }
+
+  /**
+   * Returns the lock associated with recovery for this core. Callers such as the CDCR bootstrap
+   * acquire it (via {@code tryLock()}) before replacing the index.
+   * NOTE(review): implementations are expected to return the same lock their own recovery uses;
+   * confirm for any SolrCoreState subclass other than DefaultSolrCoreState.
+   */
+  public abstract Lock getRecoveryLock();
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
index 3b3fcb4..5bbc4a2 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
@@ -96,11 +96,11 @@ public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
     ModifiableSolrParams result = super.filterParams(params);
     if (params.get(CDCR_UPDATE) != null) {
       result.set(CDCR_UPDATE, "");
-      if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
-        log.warn("+++ cdcr.update but no version field, params are: " + params);
-      } else {
-        log.info("+++ cdcr.update version present, params are: " + params);
-      }
+//      if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
+//        log.warn("+++ cdcr.update but no version field, params are: " + params);
+//      } else {
+//        log.info("+++ cdcr.update version present, params are: " + params);
+//      }
       result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 67c88dd..9b0e4dc 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -295,7 +295,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     
     // this should always be used - see filterParams
     DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
-      (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
+      (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, VERSION_FIELD);
     
     CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
     

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <types>
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+  </types>
+  <fields>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  </fields>
+  <uniqueKey>id</uniqueKey>
+</schema>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
new file mode 100644
index 0000000..e63d9a6
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+     When writing a new test, feel free to add *new* items (plugins,
+     config options, etc...) as long as they don't break any existing
+     tests.  If you need to test something esoteric please add a new
+     "solrconfig-your-esoteric-purpose.xml" config file.
+
+     Note in particular that this test is used by MinimalSchemaTest so
+     anything added to this file needs to work correctly even if there
+     is no uniqueKey or defaultSearch field.
+  -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+
+    <updateLog>
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+
+  </updateHandler>
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <types>
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+  </types>
+  <fields>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  </fields>
+  <uniqueKey>id</uniqueKey>
+</schema>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
new file mode 100644
index 0000000..f2528c3
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+     When writing a new test, feel free to add *new* items (plugins,
+     config options, etc...) as long as they don't break any existing
+     tests.  If you need to test something esoteric please add a new
+     "solrconfig-your-esoteric-purpose.xml" config file.
+
+     Note in particular that this test is used by MinimalSchemaTest so
+     anything added to this file needs to work correctly even if there
+     is no uniqueKey or defaultSearch field.
+  -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateRequestProcessorChain name="cdcr-processor-chain">
+    <processor class="solr.CdcrUpdateProcessorFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+
+  <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+    <lst name="replica">
+      <str name="zkHost">${cdcr.target.zkHost}</str>
+      <str name="source">cdcr-source</str>
+      <str name="target">cdcr-target</str>
+    </lst>
+    <lst name="replicator">
+      <str name="threadPoolSize">1</str>
+      <str name="schedule">1000</str>
+      <str name="batchSize">1000</str>
+    </lst>
+    <lst name="updateLogSynchronizer">
+      <str name="schedule">1000</str>
+    </lst>
+  </requestHandler>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog class="solr.CdcrUpdateLog">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+  </updateHandler>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+  </requestHandler>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <lst name="defaults">
+      <str name="update.chain">cdcr-processor-chain</str>
+    </lst>
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <types>
+    <fieldType name="string" class="solr.StrField"/>
+    <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+  </types>
+  <fields>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  </fields>
+  <uniqueKey>id</uniqueKey>
+</schema>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
new file mode 100644
index 0000000..ef24fa4
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+     When writing a new test, feel free to add *new* items (plugins,
+     config options, etc...) as long as they don't break any existing
+     tests.  If you need to test something esoteric please add a new
+     "solrconfig-your-esoteric-purpose.xml" config file.
+
+     Note in particular that this test is used by MinimalSchemaTest so
+     anything added to this file needs to work correctly even if there
+     is no uniqueKey or defaultSearch field.
+  -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateRequestProcessorChain name="cdcr-processor-chain">
+    <processor class="solr.CdcrUpdateProcessorFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+
+  <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+  </requestHandler>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog class="solr.CdcrUpdateLog">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+  </updateHandler>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+  </requestHandler>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <lst name="defaults">
+      <str name="update.chain">cdcr-processor-chain</str>
+    </lst>
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
index 0ca25aa..3d758e0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
@@ -28,6 +28,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -56,6 +57,7 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.CdcrParams;
+import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.CreateMode;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -69,6 +71,8 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
 
 /**
  * <p>
@@ -763,6 +767,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
     }
   }
 
+  /**
+   * Polls the BOOTSTRAP_STATUS CDCR action on the leader of the given shard, once per second, until it
+   * reports {@code COMPLETED} or 60 seconds have elapsed. We need to wait until bootstrap is complete,
+   * otherwise the replicator thread will never start.
+   *
+   * @param collectionName collection whose shard leader is queried
+   * @param shardId        shard whose leader is queried
+   * @throws AssertionError if bootstrap has not completed within 60 seconds (previously the wait
+   *                        silently gave up and let the calling test carry on)
+   */
+  protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
+    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+    Object lastStatus = null;
+    while (!timeOut.hasTimedOut()) {
+      NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
+      lastStatus = rsp.get(RESPONSE_STATUS);
+      // compare from the constant side so a missing status counts as "not completed yet" instead of an NPE
+      if (COMPLETED.equals(String.valueOf(lastStatus))) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    fail("Bootstrap did not complete on " + collectionName + "/" + shardId
+        + " within 60 seconds; last " + RESPONSE_STATUS + " was: " + lastStatus);
+  }
+
   protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
     int cnt = 15;
     while (cnt > 0) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/153c2700/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
new file mode 100644
index 0000000..1efdc6a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests CDCR bootstrap between two single-node {@link MiniSolrCloudCluster}s: a "cdcr-source" collection
+ * replicating to a "cdcr-target" collection.
+ */
+public class CdcrBootstrapTest extends SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Number of update requests sent per indexing round; each request ends with a hard commit. */
+  private static final int NUM_BATCHES = 100;
+
+  /** Number of documents sent in each update request. */
+  private static final int BATCH_SIZE = 100;
+
+  /**
+   * Starts a source cluster with no CDCR configuration, indexes enough documents such that
+   * at least one old tlog is closed and thrown away so that the source cluster does not have
+   * all updates available in tlogs only.
+   * <p>
+   * Then we start a target cluster with CDCR configuration and we change the source cluster configuration
+   * to use CDCR (i.e. CdcrUpdateLog, CdcrRequestHandler and CdcrUpdateProcessor) and restart it.
+   * <p>
+   * We test that all updates eventually make it to the target cluster and that the collectioncheckpoint
+   * call returns the same version as the last update indexed on the source.
+   */
+  @Test
+  public void testConvertClusterToCdcrAndBootstrap() throws Exception {
+    // start the target first so that we know its zkhost
+    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+    try {
+      target.waitForAllNodes(30);
+      log.info("Target zkHost = {}", target.getZkServer().getZkAddress());
+      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+      // start a cluster with no cdcr
+      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+      try {
+        source.waitForAllNodes(30);
+        // create a collection with the cdcr-source-disabled configset
+        createSourceCollection(source, "cdcr-source-disabled");
+
+        CloudSolrClient sourceSolrClient = source.getSolrClient();
+        sourceSolrClient.setDefaultCollection("cdcr-source");
+        int numDocs = indexDocs(sourceSolrClient, 0);
+        assertNumFound("Document mismatch on source", numDocs, sourceSolrClient);
+
+        // lets find and keep the maximum version assigned by source cluster across all our updates
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CommonParams.QT, "/get");
+        params.set("getVersions", numDocs);
+        QueryResponse response = sourceSolrClient.query(params);
+        @SuppressWarnings("unchecked") // the /get handler returns the versions as a list of longs
+        List<Long> versions = (List<Long>) response.getResponse().get("versions");
+        long maxVersion = Long.MIN_VALUE;
+        for (Long version : versions) {
+          maxVersion = Math.max(maxVersion, version);
+        }
+
+        // upload the cdcr-enabled config and restart source cluster
+        final File cdcrEnabledSourceConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
+        source.uploadConfigDir(cdcrEnabledSourceConfigDir, "cdcr-source");
+        JettySolrRunner runner = source.stopJettySolrRunner(0);
+        source.startJettySolrRunner(runner);
+        assertTrue(runner.isRunning());
+        AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+
+        assertNumFound("Document mismatch on source after restart", numDocs, sourceSolrClient);
+
+        CloudSolrClient targetSolrClient = createTargetCollection(target);
+        Thread.sleep(1000);
+
+        cdcrStart(targetSolrClient);
+        cdcrStart(sourceSolrClient);
+
+        response = getCdcrQueue(sourceSolrClient);
+        log.info("Cdcr queue response: {}", response.getResponse());
+        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+        params = new ModifiableSolrParams();
+        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
+        params.set(CommonParams.QT, "/cdcr");
+        response = targetSolrClient.query(params);
+        Long checkpoint = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
+        assertNotNull(checkpoint);
+        assertEquals("COLLECTIONCHECKPOINT from target cluster should have returned the maximum " +
+            "version across all updates made to source", maxVersion, checkpoint.longValue());
+      } finally {
+        source.shutdown();
+      }
+    } finally {
+      System.clearProperty("cdcr.target.zkHost");
+      target.shutdown();
+    }
+  }
+
+  /**
+   * Starts a CDCR source, adds data, starts the target cluster and verifies replication, then
+   * stops CDCR replication and buffering, adds more data, re-enables CDCR and verifies replication again.
+   */
+  @Test
+  public void testBootstrapWithSourceCluster() throws Exception {
+    // start the target first so that we know its zkhost
+    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+    try {
+      target.waitForAllNodes(30);
+      log.info("Target zkHost = {}", target.getZkServer().getZkAddress());
+      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+      try {
+        source.waitForAllNodes(30);
+        createSourceCollection(source, "cdcr-source");
+
+        CloudSolrClient sourceSolrClient = source.getSolrClient();
+        sourceSolrClient.setDefaultCollection("cdcr-source");
+        int numDocs = indexDocs(sourceSolrClient, 0);
+        assertNumFound("Document mismatch on source", numDocs, sourceSolrClient);
+
+        CloudSolrClient targetSolrClient = createTargetCollection(target);
+
+        cdcrStart(targetSolrClient);
+        cdcrStart(sourceSolrClient);
+
+        QueryResponse response = getCdcrQueue(sourceSolrClient);
+        log.info("Cdcr queue response: {}", response.getResponse());
+        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+        cdcrStop(sourceSolrClient);
+        cdcrDisableBuffer(sourceSolrClient);
+
+        // index a second round of documents while replication and buffering are off
+        numDocs = indexDocs(sourceSolrClient, numDocs);
+        assertNumFound("Document mismatch on source", numDocs, sourceSolrClient);
+
+        cdcrStart(sourceSolrClient);
+        cdcrEnableBuffer(sourceSolrClient);
+
+        foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+      } finally {
+        source.shutdown();
+      }
+    } finally {
+      System.clearProperty("cdcr.target.zkHost");
+      target.shutdown();
+    }
+  }
+
+  /**
+   * Starts a CDCR source, adds data, starts the target cluster and CDCR, then keeps indexing on the
+   * source while replication is running and verifies that the target eventually has every document.
+   */
+  @Test
+  public void testBootstrapWithContinousIndexingOnSourceCluster() throws Exception {
+    // start the target first so that we know its zkhost
+    MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+    try {
+      // inside the try so the target cluster is shut down even if its nodes never come up
+      target.waitForAllNodes(30);
+      log.info("Target zkHost = {}", target.getZkServer().getZkAddress());
+      System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+      MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+      try {
+        source.waitForAllNodes(30);
+        createSourceCollection(source, "cdcr-source");
+
+        CloudSolrClient sourceSolrClient = source.getSolrClient();
+        sourceSolrClient.setDefaultCollection("cdcr-source");
+        int numDocs = indexDocs(sourceSolrClient, 0);
+        assertNumFound("Document mismatch on source", numDocs, sourceSolrClient);
+
+        CloudSolrClient targetSolrClient = createTargetCollection(target);
+        Thread.sleep(1000);
+
+        cdcrStart(targetSolrClient);
+        cdcrStart(sourceSolrClient);
+
+        // keep indexing while CDCR is running
+        numDocs = indexDocs(sourceSolrClient, numDocs);
+        assertNumFound("Document mismatch on source", numDocs, sourceSolrClient);
+
+        QueryResponse response = getCdcrQueue(sourceSolrClient);
+        log.info("Cdcr queue response: {}", response.getResponse());
+        long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+        assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+      } finally {
+        source.shutdown();
+      }
+    } finally {
+      System.clearProperty("cdcr.target.zkHost");
+      target.shutdown();
+    }
+  }
+
+  /**
+   * Uploads the configset at {@code solr/configsets/<configSetDir>} under the name "cdcr-source", creates the
+   * single-shard, single-replica "cdcr-source" collection with it and waits for recoveries to finish.
+   */
+  private static void createSourceCollection(MiniSolrCloudCluster source, String configSetDir) throws Exception {
+    final File configDir = getFile("solr").toPath().resolve("configsets/" + configSetDir).toFile();
+    log.info("config dir absolute path = {}", configDir.getAbsolutePath());
+    source.uploadConfigDir(configDir, "cdcr-source");
+
+    Map<String, String> collectionProperties = new HashMap<>();
+    // todo investigate why this is necessary??? because by default it selects a ram directory which deletes the tlogs on reloads?
+    collectionProperties.put("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
+    source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+  }
+
+  /**
+   * Uploads the "cdcr-target" configset, creates the single-shard, single-replica "cdcr-target" collection,
+   * waits for recoveries to finish and returns the target client with that collection as its default.
+   */
+  private static CloudSolrClient createTargetCollection(MiniSolrCloudCluster target) throws Exception {
+    final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
+    target.uploadConfigDir(targetConfigDir, "cdcr-target");
+    target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
+    target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
+    CloudSolrClient targetSolrClient = target.getSolrClient();
+    targetSolrClient.setDefaultCollection("cdcr-target");
+    return targetSolrClient;
+  }
+
+  /**
+   * Indexes {@link #NUM_BATCHES} requests of {@link #BATCH_SIZE} documents each, hard-committing after
+   * every request. Document {@code i} gets id {@code "source_" + i} and field {@code xyz = i}, with
+   * {@code i} starting at {@code firstId}.
+   *
+   * @return the total document count afterwards, i.e. {@code firstId} plus the number of documents added
+   */
+  private static int indexDocs(CloudSolrClient client, int firstId) throws SolrServerException, IOException {
+    int numDocs = firstId;
+    for (int batch = 0; batch < NUM_BATCHES; batch++) {
+      UpdateRequest req = new UpdateRequest();
+      for (int i = 0; i < BATCH_SIZE; i++, numDocs++) {
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "source_" + numDocs);
+        doc.addField("xyz", numDocs);
+        req.add(doc);
+      }
+      req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+      log.info("Adding {} docs with commit=true, numDocs={}", BATCH_SIZE, numDocs);
+      req.process(client);
+    }
+    return numDocs;
+  }
+
+  /** Asserts that a {@code *:*} query against the client's default collection finds {@code expected} docs. */
+  private static void assertNumFound(String message, long expected, CloudSolrClient client) throws SolrServerException, IOException {
+    QueryResponse response = client.query(new SolrQuery("*:*"));
+    assertEquals(message, expected, response.getResults().getNumFound());
+  }
+
+  /**
+   * Repeatedly commits on the target and counts all documents, once per second, until the count equals
+   * {@code numDocs} or 120 seconds have elapsed.
+   *
+   * @return the last successfully observed document count on the target, or 0 if no query ever succeeded
+   */
+  private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
+    final long timeoutNanos = TimeUnit.SECONDS.toNanos(120);
+    long start = System.nanoTime();
+    QueryResponse response = null;
+    while (System.nanoTime() - start <= timeoutNanos) {
+      try {
+        targetSolrClient.commit();
+        response = targetSolrClient.query(new SolrQuery("*:*"));
+        if (response.getResults().getNumFound() == numDocs) {
+          break;
+        }
+      } catch (Exception e) {
+        // best effort: the target may not be ready yet, keep retrying until the timeout
+        log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
+      }
+      Thread.sleep(1000);
+    }
+    return response != null ? response.getResults().getNumFound() : 0;
+  }
+
+  private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.START);
+    assertEquals("started", ((NamedList<?>) response.getResponse().get("status")).get("process"));
+  }
+
+  private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.STOP);
+    assertEquals("stopped", ((NamedList<?>) response.getResponse().get("status")).get("process"));
+  }
+
+  private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER);
+    assertEquals("enabled", ((NamedList<?>) response.getResponse().get("status")).get("buffer"));
+  }
+
+  private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+    QueryResponse response = invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER);
+    assertEquals("disabled", ((NamedList<?>) response.getResponse().get("status")).get("buffer"));
+  }
+
+  /** Sends the given CDCR action to the /cdcr handler of the client's default collection. */
+  private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.QT, "/cdcr");
+    params.set(CommonParams.ACTION, action.toLower());
+    return client.query(params);
+  }
+
+  /** Requests the CDCR queues status from the /cdcr handler of the client's default collection. */
+  private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.QT, "/cdcr");
+    params.set(CommonParams.ACTION, CdcrParams.QUEUES);
+    return client.query(params);
+  }
+}