You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/08/08 05:52:57 UTC
[1/2] lucene-solr:branch_6x: SOLR-6465: CDCR: fall back to
whole-index replication when tlogs are insufficient
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x 9c37aaabe -> cc3f3e8a8
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
index 3478df9..3ba6186 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
@@ -103,6 +103,11 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
// check status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+ this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
+
+ // sleep for a bit to ensure that replicator threads are started
+ Thread.sleep(3000);
+
// Kill all the servers of the target
this.deleteCollection(TARGET_COLLECTION);
@@ -156,6 +161,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@@ -182,6 +190,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@@ -203,6 +214,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
log.info("Indexing 10 documents");
int start = 0;
@@ -244,6 +258,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
log.info("Waiting for replication");
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@@ -267,6 +284,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
log.info("Indexing 10 documents");
int start = 0;
@@ -349,6 +369,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
}
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@@ -495,6 +518,8 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(TARGET_COLLECTION, SHARD2);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -526,6 +551,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
assertNumDocs(128, SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
@@ -553,6 +581,9 @@ public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForBootstrapToComplete(SOURCE_COLLECTION, SHARD2);
+
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
[2/2] lucene-solr:branch_6x: SOLR-6465: CDCR: fall back to
whole-index replication when tlogs are insufficient
Posted by sh...@apache.org.
SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient
(cherry picked from commit 153c2700450af1e1c4bd063d7d8b65cc4a726438)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cc3f3e8a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cc3f3e8a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cc3f3e8a
Branch: refs/heads/branch_6x
Commit: cc3f3e8a8b37bba8c465beded466ba95e3c4a77d
Parents: 9c37aaa
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Mon Aug 8 11:22:50 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Mon Aug 8 11:22:50 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/handler/CdcrParams.java | 10 +-
.../org/apache/solr/handler/CdcrReplicator.java | 8 +-
.../solr/handler/CdcrReplicatorManager.java | 242 +++++++++++-
.../solr/handler/CdcrReplicatorScheduler.java | 6 +-
.../solr/handler/CdcrReplicatorState.java | 23 ++
.../apache/solr/handler/CdcrRequestHandler.java | 233 ++++++++++-
.../org/apache/solr/handler/IndexFetcher.java | 10 +-
.../apache/solr/handler/ReplicationHandler.java | 17 +-
.../org/apache/solr/update/CdcrUpdateLog.java | 7 +-
.../solr/update/DefaultSolrCoreState.java | 6 +-
.../org/apache/solr/update/SolrCoreState.java | 2 +
.../update/processor/CdcrUpdateProcessor.java | 10 +-
.../processor/DistributedUpdateProcessor.java | 2 +-
.../configsets/cdcr-source-disabled/schema.xml | 29 ++
.../cdcr-source-disabled/solrconfig.xml | 60 +++
.../solr/configsets/cdcr-source/schema.xml | 29 ++
.../solr/configsets/cdcr-source/solrconfig.xml | 76 ++++
.../solr/configsets/cdcr-target/schema.xml | 29 ++
.../solr/configsets/cdcr-target/solrconfig.xml | 63 +++
.../solr/cloud/BaseCdcrDistributedZkTest.java | 16 +
.../apache/solr/cloud/CdcrBootstrapTest.java | 396 +++++++++++++++++++
.../cloud/CdcrReplicationDistributedZkTest.java | 31 ++
23 files changed, 1280 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index dce3f5c..d3bd30d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -79,6 +79,9 @@ New Features
* SOLR-9252: Feature selection and logistic regression on text (Cao Manh Dat, Joel Bernstein)
+* SOLR-6465: CDCR: fall back to whole-index replication when tlogs are insufficient.
+ (Noble Paul, Renaud Delbru, shalin)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java b/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
index aa1d5bf..3f65b90 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
@@ -122,6 +122,11 @@ public class CdcrParams {
public final static String COUNTER_DELETES = "deletes";
/**
+ * Counter for Bootstrap operations *
+ */
+ public final static String COUNTER_BOOTSTRAP = "bootstraps";
+
+ /**
* A list of errors per target collection *
*/
public final static String ERRORS = "errors";
@@ -165,7 +170,10 @@ public class CdcrParams {
LASTPROCESSEDVERSION,
QUEUES,
OPS,
- ERRORS;
+ ERRORS,
+ BOOTSTRAP,
+ BOOTSTRAP_STATUS,
+ CANCEL_BOOTSTRAP;
public static CdcrAction get(String p) {
if (p != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
index 66ce096..8519815 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
@@ -119,7 +119,7 @@ public class CdcrReplicator implements Runnable {
// we might have read a single commit operation and reached the end of the update logs
logReader.forwardSeek(subReader);
- log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
+ log.info("Forwarded {} updates to target {}", counter, state.getTargetCollection());
} catch (Exception e) {
// report error and update error stats
this.handleException(e);
@@ -150,13 +150,13 @@ public class CdcrReplicator implements Runnable {
if (e instanceof CdcrReplicatorException) {
UpdateRequest req = ((CdcrReplicatorException) e).req;
UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
- log.warn("Failed to forward update request {}. Got response {}", req, rsp);
+ log.warn("Failed to forward update request {} to target: {}. Got response {}", req, state.getTargetCollection(), rsp);
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
} else if (e instanceof CloudSolrClient.RouteException) {
- log.warn("Failed to forward update request", e);
+ log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
} else {
- log.warn("Failed to forward update request", e);
+ log.warn("Failed to forward update request to target: " + state.getTargetCollection(), e);
state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
index af0161e..528e0b7 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
@@ -16,29 +16,49 @@
*/
package org.apache.solr.handler;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
+
class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
+ // maximum number of bootstrap attempts made after FAILED statuses before giving up
+ private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
+ // pause, in milliseconds, between two bootstrap submissions or two status polls
+ private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
+ // 6 hours (6 * 3600 seconds) is hopefully long enough for most indexes
+ private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L;
+
private List<CdcrReplicatorState> replicatorStates;
private final CdcrReplicatorScheduler scheduler;
@@ -48,6 +68,9 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
private SolrCore core;
private String path;
+ private ExecutorService bootstrapExecutor;
+ private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
CdcrReplicatorManager(final SolrCore core, String path,
@@ -104,12 +127,20 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
@Override
public synchronized void stateUpdate() {
// Replication only runs on a shard leader while the CDCR process is STARTED.
if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
+ // Create the bootstrap pool only when there is no live one: this callback can fire again
+ // while already started, and overwriting the field would leave the previous pool running
+ // with no reference left to shut it down.
+ if (replicatorStates.size() > 0 && (bootstrapExecutor == null || bootstrapExecutor.isShutdown())) {
+ this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
+ new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
+ }
this.initLogReaders();
this.scheduler.start();
return;
}
// Not a leader, or the process is not started: stop replicating and release resources.
this.scheduler.shutdown();
+ if (bootstrapExecutor != null) {
+ // closing the runnable flags it as closed and asks the target leader to cancel the bootstrap
+ IOUtils.closeQuietly(bootstrapStatusRunnable);
+ ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
+ }
this.closeLogReaders();
}
@@ -117,7 +148,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
return replicatorStates;
}
- void initLogReaders() {
+ private void initLogReaders() {
String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
@@ -129,8 +160,23 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
checkpoint, collectionName, shard);
CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
- reader.seek(checkpoint);
+ boolean seek = reader.seek(checkpoint);
state.init(reader);
+ if (!seek) {
+ // targetVersion is lower than the oldest known entry.
+ // In this scenario, it probably means that there is a gap in the updates log.
+ // the best we can do here is to bootstrap the target leader by replicating the full index
+ final String targetCollection = state.getTargetCollection();
+ state.setBootstrapInProgress(true);
+ log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
+ bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
+ log.info("Submitting bootstrap task to executor");
+ try {
+ bootstrapExecutor.submit(bootstrapStatusRunnable);
+ } catch (Exception e) {
+ log.error("Unable to submit bootstrap call to executor", e);
+ }
+ }
} catch (IOException | SolrServerException | SolrException e) {
log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
} catch (InterruptedException e) {
@@ -164,11 +210,203 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
*/
void shutdown() {
this.scheduler.shutdown(); // shut the replicator scheduler down before anything else
+ if (bootstrapExecutor != null) { // the executor is only created in stateUpdate() when replicator states exist
+ IOUtils.closeQuietly(bootstrapStatusRunnable); // close() flags the runnable as closed and sends CANCEL_BOOTSTRAP to the target leader
+ ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
+ }
for (CdcrReplicatorState state : replicatorStates) {
state.shutdown();
}
replicatorStates.clear(); // drop the states once each of them has been shut down
}
+ /**
+  * Drives a full-index bootstrap of one target collection shard and waits for it to finish.
+  * <p>
+  * {@link #run()} sends the BOOTSTRAP command to the leader of the target shard and then polls
+  * BOOTSTRAP_STATUS until the bootstrap completes, fails too many times, times out, or this
+  * runnable is closed. {@link #close()} flags the runnable as closed and sends CANCEL_BOOTSTRAP
+  * to the target leader.
+  */
+ private class BootstrapStatusRunnable implements Runnable, Closeable {
+   private final CdcrReplicatorState state;
+   private final String targetCollection;
+   private final String shard;
+   private final String collectionName;
+   private final CdcrUpdateLog ulog;
+   private final String myCoreUrl;
+
+   // written by close(), read by the polling loop in run()
+   private volatile boolean closed = false;
+
+   /**
+    * @param core  the local core; its collection name, shard id, update log and URL are captured
+    * @param state the replicator state of the target collection to bootstrap
+    */
+   BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
+     this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+     this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+     this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+     this.state = state;
+     this.targetCollection = state.getTargetCollection();
+     String baseUrl = core.getCoreDescriptor().getCoreContainer().getZkController().getBaseUrl();
+     this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
+   }
+
+   /**
+    * Marks this runnable as closed and asks the target shard leader to cancel the bootstrap.
+    * A failure to deliver the cancel message is logged, not rethrown.
+    */
+   @Override
+   public void close() throws IOException {
+     closed = true;
+     try {
+       Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+       String leaderCoreUrl = leader.getCoreUrl();
+       HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+       try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+         sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
+       } catch (SolrServerException e) {
+         // pass the exception as the last argument so that its stack trace is not lost
+         log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
+             targetCollection, shard, leaderCoreUrl, e);
+       }
+     } catch (InterruptedException e) {
+       log.error("Interrupted while closing BootstrapStatusRunnable", e);
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   /**
+    * Submits the bootstrap request and polls its status until it completes, is given up on,
+    * times out, or this runnable is closed. Only a COMPLETED status clears the
+    * bootstrap-in-progress flag as a success; a close clears it as a cancellation.
+    */
+   @Override
+   public void run() {
+     int retries = 1;
+     boolean success = false;
+     try {
+       submitBootstrapUntilAccepted();
+       TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+       while (!timeOut.hasTimedOut()) {
+         if (closed) {
+           log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
+           state.setBootstrapInProgress(false);
+           break;
+         }
+         BootstrapStatus status = getBoostrapStatus();
+         if (status == BootstrapStatus.RUNNING) {
+           try {
+             log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
+                 BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
+             Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+           } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+           }
+         } else if (status == BootstrapStatus.COMPLETED) {
+           log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
+           long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
+           log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
+               checkpoint, collectionName, shard);
+           // NOTE(review): this reader is positioned but never handed to the state - confirm whether
+           // it was meant to be installed via state.init(...)
+           CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
+           reader1.seek(checkpoint);
+           success = true;
+           break;
+         } else if (status == BootstrapStatus.FAILED) {
+           log.warn("CDCR bootstrap failed in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
+           // let's retry a fixed number of times before giving up
+           if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
+             log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
+             break;
+           } else {
+             log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
+             submitBootstrapUntilAccepted();
+             timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
+             retries++;
+           }
+         } else if (status == BootstrapStatus.NOTFOUND) {
+           // the leader of the target shard may have changed and therefore there is no record of the
+           // bootstrap process so we must retry the operation
+           submitBootstrapUntilAccepted();
+           retries = 1;
+           // use the shared constant rather than a duplicated literal so both timers stay in sync
+           timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
+         } else if (status == BootstrapStatus.UNKNOWN) {
+           // we were not able to query the status on the remote end
+           // so just sleep for a bit and try again
+           Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+         }
+       }
+     } catch (InterruptedException e) {
+       log.info("Bootstrap thread interrupted");
+       state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
+       Thread.currentThread().interrupt();
+     } catch (IOException | SolrServerException | SolrException e) {
+       log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
+       state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+     } finally {
+       if (success) {
+         log.info("Bootstrap successful, giving the go-ahead to replicator");
+         state.setBootstrapInProgress(false);
+       }
+     }
+   }
+
+   /**
+    * Re-sends the bootstrap command, pausing BOOTSTRAP_RETRY_DELAY_MS between attempts, until the
+    * target reports SUBMITTED or this runnable is closed.
+    */
+   private void submitBootstrapUntilAccepted() throws InterruptedException {
+     while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
+       Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+     }
+   }
+
+   /**
+    * Sends the BOOTSTRAP command, carrying this core's URL as the master URL, to the leader of
+    * the target shard.
+    *
+    * @return the status parsed from the response, or UNKNOWN if the request failed
+    */
+   private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
+     Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+     String leaderCoreUrl = leader.getCoreUrl();
+     HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+     try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+       log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
+       try {
+         NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
+         log.debug("CDCR Bootstrap response: {}", response);
+         String status = response.get(RESPONSE_STATUS).toString();
+         return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
+       } catch (Exception e) {
+         log.error("Exception submitting bootstrap request", e);
+         return BootstrapStatus.UNKNOWN;
+       }
+     } catch (IOException e) {
+       log.error("There shouldn't be an IOException while closing but there was!", e);
+     }
+     return BootstrapStatus.UNKNOWN;
+   }
+
+   /**
+    * Queries the leader of the target shard for the status of the bootstrap.
+    *
+    * @return RUNNING, COMPLETED, FAILED, NOTFOUND or CANCELLED as reported by the target, or
+    *         UNKNOWN if the request failed or the reported status is not one of those
+    */
+   private BootstrapStatus getBoostrapStatus() throws InterruptedException {
+     try {
+       Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
+       String leaderCoreUrl = leader.getCoreUrl();
+       HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
+       try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
+         NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
+         String status = (String) response.get(RESPONSE_STATUS);
+         BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
+         switch (bootstrapStatus) {
+           case RUNNING:
+           case COMPLETED:
+           case FAILED:
+           case CANCELLED:
+             return bootstrapStatus;
+           case NOTFOUND:
+             log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
+             return bootstrapStatus;
+           default:
+             // any other status is not expected here; the catch below turns this into UNKNOWN
+             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                 "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
+         }
+       }
+     } catch (Exception e) {
+       log.error("Exception during bootstrap status request", e);
+       return BootstrapStatus.UNKNOWN;
+     }
+   }
+ }
+
+ /**
+  * Sends a CDCR action to the "/cdcr" request handler through the given client.
+  *
+  * @param client the client the request is sent with
+  * @param action the CDCR action to invoke
+  * @param params extra request parameters as alternating name, value entries; a trailing name
+  *               without a value is ignored
+  * @return the response returned by the client
+  */
+ private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
+   ModifiableSolrParams query = new ModifiableSolrParams();
+   query.set(CommonParams.QT, "/cdcr");
+   query.set(CommonParams.ACTION, action.toString());
+   // walk the varargs pair by pair: params[2k] is a name, params[2k + 1] its value
+   int pairCount = params.length / 2;
+   for (int pair = 0; pair < pairCount; pair++) {
+     String name = params[2 * pair];
+     String value = params[2 * pair + 1];
+     query.set(name, value);
+   }
+   SolrRequest cdcrRequest = new QueryRequest(query);
+   return client.request(cdcrRequest);
+ }
+
+ private enum BootstrapStatus { // statuses parsed, upper-cased, from the status field of a target's CDCR response
+ SUBMITTED, // run() keeps re-sending the bootstrap command until it gets this status back
+ RUNNING, // run() sleeps for the retry delay and polls the status again
+ COMPLETED, // run() marks the bootstrap successful and stops polling
+ FAILED, // run() re-submits the bootstrap until MAX_BOOTSTRAP_ATTEMPTS is reached
+ NOTFOUND, // target has no record of the bootstrap; run() submits it again and resets the retry count
+ CANCELLED, // passed through by the status query; run() has no dedicated branch for it
+ UNKNOWN // used locally when a request failed or returned an unexpected status
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
index bb817e5..62abeab 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
@@ -77,7 +77,11 @@ class CdcrReplicatorScheduler {
CdcrReplicatorState state = statesQueue.poll();
assert state != null; // Should never happen
try {
- new CdcrReplicator(state, batchSize).run();
+ if (!state.isBootstrapInProgress()) {
+ new CdcrReplicator(state, batchSize).run();
+ } else {
+ log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
+ }
} finally {
statesQueue.offer(state);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
index 9e01f11..2ca0d80 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.update.CdcrUpdateLog;
@@ -53,6 +55,9 @@ class CdcrReplicatorState {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
+ private final AtomicInteger numBootstraps = new AtomicInteger();
+
CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
this.targetCollection = targetCollection;
this.targetClient = targetClient;
@@ -164,6 +169,24 @@ class CdcrReplicatorState {
return this.benchmarkTimer;
}
+ /**
+ * @return true if a bootstrap operation is in progress, false otherwise
+ */
+ boolean isBootstrapInProgress() {
+ return bootstrapInProgress.get();
+ }
+
+ /**
+  * Sets the bootstrap-in-progress flag and increments the bootstrap counter whenever the flag
+  * was set before the call.
+  * <p>
+  * The flag is swapped in a single atomic step so that a concurrent reader never observes a
+  * transient {@code false} when the flag is re-set to {@code true} while already {@code true}.
+  *
+  * @param inProgress the new value of the flag
+  */
+ void setBootstrapInProgress(boolean inProgress) {
+   boolean wasInProgress = bootstrapInProgress.getAndSet(inProgress);
+   if (wasInProgress) {
+     numBootstraps.incrementAndGet();
+   }
+ }
+
+ /**
+  * @return the current value of the bootstrap counter maintained by
+  *         {@code setBootstrapInProgress(boolean)}
+  */
+ public int getNumBootstraps() {
+   final int bootstrapCount = numBootstraps.get();
+   return bootstrapCount;
+ }
+
enum ErrorType {
INTERNAL,
BAD_REQUEST;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
index f60e562..f706637 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.handler;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -24,14 +25,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -41,21 +48,33 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginBag;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
+
/**
* <p>
* This request handler implements the CDCR API and is responsible of the execution of the
@@ -199,6 +218,18 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
this.handleErrorsAction(req, rsp);
break;
}
+ case BOOTSTRAP: {
+ this.handleBootstrapAction(req, rsp);
+ break;
+ }
+ case BOOTSTRAP_STATUS: {
+ this.handleBootstrapStatus(req, rsp);
+ break;
+ }
+ case CANCEL_BOOTSTRAP: {
+ this.handleCancelBootstrap(req, rsp);
+ break;
+ }
default: {
throw new RuntimeException("Unknown action: " + action);
}
@@ -409,10 +440,20 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ VersionInfo versionInfo = ulog.getVersionInfo();
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- List<Long> versions = recentUpdates.getVersions(1);
- long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
- rsp.add(CdcrParams.CHECKPOINT, lastVersion);
+ long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
+ long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
+ log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
+ // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
+ long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
+ if (maxVersion == 0L) {
+ maxVersion = -1;
+ }
+ rsp.add(CdcrParams.CHECKPOINT, maxVersion);
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
+ "' could not read max version");
}
}
@@ -574,6 +615,192 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CdcrParams.ERRORS, hosts);
}
+ private AtomicBoolean running = new AtomicBoolean();
+ private volatile Future<Boolean> bootstrapFuture;
+ private volatile BootstrapCallable bootstrapCallable;
+
+ private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
+ String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+ if (!leaderStateManager.amILeader()) {
+ log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
+ " sent to non-leader replica");
+ }
+
+ Runnable runnable = () -> {
+ Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
+ boolean locked = recoveryLock.tryLock();
+ try {
+ if (!locked) {
+ handleCancelBootstrap(req, rsp);
+ } else if (leaderStateManager.amILeader()) {
+ running.set(true);
+ String masterUrl = req.getParams().get(ReplicationHandler.MASTER_URL);
+ bootstrapCallable = new BootstrapCallable(masterUrl, core);
+ bootstrapFuture = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getRecoveryExecutor().submit(bootstrapCallable);
+ try {
+ bootstrapFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Bootstrap was interrupted", e);
+ } catch (ExecutionException e) {
+ log.error("Bootstrap operation failed", e);
+ }
+ } else {
+ log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
+ }
+ } finally {
+ if (locked) {
+ running.set(false);
+ recoveryLock.unlock();
+ }
+ }
+ };
+
+ try {
+ core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
+ rsp.add(RESPONSE_STATUS, "submitted");
+ } catch (RejectedExecutionException ree) {
+ // no problem, we're probably shutting down
+ rsp.add(RESPONSE_STATUS, "failed");
+ }
+ }
+
+ /**
+  * Handles the {@code CANCEL_BOOTSTRAP} action: quietly closes whatever bootstrap callable is
+  * currently referenced by this handler (the field may still be unset) and then reports the
+  * status "cancelled" on the response.
+  */
+ private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
+   IOUtils.closeQuietly(this.bootstrapCallable);
+   rsp.add(RESPONSE_STATUS, "cancelled");
+ }
+
+ /**
+  * Handles the {@code BOOTSTRAP_STATUS} action by reporting the state of the most recent
+  * bootstrap on this core under {@code RESPONSE_STATUS}:
+  * <ul>
+  *   <li>{@code RUNNING} while the running flag is set, or while the future is not yet done;</li>
+  *   <li>"notfound" when no bootstrap future has been recorded;</li>
+  *   <li>"cancelled" when the future was cancelled or its callable was closed;</li>
+  *   <li>{@code COMPLETED} / {@code FAILED} according to the boolean result of the future;</li>
+  *   <li>{@code FAILED} when retrieving the result throws, with the exception or a message
+  *       attached to the response.</li>
+  * </ul>
+  * Every path adds a status, so callers never have to deal with a response lacking one.
+  */
+ private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
+   if (running.get()) {
+     rsp.add(RESPONSE_STATUS, RUNNING);
+     return;
+   }
+
+   // read the future first: the callable field is assigned before the future field
+   Future<Boolean> future = bootstrapFuture;
+   BootstrapCallable callable = this.bootstrapCallable;
+   if (future == null) {
+     rsp.add(RESPONSE_STATUS, "notfound");
+     rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
+   } else if (future.isCancelled() || callable.isClosed()) {
+     rsp.add(RESPONSE_STATUS, "cancelled");
+   } else if (future.isDone()) {
+     // could be a normal termination or an exception
+     try {
+       Boolean result = future.get();
+       if (result) {
+         rsp.add(RESPONSE_STATUS, COMPLETED);
+       } else {
+         rsp.add(RESPONSE_STATUS, FAILED);
+       }
+     } catch (InterruptedException e) {
+       // Not expected for a future that is already done, but never answer without a status:
+       // restore the interrupt flag and report the failure to read the result.
+       Thread.currentThread().interrupt();
+       rsp.add(RESPONSE_STATUS, FAILED);
+       rsp.add(RESPONSE_MESSAGE, "Interrupted while reading the bootstrap result");
+     } catch (ExecutionException e) {
+       rsp.add(RESPONSE_STATUS, FAILED);
+       rsp.add(RESPONSE, e);
+     } catch (CancellationException ce) {
+       rsp.add(RESPONSE_STATUS, FAILED);
+       rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
+     }
+   } else {
+     rsp.add(RESPONSE_STATUS, RUNNING);
+   }
+ }
+
+ /**
+  * A single bootstrap attempt: fetches the whole index from {@code masterUrl} into {@code core}
+  * through the core's {@link ReplicationHandler}. Closing the callable marks it as closed and
+  * asks the replication handler to abort the fetch in progress.
+  */
+ private static class BootstrapCallable implements Callable<Boolean>, Closeable {
+   private final String masterUrl;
+   private final SolrCore core;
+   // set by close(); read by call() when deciding whether buffered updates must be dropped
+   private volatile boolean closed = false;
+
+   BootstrapCallable(String masterUrl, SolrCore core) {
+     this.masterUrl = masterUrl;
+     this.core = core;
+   }
+
+   /**
+    * Marks this bootstrap as closed and aborts the current index fetch, if the core has a
+    * replication handler registered. A missing handler is tolerated here because there is then
+    * nothing to abort; {@link #call()} reports that condition itself.
+    */
+   @Override
+   public void close() throws IOException {
+     closed = true;
+     SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+     if (handler != null) {
+       ((ReplicationHandler) handler).abortFetch();
+     }
+   }
+
+   /** @return {@code true} once {@link #close()} has been called */
+   public boolean isClosed() {
+     return closed;
+   }
+
+   /**
+    * Runs the bootstrap: starts buffering updates, asks the node at {@code masterUrl} to commit,
+    * fetches the index (without tlog files) synchronously, and then applies any buffered updates.
+    *
+    * @return the result of the index fetch
+    * @throws SolrException if no replication handler is registered or the replay of buffered
+    *         updates failed
+    */
+   @Override
+   public Boolean call() throws Exception {
+     boolean success = false;
+     UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+     // we start buffering updates as a safeguard however we do not expect
+     // to receive any updates from the source during bootstrap
+     ulog.bufferUpdates();
+     try {
+       commitOnLeader(masterUrl);
+       // use rep handler directly, so we can do this sync rather than async
+       SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+       ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+
+       if (replicationHandler == null) {
+         throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+             "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+       }
+
+       ModifiableSolrParams solrParams = new ModifiableSolrParams();
+       solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
+       // we do not want the raw tlog files from the source
+       solrParams.set(ReplicationHandler.TLOG_FILES, false);
+
+       success = replicationHandler.doFetch(solrParams, false);
+
+       // this is required because this callable can race with HttpSolrCall#destroy
+       // which clears the request info.
+       // Applying buffered updates fails without the following line because LogReplayer
+       // also tries to set request info and fails with AssertionError
+       SolrRequestInfo.clearRequestInfo();
+
+       Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
+       if (future == null) {
+         // no replay needed
+         log.info("No replay needed.");
+       } else {
+         log.info("Replaying buffered documents.");
+         // wait for replay
+         UpdateLog.RecoveryInfo report = future.get();
+         if (report.failed) {
+           SolrException.log(log, "Replay failed");
+           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
+         }
+       }
+       return success;
+     } finally {
+       if (closed || !success) {
+         // we cannot apply the buffer in this case because it will introduce newer versions in the
+         // update log and then the source cluster will get those versions via collectioncheckpoint
+         // causing the versions in between to be completely missed
+         boolean dropped = ulog.dropBufferedUpdates();
+         assert dropped;
+       }
+     }
+   }
+
+   /**
+    * Sends a commit request (with {@code openSearcher=false}) to the node at {@code leaderUrl}
+    * before the index is fetched from it.
+    */
+   private void commitOnLeader(String leaderUrl) throws SolrServerException,
+       IOException {
+     try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
+       client.setConnectionTimeout(30000);
+       UpdateRequest ureq = new UpdateRequest();
+       ureq.setParams(new ModifiableSolrParams());
+       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+       ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
+           client);
+     }
+   }
+ }
+
@Override
public String getDescription() {
return "Manage Cross Data Center Replication";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 0612f2f..634a9e0 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -736,14 +736,14 @@ public class IndexFetcher {
}
private void openNewSearcherAndUpdateCommitPoint() throws IOException {
- SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
- new ModifiableSolrParams());
-
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
+ // must get the latest solrCore object because the one we have might be closed because of a reload
+ // todo stop keeping solrCore around
+ SolrCore core = solrCore.getCoreDescriptor().getCoreContainer().getCore(solrCore.getName());
try {
Future[] waitSearcher = new Future[1];
- searcher = solrCore.getSearcher(true, true, waitSearcher, true);
+ searcher = core.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
@@ -753,10 +753,10 @@ public class IndexFetcher {
}
commitPoint = searcher.get().getIndexReader().getIndexCommit();
} finally {
- req.close();
if (searcher != null) {
searcher.decref();
}
+ core.close();
}
// update the commit point in replication handler
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index e473a63..139489a 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -300,9 +300,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add("message","No slave configured");
}
} else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
- IndexFetcher fetcher = currentIndexFetcher;
- if (fetcher != null){
- fetcher.abortFetch();
+ if (abortFetch()){
rsp.add(STATUS, OK_STATUS);
} else {
rsp.add(STATUS,ERR_STATUS);
@@ -321,6 +319,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
}
+ /**
+  * Asks the index fetcher that is currently registered on this handler, if any, to abort.
+  *
+  * @return {@code true} if a current fetcher existed and was told to abort, {@code false} if
+  *         there was none
+  */
+ public boolean abortFetch() {
+   // take a single snapshot of the field so the null check and the call see the same object
+   final IndexFetcher active = currentIndexFetcher;
+   if (active == null) {
+     return false;
+   }
+   active.abortFetch();
+   return true;
+ }
+
private void deleteSnapshot(ModifiableSolrParams params) {
String name = params.get(NAME);
if(name == null) {
@@ -651,7 +659,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CMD_GET_FILE_LIST, result);
// fetch list of tlog files only if cdcr is activated
- if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+ if (solrParams.getBool(TLOG_FILES, true) && core.getUpdateHandler().getUpdateLog() != null
+ && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
try {
List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
LOG.info("Adding tlog files to list: " + tlogfiles);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
index 3afc66c..6b20204 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
@@ -151,7 +151,12 @@ public class CdcrUpdateLog extends UpdateLog {
if (id != -1) return id;
if (tlogFiles.length == 0) return -1;
String last = tlogFiles[tlogFiles.length - 1];
- return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
+ if (TLOG_NAME.length() + 1 > last.lastIndexOf('.')) {
+ // old tlog created by default UpdateLog impl
+ return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
+ } else {
+ return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index a29d57d..c57ee75 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -395,5 +395,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
public void setLastReplicateIndexSuccess(boolean success) {
this.lastReplicationSuccess = success;
}
-
+
+ /** Exposes the lock stored in this state's {@code recoveryLock} field. */
+ @Override
+ public Lock getRecoveryLock() {
+   return this.recoveryLock;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 89e286a..873a697 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -163,4 +163,6 @@ public abstract class SolrCoreState {
super(s);
}
}
+
+ public abstract Lock getRecoveryLock();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
index 3b3fcb4..5bbc4a2 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
@@ -96,11 +96,11 @@ public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
ModifiableSolrParams result = super.filterParams(params);
if (params.get(CDCR_UPDATE) != null) {
result.set(CDCR_UPDATE, "");
- if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
- log.warn("+++ cdcr.update but no version field, params are: " + params);
- } else {
- log.info("+++ cdcr.update version present, params are: " + params);
- }
+// if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
+// log.warn("+++ cdcr.update but no version field, params are: " + params);
+// } else {
+// log.info("+++ cdcr.update version present, params are: " + params);
+// }
result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 67c88dd..9b0e4dc 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -295,7 +295,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this should always be used - see filterParams
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
- (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS);
+ (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, VERSION_FIELD);
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <types>
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+ </types>
+ <fields>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+ </fields>
+ <uniqueKey>id</uniqueKey>
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
new file mode 100644
index 0000000..e63d9a6
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source-disabled/solrconfig.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+ When writing a new test, feel free to add *new* items (plugins,
+ config options, etc...) as long as they don't break any existing
+ tests. If you need to test something esoteric please add a new
+ "solrconfig-your-esoteric-purpose.xml" config file.
+
+ Note in particular that this file is used by MinimalSchemaTest so
+ anything added to this file needs to work correctly even if there
+ is no uniqueKey or defaultSearch Field.
+ -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+
+ <updateLog>
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+
+ </updateHandler>
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+</config>
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <types>
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+ </types>
+ <fields>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+ </fields>
+ <uniqueKey>id</uniqueKey>
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
new file mode 100644
index 0000000..f2528c3
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-source/solrconfig.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+ When writing a new test, feel free to add *new* items (plugins,
+ config options, etc...) as long as they don't break any existing
+ tests. If you need to test something esoteric please add a new
+ "solrconfig-your-esoteric-purpose.xml" config file.
+
+ Note in particular that this file is used by MinimalSchemaTest so
+ anything added to this file needs to work correctly even if there
+ is no uniqueKey or defaultSearch Field.
+ -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateRequestProcessorChain name="cdcr-processor-chain">
+ <processor class="solr.CdcrUpdateProcessorFactory"/>
+ <processor class="solr.RunUpdateProcessorFactory"/>
+ </updateRequestProcessorChain>
+
+ <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+ <lst name="replica">
+ <str name="zkHost">${cdcr.target.zkHost}</str>
+ <str name="source">cdcr-source</str>
+ <str name="target">cdcr-target</str>
+ </lst>
+ <lst name="replicator">
+ <str name="threadPoolSize">1</str>
+ <str name="schedule">1000</str>
+ <str name="batchSize">1000</str>
+ </lst>
+ <lst name="updateLogSynchronizer">
+ <str name="schedule">1000</str>
+ </lst>
+ </requestHandler>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <updateLog class="solr.CdcrUpdateLog">
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+ </updateHandler>
+
+ <requestHandler name="standard" class="solr.StandardRequestHandler">
+ </requestHandler>
+
+ <requestHandler name="/update" class="solr.UpdateRequestHandler">
+ <lst name="defaults">
+ <str name="update.chain">cdcr-processor-chain</str>
+ </lst>
+ </requestHandler>
+</config>
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml b/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
new file mode 100644
index 0000000..2897315
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-target/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <types>
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
+ </types>
+ <fields>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+ </fields>
+ <uniqueKey>id</uniqueKey>
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
new file mode 100644
index 0000000..ef24fa4
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cdcr-target/solrconfig.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This is a "kitchen sink" config file that tests can use.
+ When writing a new test, feel free to add *new* items (plugins,
+ config options, etc...) as long as they don't break any existing
+ tests. If you need to test something esoteric please add a new
+ "solrconfig-your-esoteric-purpose.xml" config file.
+
+ Note in particular that this file is used by MinimalSchemaTest so
+ anything added to this file needs to work correctly even if there
+ is no uniqueKey or defaultSearch Field.
+ -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateRequestProcessorChain name="cdcr-processor-chain">
+ <processor class="solr.CdcrUpdateProcessorFactory"/>
+ <processor class="solr.RunUpdateProcessorFactory"/>
+ </updateRequestProcessorChain>
+
+ <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+ </requestHandler>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <updateLog class="solr.CdcrUpdateLog">
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+ </updateHandler>
+
+ <requestHandler name="standard" class="solr.StandardRequestHandler">
+ </requestHandler>
+
+ <requestHandler name="/update" class="solr.UpdateRequestHandler">
+ <lst name="defaults">
+ <str name="update.chain">cdcr-processor-chain</str>
+ </lst>
+ </requestHandler>
+</config>
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
index e355b79..0d6fd16 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
@@ -28,6 +28,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.solr.client.solrj.SolrClient;
@@ -57,6 +58,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.CdcrParams;
+import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.AfterClass;
@@ -70,6 +72,8 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
+import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
/**
* <p>
@@ -763,6 +767,18 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
}
}
+ protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
+ NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
+ TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+ while (!timeOut.hasTimedOut()) {
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
+ if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ }
+
protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
int cnt = 15;
while (cnt > 0) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cc3f3e8a/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java b/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
new file mode 100644
index 0000000..1efdc6a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/CdcrBootstrapTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CdcrBootstrapTest extends SolrTestCaseJ4 {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Starts a source cluster with no CDCR configuration, indexes enough documents such that
+ * at least one old tlog is closed and thrown away so that the source cluster does not have
+ * all updates available in tlogs only.
+ * <p>
+ * Then we start a target cluster with CDCR configuration and we change the source cluster configuration
+ * to use CDCR (i.e. CdcrUpdateLog, CdcrRequestHandler and CdcrUpdateProcessor) and restart it.
+ * <p>
+ * We test that all updates eventually make it to the target cluster and that the COLLECTIONCHECKPOINT
+ * call returns the same version as the last update indexed on the source.
+ */
+ @Test
+ public void testConvertClusterToCdcrAndBootstrap() throws Exception {
+ // start the target first so that we know its zkhost
+ MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+ try {
+ target.waitForAllNodes(30);
+ System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
+ // publish the target's ZooKeeper address as a system property (presumably read by the cdcr-source configset)
+ System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+ // start a cluster with no cdcr
+ MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+ try {
+ source.waitForAllNodes(30);
+ final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source-disabled").toFile();
+ System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
+ // the CDCR-disabled configset is uploaded under the name "cdcr-source" so that it can be replaced in place later
+ source.uploadConfigDir(configDir, "cdcr-source");
+
+ // create a collection with the cdcr-source-disabled configset
+ Map<String, String> collectionProperties = new HashMap<>();
+ // todo investigate why this is necessary??? because by default it selects a ram directory which deletes the tlogs on reloads?
+ collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
+ source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
+ source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+
+ // index 10000 docs (100 batches of 100) with a hard commit after every batch of 100 documents
+ CloudSolrClient sourceSolrClient = source.getSolrClient();
+ sourceSolrClient.setDefaultCollection("cdcr-source");
+ int numDocs = 0;
+ for (int k = 0; k < 100; k++) {
+ UpdateRequest req = new UpdateRequest();
+ for (; numDocs < (k + 1) * 100; numDocs++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "source_" + numDocs);
+ doc.addField("xyz", numDocs);
+ req.add(doc);
+ }
+ req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
+ req.process(sourceSolrClient);
+ }
+
+ QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("", numDocs, response.getResults().getNumFound());
+
+ // lets find and keep the maximum version assigned by source cluster across all our updates
+ // (the /get handler is asked for numDocs versions and the maximum of the returned list is kept)
+ long maxVersion = Long.MIN_VALUE;
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CommonParams.QT, "/get");
+ params.set("getVersions", numDocs);
+ response = sourceSolrClient.query(params);
+ List<Long> versions = (List<Long>) response.getResponse().get("versions");
+ for (Long version : versions) {
+ maxVersion = Math.max(maxVersion, version);
+ }
+
+// upload the cdcr-enabled config and restart source cluster
+ final File cdcrEnabledSourceConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
+ source.uploadConfigDir(cdcrEnabledSourceConfigDir, "cdcr-source");
+ JettySolrRunner runner = source.stopJettySolrRunner(0);
+ source.startJettySolrRunner(runner);
+ assertTrue(runner.isRunning());
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+
+ // the restart must not have lost any of the documents indexed before the config switch
+ response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("Document mismatch on source after restart", numDocs, response.getResults().getNumFound());
+
+ // setup the target cluster
+ final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
+ target.uploadConfigDir(targetConfigDir, "cdcr-target");
+ target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
+ target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
+ CloudSolrClient targetSolrClient = target.getSolrClient();
+ targetSolrClient.setDefaultCollection("cdcr-target");
+ Thread.sleep(1000);
+
+ // start CDCR on the target first and then on the source
+ cdcrStart(targetSolrClient);
+ cdcrStart(sourceSolrClient);
+
+ response = getCdcrQueue(sourceSolrClient);
+ System.out.println("Cdcr queue response: " + response.getResponse());
+ // poll the target until it reports numDocs documents or the wait gives up
+ long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+ assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+ // ask the target's /cdcr handler for its collection checkpoint and compare it with the source's max version
+ params = new ModifiableSolrParams();
+ params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
+ params.set(CommonParams.QT, "/cdcr");
+ response = targetSolrClient.query(params);
+ Long checkpoint = (Long) response.getResponse().get(CdcrParams.CHECKPOINT);
+ assertNotNull(checkpoint);
+ assertEquals("COLLECTIONCHECKPOINT from target cluster should have returned the maximum " +
+ "version across all updates made to source", maxVersion, checkpoint.longValue());
+ } finally {
+ source.shutdown();
+ }
+ } finally {
+ target.shutdown();
+ }
+ }
+
+ /**
+ * This test starts a CDCR source cluster, adds data, sets up the target collection and verifies replication.
+ * It then stops CDCR replication and disables buffering on the source, adds more data,
+ * re-enables CDCR and verifies that the target catches up again.
+ */
+ public void testBootstrapWithSourceCluster() throws Exception {
+ // start the target first so that we know its zkhost
+ MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+ try {
+ target.waitForAllNodes(30);
+ System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
+ // publish the target's ZooKeeper address as a system property (presumably read by the cdcr-source configset)
+ System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+ MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+ try {
+ source.waitForAllNodes(30);
+ final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
+ System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
+ source.uploadConfigDir(configDir, "cdcr-source");
+
+ Map<String, String> collectionProperties = new HashMap<>();
+ // todo investigate why this is necessary???
+ collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
+ source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
+ source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+
+ // index 10000 docs (100 batches of 100) with a hard commit after every batch of 100 documents
+ CloudSolrClient sourceSolrClient = source.getSolrClient();
+ sourceSolrClient.setDefaultCollection("cdcr-source");
+ int numDocs = 0;
+ for (int k = 0; k < 100; k++) {
+ UpdateRequest req = new UpdateRequest();
+ for (; numDocs < (k + 1) * 100; numDocs++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "source_" + numDocs);
+ doc.addField("xyz", numDocs);
+ req.add(doc);
+ }
+ req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
+ req.process(sourceSolrClient);
+ }
+
+ QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("", numDocs, response.getResults().getNumFound());
+
+ // setup the target cluster
+ final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
+ target.uploadConfigDir(targetConfigDir, "cdcr-target");
+ target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
+ target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
+ CloudSolrClient targetSolrClient = target.getSolrClient();
+ targetSolrClient.setDefaultCollection("cdcr-target");
+
+ // start CDCR on the target first and then on the source
+ cdcrStart(targetSolrClient);
+ cdcrStart(sourceSolrClient);
+
+ response = getCdcrQueue(sourceSolrClient);
+ System.out.println("Cdcr queue response: " + response.getResponse());
+ // poll the target until it reports numDocs documents or the wait gives up
+ long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+ assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+ // stop replication and disable buffering on the source before indexing the second round
+ cdcrStop(sourceSolrClient);
+ cdcrDisableBuffer(sourceSolrClient);
+
+ // index another 10000 docs (100 batches of 100); c counts this round only, numDocs keeps the running total
+ int c = 0;
+ for (int k = 0; k < 100; k++) {
+ UpdateRequest req = new UpdateRequest();
+ for (; c < (k + 1) * 100; c++, numDocs++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "source_" + numDocs);
+ doc.addField("xyz", numDocs);
+ req.add(doc);
+ }
+ req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
+ req.process(sourceSolrClient);
+ }
+
+ response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("", numDocs, response.getResults().getNumFound());
+
+ // restart replication and re-enable buffering; the target is then expected to catch up with the second round
+ cdcrStart(sourceSolrClient);
+ cdcrEnableBuffer(sourceSolrClient);
+
+ foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+ assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+ } finally {
+ source.shutdown();
+ }
+ } finally {
+ target.shutdown();
+ }
+ }
+
+ public void testBootstrapWithContinousIndexingOnSourceCluster() throws Exception {
+ // start the target first so that we know its zkhost
+ MiniSolrCloudCluster target = new MiniSolrCloudCluster(1, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
+ target.waitForAllNodes(30);
+ try {
+ System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
+ System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
+
+ MiniSolrCloudCluster source = new MiniSolrCloudCluster(1, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
+ try {
+ source.waitForAllNodes(30);
+ final File configDir = getFile("solr").toPath().resolve("configsets/cdcr-source").toFile();
+ System.out.println("config dir absolute path = " + configDir.getAbsolutePath());
+ source.uploadConfigDir(configDir, "cdcr-source");
+
+ Map<String, String> collectionProperties = new HashMap<>();
+ // todo investigate why this is necessary???
+ collectionProperties.putIfAbsent("solr.directoryFactory", "solr.StandardDirectoryFactory");
+ source.createCollection("cdcr-source", 1, 1, "cdcr-source", collectionProperties);
+ source.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-source");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-source", source.getSolrClient().getZkStateReader(), true, true, 330);
+
+ // index 10000 docs with a hard commit every 1000 documents
+ CloudSolrClient sourceSolrClient = source.getSolrClient();
+ sourceSolrClient.setDefaultCollection("cdcr-source");
+ int numDocs = 0;
+ for (int k = 0; k < 100; k++) {
+ UpdateRequest req = new UpdateRequest();
+ for (; numDocs < (k + 1) * 100; numDocs++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "source_" + numDocs);
+ doc.addField("xyz", numDocs);
+ req.add(doc);
+ }
+ req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
+ req.process(sourceSolrClient);
+ }
+
+ QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("", numDocs, response.getResults().getNumFound());
+
+ // setup the target cluster
+ final File targetConfigDir = getFile("solr").toPath().resolve("configsets/cdcr-target").toFile();
+ target.uploadConfigDir(targetConfigDir, "cdcr-target");
+ target.createCollection("cdcr-target", 1, 1, "cdcr-target", Collections.emptyMap());
+ target.getSolrClient().getZkStateReader().forceUpdateCollection("cdcr-target");
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish("cdcr-target", target.getSolrClient().getZkStateReader(), true, true, 330);
+ CloudSolrClient targetSolrClient = target.getSolrClient();
+ targetSolrClient.setDefaultCollection("cdcr-target");
+ Thread.sleep(1000);
+
+ cdcrStart(targetSolrClient);
+ cdcrStart(sourceSolrClient);
+
+ int c = 0;
+ for (int k = 0; k < 100; k++) {
+ UpdateRequest req = new UpdateRequest();
+ for (; c < (k + 1) * 100; c++, numDocs++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "source_" + numDocs);
+ doc.addField("xyz", numDocs);
+ req.add(doc);
+ }
+ req.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+ System.out.println("Adding 100 docs with commit=true, numDocs=" + numDocs);
+ req.process(sourceSolrClient);
+ }
+
+ response = sourceSolrClient.query(new SolrQuery("*:*"));
+ assertEquals("", numDocs, response.getResults().getNumFound());
+
+ response = getCdcrQueue(sourceSolrClient);
+ System.out.println("Cdcr queue response: " + response.getResponse());
+ long foundDocs = waitForTargetToSync(numDocs, targetSolrClient);
+ assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
+
+ } finally {
+ source.shutdown();
+ }
+ } finally {
+ target.shutdown();
+ }
+ }
+
+ private long waitForTargetToSync(int numDocs, CloudSolrClient targetSolrClient) throws SolrServerException, IOException, InterruptedException {
+ long start = System.nanoTime();
+ QueryResponse response = null;
+ while (System.nanoTime() - start <= TimeUnit.NANOSECONDS.convert(120, TimeUnit.SECONDS)) {
+ try {
+ targetSolrClient.commit();
+ response = targetSolrClient.query(new SolrQuery("*:*"));
+ if (response.getResults().getNumFound() == numDocs) {
+ break;
+ }
+ } catch (Exception e) {
+ log.warn("Exception trying to commit on target. This is expected and safe to ignore.", e);
+ }
+ Thread.sleep(1000);
+ }
+ return response != null ? response.getResults().getNumFound() : 0;
+ }
+
+
+ /** Issues the CDCR START action through the given client and asserts that the reported process state is "started". */
+ private void cdcrStart(CloudSolrClient client) throws SolrServerException, IOException {
+ NamedList status = (NamedList) invokeCdcrAction(client, CdcrParams.CdcrAction.START).getResponse().get("status");
+ Object processState = status.get("process");
+ assertEquals("started", processState);
+ }
+
+ /** Issues the CDCR STOP action through the given client and asserts that the reported process state is "stopped". */
+ private void cdcrStop(CloudSolrClient client) throws SolrServerException, IOException {
+ NamedList status = (NamedList) invokeCdcrAction(client, CdcrParams.CdcrAction.STOP).getResponse().get("status");
+ Object processState = status.get("process");
+ assertEquals("stopped", processState);
+ }
+
+ /** Issues the CDCR ENABLEBUFFER action through the given client and asserts that the reported buffer state is "enabled". */
+ private void cdcrEnableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+ NamedList status = (NamedList) invokeCdcrAction(client, CdcrParams.CdcrAction.ENABLEBUFFER).getResponse().get("status");
+ Object bufferState = status.get("buffer");
+ assertEquals("enabled", bufferState);
+ }
+
+ /** Issues the CDCR DISABLEBUFFER action through the given client and asserts that the reported buffer state is "disabled". */
+ private void cdcrDisableBuffer(CloudSolrClient client) throws IOException, SolrServerException {
+ NamedList status = (NamedList) invokeCdcrAction(client, CdcrParams.CdcrAction.DISABLEBUFFER).getResponse().get("status");
+ Object bufferState = status.get("buffer");
+ assertEquals("disabled", bufferState);
+ }
+
+ /**
+ * Sends the given CDCR action to the "/cdcr" request handler through the supplied client.
+ *
+ * @param client the client used to run the request
+ * @param action the CDCR action; its lower-case form is sent as the action parameter
+ * @return the response of the request
+ */
+ private QueryResponse invokeCdcrAction(CloudSolrClient client, CdcrParams.CdcrAction action) throws IOException, SolrServerException {
+ ModifiableSolrParams cdcrRequest = new ModifiableSolrParams()
+ .set(CommonParams.QT, "/cdcr")
+ .set(CommonParams.ACTION, action.toLower());
+ return client.query(cdcrRequest);
+ }
+
+ /**
+ * Requests the CDCR queue information from the "/cdcr" request handler through the supplied client.
+ *
+ * @param client the client used to run the request
+ * @return the response of the QUEUES action
+ */
+ private QueryResponse getCdcrQueue(CloudSolrClient client) throws SolrServerException, IOException {
+ // Go through the CdcrAction enum like every other helper in this class, rather than passing the
+ // CdcrParams.QUEUES string constant (presumably a response key that merely shares the action's value).
+ return invokeCdcrAction(client, CdcrParams.CdcrAction.QUEUES);
+ }
+}