You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:05:43 UTC
[22/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support
for Solr
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
deleted file mode 100644
index 8ec3c8b..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.HttpClient;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.util.TimeOut;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-
-class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
-
- private static final int MAX_BOOTSTRAP_ATTEMPTS = 5;
- private static final int BOOTSTRAP_RETRY_DELAY_MS = 2000;
- // 6 hours is hopefully long enough for most indexes
- private static final long BOOTSTRAP_TIMEOUT_SECONDS = 6L * 3600L * 3600L;
-
- private List<CdcrReplicatorState> replicatorStates;
-
- private final CdcrReplicatorScheduler scheduler;
- private CdcrProcessStateManager processStateManager;
- private CdcrLeaderStateManager leaderStateManager;
-
- private SolrCore core;
- private String path;
-
- private ExecutorService bootstrapExecutor;
- private volatile BootstrapStatusRunnable bootstrapStatusRunnable;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- CdcrReplicatorManager(final SolrCore core, String path,
- SolrParams replicatorConfiguration,
- Map<String, List<SolrParams>> replicasConfiguration) {
- this.core = core;
- this.path = path;
-
- // create states
- replicatorStates = new ArrayList<>();
- String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- List<SolrParams> targets = replicasConfiguration.get(myCollection);
- if (targets != null) {
- for (SolrParams params : targets) {
- String zkHost = params.get(CdcrParams.ZK_HOST_PARAM);
- String targetCollection = params.get(CdcrParams.TARGET_COLLECTION_PARAM);
-
- CloudSolrClient client = new Builder(Collections.singletonList(zkHost), Optional.empty())
- .sendUpdatesOnlyToShardLeaders()
- .build();
- client.setDefaultCollection(targetCollection);
- replicatorStates.add(new CdcrReplicatorState(targetCollection, zkHost, client));
- }
- }
-
- this.scheduler = new CdcrReplicatorScheduler(this, replicatorConfiguration);
- }
-
- void setProcessStateManager(final CdcrProcessStateManager processStateManager) {
- this.processStateManager = processStateManager;
- this.processStateManager.register(this);
- }
-
- void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
- this.leaderStateManager = leaderStateManager;
- this.leaderStateManager.register(this);
- }
-
- /**
- * <p>
- * Inform the replicator manager of a change of state, and tell him to update its own state.
- * </p>
- * <p>
- * If we are the leader and the process state is STARTED, we need to initialise the log readers and start the
- * scheduled thread poll.
- * Otherwise, if the process state is STOPPED or if we are not the leader, we need to close the log readers and stop
- * the thread pool.
- * </p>
- * <p>
- * This method is synchronised as it can both be called by the leaderStateManager and the processStateManager.
- * </p>
- */
- @Override
- public synchronized void stateUpdate() {
- if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
- if (replicatorStates.size() > 0) {
- this.bootstrapExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(replicatorStates.size(),
- new SolrjNamedThreadFactory("cdcr-bootstrap-status"));
- }
- this.initLogReaders();
- this.scheduler.start();
- return;
- }
-
- this.scheduler.shutdown();
- if (bootstrapExecutor != null) {
- IOUtils.closeQuietly(bootstrapStatusRunnable);
- ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
- }
- this.closeLogReaders();
- Callable callable = core.getSolrCoreState().getCdcrBootstrapCallable();
- if (callable != null) {
- CdcrRequestHandler.BootstrapCallable bootstrapCallable = (CdcrRequestHandler.BootstrapCallable) callable;
- IOUtils.closeQuietly(bootstrapCallable);
- }
- }
-
- List<CdcrReplicatorState> getReplicatorStates() {
- return replicatorStates;
- }
-
- private void initLogReaders() {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-
- for (CdcrReplicatorState state : replicatorStates) {
- state.closeLogReader();
- try {
- long checkpoint = this.getCheckpoint(state);
- log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
- checkpoint, collectionName, shard);
- CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
- boolean seek = reader.seek(checkpoint);
- state.init(reader);
- if (!seek) {
- // targetVersion is lower than the oldest known entry.
- // In this scenario, it probably means that there is a gap in the updates log.
- // the best we can do here is to bootstrap the target leader by replicating the full index
- final String targetCollection = state.getTargetCollection();
- state.setBootstrapInProgress(true);
- log.info("Attempting to bootstrap target collection: {}, shard: {}", targetCollection, shard);
- bootstrapStatusRunnable = new BootstrapStatusRunnable(core, state);
- log.info("Submitting bootstrap task to executor");
- try {
- bootstrapExecutor.submit(bootstrapStatusRunnable);
- } catch (Exception e) {
- log.error("Unable to submit bootstrap call to executor", e);
- }
- }
- } catch (IOException | SolrServerException | SolrException e) {
- log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
- } catch (InterruptedException e) {
- log.warn("Thread interrupted while instantiate the log reader for target collection " + state.getTargetCollection(), e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private long getCheckpoint(CdcrReplicatorState state) throws IOException, SolrServerException {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
-
- SolrRequest request = new QueryRequest(params);
- request.setPath(path);
-
- NamedList response = state.getClient().request(request);
- return (Long) response.get(CdcrParams.CHECKPOINT);
- }
-
- void closeLogReaders() {
- for (CdcrReplicatorState state : replicatorStates) {
- state.closeLogReader();
- }
- }
-
- /**
- * Shutdown all the {@link org.apache.solr.handler.CdcrReplicatorState} by closing their
- * {@link org.apache.solr.client.solrj.impl.CloudSolrClient} and
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
- */
- void shutdown() {
- this.scheduler.shutdown();
- if (bootstrapExecutor != null) {
- IOUtils.closeQuietly(bootstrapStatusRunnable);
- ExecutorUtil.shutdownAndAwaitTermination(bootstrapExecutor);
- }
- for (CdcrReplicatorState state : replicatorStates) {
- state.shutdown();
- }
- replicatorStates.clear();
- }
-
- private class BootstrapStatusRunnable implements Runnable, Closeable {
- private final CdcrReplicatorState state;
- private final String targetCollection;
- private final String shard;
- private final String collectionName;
- private final CdcrUpdateLog ulog;
- private final String myCoreUrl;
-
- private volatile boolean closed = false;
-
- BootstrapStatusRunnable(SolrCore core, CdcrReplicatorState state) {
- this.collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- this.shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- this.ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
- this.state = state;
- this.targetCollection = state.getTargetCollection();
- String baseUrl = core.getCoreContainer().getZkController().getBaseUrl();
- this.myCoreUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName());
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- try {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- sendCdcrCommand(client, CdcrParams.CdcrAction.CANCEL_BOOTSTRAP);
- } catch (SolrServerException e) {
- log.error("Error sending cancel bootstrap message to target collection: {} shard: {} leader: {}",
- targetCollection, shard, leaderCoreUrl);
- }
- } catch (InterruptedException e) {
- log.error("Interrupted while closing BootstrapStatusRunnable", e);
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void run() {
- int retries = 1;
- boolean success = false;
- try {
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while (!timeOut.hasTimedOut()) {
- if (closed) {
- log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
- state.setBootstrapInProgress(false);
- break;
- }
- BootstrapStatus status = getBoostrapStatus();
- if (status == BootstrapStatus.RUNNING) {
- try {
- log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
- BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
- timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- } else if (status == BootstrapStatus.COMPLETED) {
- log.info("CDCR bootstrap successful in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- long checkpoint = CdcrReplicatorManager.this.getCheckpoint(state);
- log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
- checkpoint, collectionName, shard);
- CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
- reader1.seek(checkpoint);
- // issue asynchronous request_recovery to the follower nodes of the shards of target collection
- sendRequestRecoveryToFollowers(state);
- success = true;
- break;
- } else if (status == BootstrapStatus.FAILED) {
- log.warn("CDCR bootstrap failed in {} seconds", BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- // let's retry a fixed number of times before giving up
- if (retries >= MAX_BOOTSTRAP_ATTEMPTS) {
- log.error("Unable to bootstrap the target collection: {}, shard: {} even after {} retries", targetCollection, shard, retries);
- break;
- } else {
- log.info("Retry: {} - Attempting to bootstrap target collection: {} shard: {}", retries, targetCollection, shard);
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
- retries++;
- }
- } else if (status == BootstrapStatus.NOTFOUND || status == BootstrapStatus.CANCELLED) {
- log.info("CDCR bootstrap " + (status == BootstrapStatus.NOTFOUND ? "not found" : "cancelled") + "in {} seconds",
- BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- // the leader of the target shard may have changed and therefore there is no record of the
- // bootstrap process so we must retry the operation
- while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- retries = 1;
- timeOut = new TimeOut(6L * 3600L * 3600L, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
- } else if (status == BootstrapStatus.UNKNOWN || status == BootstrapStatus.SUBMITTED) {
- log.info("CDCR bootstrap is " + (status == BootstrapStatus.UNKNOWN ? "unknown" : "submitted"),
- BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
- // we were not able to query the status on the remote end
- // so just sleep for a bit and try again
- timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
- }
- }
- } catch (InterruptedException e) {
- log.info("Bootstrap thread interrupted");
- state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
- Thread.currentThread().interrupt();
- } catch (IOException | SolrServerException | SolrException e) {
- log.error("Unable to bootstrap the target collection " + targetCollection + " shard: " + shard, e);
- state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
- } finally {
- if (success) {
- log.info("Bootstrap successful, giving the go-ahead to replicator");
- state.setBootstrapInProgress(false);
- }
- }
- }
-
- private BootstrapStatus sendBootstrapCommand() throws InterruptedException {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- log.info("Attempting to bootstrap target collection: {} shard: {} leader: {}", targetCollection, shard, leaderCoreUrl);
- try {
- NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP, ReplicationHandler.MASTER_URL, myCoreUrl);
- log.debug("CDCR Bootstrap response: {}", response);
- String status = response.get(RESPONSE_STATUS).toString();
- return BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
- } catch (Exception e) {
- log.error("Exception submitting bootstrap request", e);
- return BootstrapStatus.UNKNOWN;
- }
- } catch (IOException e) {
- log.error("There shouldn't be an IOException while closing but there was!", e);
- }
- return BootstrapStatus.UNKNOWN;
- }
-
- private BootstrapStatus getBoostrapStatus() throws InterruptedException {
- try {
- Replica leader = state.getClient().getZkStateReader().getLeaderRetry(targetCollection, shard, 30000); // assume same shard exists on target
- String leaderCoreUrl = leader.getCoreUrl();
- HttpClient httpClient = state.getClient().getLbClient().getHttpClient();
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderCoreUrl).withHttpClient(httpClient).build()) {
- NamedList response = sendCdcrCommand(client, CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
- String status = (String) response.get(RESPONSE_STATUS);
- BootstrapStatus bootstrapStatus = BootstrapStatus.valueOf(status.toUpperCase(Locale.ROOT));
- if (bootstrapStatus == BootstrapStatus.RUNNING) {
- return BootstrapStatus.RUNNING;
- } else if (bootstrapStatus == BootstrapStatus.COMPLETED) {
- return BootstrapStatus.COMPLETED;
- } else if (bootstrapStatus == BootstrapStatus.FAILED) {
- return BootstrapStatus.FAILED;
- } else if (bootstrapStatus == BootstrapStatus.NOTFOUND) {
- log.warn("Bootstrap process was not found on target collection: {} shard: {}, leader: {}", targetCollection, shard, leaderCoreUrl);
- return BootstrapStatus.NOTFOUND;
- } else if (bootstrapStatus == BootstrapStatus.CANCELLED) {
- return BootstrapStatus.CANCELLED;
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unknown status: " + status + " returned by BOOTSTRAP_STATUS command");
- }
- }
- } catch (Exception e) {
- log.error("Exception during bootstrap status request", e);
- return BootstrapStatus.UNKNOWN;
- }
- }
- }
-
- private NamedList sendCdcrCommand(SolrClient client, CdcrParams.CdcrAction action, String... params) throws SolrServerException, IOException {
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(CommonParams.QT, "/cdcr");
- solrParams.set(CommonParams.ACTION, action.toString());
- for (int i = 0; i < params.length - 1; i+=2) {
- solrParams.set(params[i], params[i + 1]);
- }
- SolrRequest request = new QueryRequest(solrParams);
- return client.request(request);
- }
-
- private void sendRequestRecoveryToFollowers(CdcrReplicatorState state) throws SolrServerException, IOException {
- Collection<Slice> slices = state.getClient().getZkStateReader().getClusterState().getCollection(state.getTargetCollection()).getActiveSlices();
- for (Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if (slice.getLeader().getCoreName().equals(replica.getCoreName())) {
- continue; // no need to request recovery for leader
- }
- sendRequestRecoveryToFollower(state.getClient(), replica.getCoreName());
- log.info("RequestRecovery cmd is issued by core: " + replica.getCoreName() + " of shard: " + slice.getName() +
- "for target: " + state.getTargetCollection());
- }
- }
- }
-
- private NamedList sendRequestRecoveryToFollower(SolrClient client, String coreName) throws SolrServerException, IOException {
- CoreAdminRequest.RequestRecovery recoverRequestCmd = new CoreAdminRequest.RequestRecovery();
- recoverRequestCmd.setAction(CoreAdminParams.CoreAdminAction.REQUESTRECOVERY);
- recoverRequestCmd.setCoreName(coreName);
- return client.request(recoverRequestCmd);
- }
-
-
- private enum BootstrapStatus {
- SUBMITTED,
- RUNNING,
- COMPLETED,
- FAILED,
- NOTFOUND,
- CANCELLED,
- UNKNOWN
- }
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
deleted file mode 100644
index 62abeab..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.*;
-
-/**
- * Schedules the execution of {@link org.apache.solr.handler.CdcrReplicator} tasks with a fixed delay between two
- * scheduling rounds ({@code CdcrParams.SCHEDULE_PARAM}, in milliseconds). It relies on a queue of
- * {@link org.apache.solr.handler.CdcrReplicatorState} so that one
- * {@link org.apache.solr.handler.CdcrReplicatorState} is never used by two threads at the same time.
- */
-class CdcrReplicatorScheduler {
-
-  private static final int DEFAULT_POOL_SIZE = 2;
-  private static final int DEFAULT_TIME_SCHEDULE = 10;
-  private static final int DEFAULT_BATCH_SIZE = 128;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private boolean isStarted = false;
-
-  private ScheduledExecutorService scheduler;
-  private ExecutorService replicatorsPool;
-
-  private final CdcrReplicatorManager replicatorManager;
-  private final ConcurrentLinkedQueue<CdcrReplicatorState> statesQueue;
-
-  private int poolSize = DEFAULT_POOL_SIZE;
-  private int timeSchedule = DEFAULT_TIME_SCHEDULE; // delay between two scheduling rounds, in milliseconds
-  private int batchSize = DEFAULT_BATCH_SIZE;
-
-  /**
-   * @param replicatorStatesManager provides the replicator states to schedule
-   * @param replicatorConfiguration optional thread pool size, schedule delay and batch size; defaults are used when
-   *                                it is null or a value is missing
-   */
-  CdcrReplicatorScheduler(final CdcrReplicatorManager replicatorStatesManager, final SolrParams replicatorConfiguration) {
-    this.replicatorManager = replicatorStatesManager;
-    this.statesQueue = new ConcurrentLinkedQueue<>(replicatorManager.getReplicatorStates());
-    if (replicatorConfiguration != null) {
-      poolSize = replicatorConfiguration.getInt(CdcrParams.THREAD_POOL_SIZE_PARAM, DEFAULT_POOL_SIZE);
-      timeSchedule = replicatorConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
-      batchSize = replicatorConfiguration.getInt(CdcrParams.BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
-    }
-  }
-
-  /**
-   * Starts the scheduler thread and the replicator pool. Does nothing if already started.
-   */
-  void start() {
-    if (isStarted) {
-      return;
-    }
-    scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-scheduler"));
-    replicatorsPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
-
-    // Every timeSchedule milliseconds, submit one replication task per state currently available in the queue.
-    scheduler.scheduleWithFixedDelay(() -> {
-      int nCandidates = statesQueue.size();
-      for (int i = 0; i < nCandidates; i++) {
-        replicatorsPool.execute(this::replicateNextState);
-      }
-    }, 0, timeSchedule, TimeUnit.MILLISECONDS);
-    isStarted = true;
-  }
-
-  /**
-   * Takes one state from the queue, runs a replication round for it unless it is bootstrapping, then pushes the
-   * state back into the queue.
-   */
-  private void replicateNextState() {
-    CdcrReplicatorState state = statesQueue.poll();
-    if (state == null) {
-      // Tasks still waiting in the pool have not polled their state yet, so a later round counts those states again
-      // and there can be more tasks than states. Nothing to do for this one (offer(null) would throw).
-      return;
-    }
-    try {
-      if (!state.isBootstrapInProgress()) {
-        new CdcrReplicator(state, batchSize).run();
-      } else {
-        log.debug("Replicator state is bootstrapping, skipping replication for target collection {}", state.getTargetCollection());
-      }
-    } finally {
-      statesQueue.offer(state);
-    }
-  }
-
-  /**
-   * Stops scheduling new rounds, then waits up to 60 seconds for running replication tasks to finish.
-   * Does nothing if not started.
-   */
-  void shutdown() {
-    if (!isStarted) {
-      return;
-    }
-    try {
-      // Stop the scheduler first, so it does not keep submitting tasks to a pool that is shutting down.
-      // Interrupts are often dangerous in Lucene / Solr code, but the test for this will leak threads without.
-      scheduler.shutdownNow();
-      replicatorsPool.shutdown();
-      if (!replicatorsPool.awaitTermination(60, TimeUnit.SECONDS)) {
-        log.warn("CDCR replicator threadpool did not terminate within 60 seconds.");
-      }
-    } catch (InterruptedException e) {
-      log.warn("Thread interrupted while waiting for CDCR replicator threadpool close.");
-      Thread.currentThread().interrupt();
-    } finally {
-      isStarted = false;
-    }
-  }
-
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
deleted file mode 100644
index bf80608..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.UpdateLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The state of the replication with a target cluster.
- */
-class CdcrReplicatorState {
-
- private final String targetCollection;
- private final String zkHost;
- private final CloudSolrClient targetClient;
-
- private CdcrUpdateLog.CdcrLogReader logReader;
-
- private long consecutiveErrors = 0;
- private final Map<ErrorType, Long> errorCounters = new HashMap<>();
- private final FixedQueue<ErrorQueueEntry> errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
-
- private BenchmarkTimer benchmarkTimer;
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final AtomicBoolean bootstrapInProgress = new AtomicBoolean(false);
- private final AtomicInteger numBootstraps = new AtomicInteger();
-
- CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
- this.targetCollection = targetCollection;
- this.targetClient = targetClient;
- this.zkHost = zkHost;
- this.benchmarkTimer = new BenchmarkTimer();
- }
-
- /**
- * Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
- * that is positioned at the last target cluster checkpoint.
- */
- void init(final CdcrUpdateLog.CdcrLogReader logReader) {
- this.logReader = logReader;
- }
-
- void closeLogReader() {
- if (logReader != null) {
- logReader.close();
- logReader = null;
- }
- }
-
- CdcrUpdateLog.CdcrLogReader getLogReader() {
- return logReader;
- }
-
- String getTargetCollection() {
- return targetCollection;
- }
-
- String getZkHost() {
- return zkHost;
- }
-
- CloudSolrClient getClient() {
- return targetClient;
- }
-
- void shutdown() {
- try {
- targetClient.close();
- } catch (IOException ioe) {
- log.warn("Caught exception trying to close server: ", ioe.getMessage());
- }
- this.closeLogReader();
- }
-
- void reportError(ErrorType error) {
- if (!errorCounters.containsKey(error)) {
- errorCounters.put(error, 0l);
- }
- errorCounters.put(error, errorCounters.get(error) + 1);
- errorsQueue.add(new ErrorQueueEntry(error, new Date()));
- consecutiveErrors++;
- }
-
- void resetConsecutiveErrors() {
- consecutiveErrors = 0;
- }
-
- /**
- * Returns the number of consecutive errors encountered while trying to forward updates to the target.
- */
- long getConsecutiveErrors() {
- return consecutiveErrors;
- }
-
- /**
- * Gets the number of errors of a particular type.
- */
- long getErrorCount(ErrorType type) {
- if (errorCounters.containsKey(type)) {
- return errorCounters.get(type);
- } else {
- return 0;
- }
- }
-
- /**
- * Gets the last errors ordered by timestamp (most recent first)
- */
- List<String[]> getLastErrors() {
- List<String[]> lastErrors = new ArrayList<>();
- synchronized (errorsQueue) {
- Iterator<ErrorQueueEntry> it = errorsQueue.iterator();
- while (it.hasNext()) {
- ErrorQueueEntry entry = it.next();
- lastErrors.add(new String[]{entry.timestamp.toInstant().toString(), entry.type.toLower()});
- }
- }
- return lastErrors;
- }
-
- /**
- * Return the timestamp of the last processed operations
- */
- String getTimestampOfLastProcessedOperation() {
- if (logReader != null && logReader.getLastVersion() != -1) {
- // Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
- return Instant.ofEpochMilli(logReader.getLastVersion() >> 20).toString();
- }
- return "";
- }
-
- /**
- * Gets the benchmark timer.
- */
- BenchmarkTimer getBenchmarkTimer() {
- return this.benchmarkTimer;
- }
-
- /**
- * @return true if a bootstrap operation is in progress, false otherwise
- */
- boolean isBootstrapInProgress() {
- return bootstrapInProgress.get();
- }
-
- void setBootstrapInProgress(boolean inProgress) {
- if (bootstrapInProgress.compareAndSet(true, false)) {
- numBootstraps.incrementAndGet();
- }
- bootstrapInProgress.set(inProgress);
- }
-
- public int getNumBootstraps() {
- return numBootstraps.get();
- }
-
- enum ErrorType {
- INTERNAL,
- BAD_REQUEST;
-
- public String toLower() {
- return toString().toLowerCase(Locale.ROOT);
- }
-
- }
-
- static class BenchmarkTimer {
-
- private long startTime;
- private long runTime = 0;
- private Map<Integer, Long> opCounters = new HashMap<>();
-
- /**
- * Start recording time.
- */
- void start() {
- startTime = System.nanoTime();
- }
-
- /**
- * Stop recording time.
- */
- void stop() {
- runTime += System.nanoTime() - startTime;
- startTime = -1;
- }
-
- void incrementCounter(final int operationType) {
- switch (operationType) {
- case UpdateLog.ADD:
- case UpdateLog.DELETE:
- case UpdateLog.DELETE_BY_QUERY: {
- if (!opCounters.containsKey(operationType)) {
- opCounters.put(operationType, 0l);
- }
- opCounters.put(operationType, opCounters.get(operationType) + 1);
- return;
- }
-
- default:
- }
- }
-
- long getRunTime() {
- long totalRunTime = runTime;
- if (startTime != -1) { // we are currently recording the time
- totalRunTime += System.nanoTime() - startTime;
- }
- return totalRunTime;
- }
-
- double getOperationsPerSecond() {
- long total = 0;
- for (long counter : opCounters.values()) {
- total += counter;
- }
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- double getAddsPerSecond() {
- long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- double getDeletesPerSecond() {
- long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
- total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
- double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
- return total / elapsedTimeInSeconds;
- }
-
- }
-
- private static class ErrorQueueEntry {
-
- private ErrorType type;
- private Date timestamp;
-
- private ErrorQueueEntry(ErrorType type, Date timestamp) {
- this.type = type;
- this.timestamp = timestamp;
- }
- }
-
- private static class FixedQueue<E> extends LinkedList<E> {
-
- private int maxSize;
-
- public FixedQueue(int maxSize) {
- this.maxSize = maxSize;
- }
-
- @Override
- public synchronized boolean add(E e) {
- super.addFirst(e);
- if (size() > maxSize) {
- removeLast();
- }
- return true;
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
deleted file mode 100644
index 1453841..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
+++ /dev/null
@@ -1,861 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CloseHook;
-import org.apache.solr.core.PluginBag;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.SolrCoreState;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.plugin.SolrCoreAware;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
-import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
-
-/**
- * <p>
- * This request handler implements the CDCR API and is responsible for the execution of the
- * {@link CdcrReplicator} threads.
- * </p>
- * <p>
- * It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
- * {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
- * to synchronise the state of the CDCR across all the nodes.
- * </p>
- * <p>
- * The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the
- * actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is leader and the process
- * state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
- * start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
- * {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
- * </p>
- * <p>
- * The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
- * files. Such a mode can be enabled or disabled using the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
- * </p>
- * <p>
- * Known limitations: The source and target clusters must have the same topology. Replication between clusters
- * with a different number of shards will likely result in an inconsistent index.
- * </p>
- */
-public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private SolrCore core;
- private String collection;
- private String shard;
- private String path;
-
- private SolrParams updateLogSynchronizerConfiguration;
- private SolrParams replicatorConfiguration;
- private SolrParams bufferConfiguration;
- private Map<String, List<SolrParams>> replicasConfiguration;
-
- private CdcrProcessStateManager processStateManager;
- private CdcrBufferStateManager bufferStateManager;
- private CdcrReplicatorManager replicatorManager;
- private CdcrLeaderStateManager leaderStateManager;
- private CdcrUpdateLogSynchronizer updateLogSynchronizer;
- private CdcrBufferManager bufferManager;
-
- @Override
- public void init(NamedList args) {
- super.init(args);
-
- if (args != null) {
- // Configuration of the Update Log Synchronizer
- Object updateLogSynchonizerParam = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
- if (updateLogSynchonizerParam != null && updateLogSynchonizerParam instanceof NamedList) {
- updateLogSynchronizerConfiguration = ((NamedList) updateLogSynchonizerParam).toSolrParams();
- }
-
- // Configuration of the Replicator
- Object replicatorParam = args.get(CdcrParams.REPLICATOR_PARAM);
- if (replicatorParam != null && replicatorParam instanceof NamedList) {
- replicatorConfiguration = ((NamedList) replicatorParam).toSolrParams();
- }
-
- // Configuration of the Buffer
- Object bufferParam = args.get(CdcrParams.BUFFER_PARAM);
- if (bufferParam != null && bufferParam instanceof NamedList) {
- bufferConfiguration = ((NamedList) bufferParam).toSolrParams();
- }
-
- // Configuration of the Replicas
- replicasConfiguration = new HashMap<>();
- List replicas = args.getAll(CdcrParams.REPLICA_PARAM);
- for (Object replica : replicas) {
- if (replica != null && replica instanceof NamedList) {
- SolrParams params = ((NamedList) replica).toSolrParams();
- if (!replicasConfiguration.containsKey(params.get(CdcrParams.SOURCE_COLLECTION_PARAM))) {
- replicasConfiguration.put(params.get(CdcrParams.SOURCE_COLLECTION_PARAM), new ArrayList<>());
- }
- replicasConfiguration.get(params.get(CdcrParams.SOURCE_COLLECTION_PARAM)).add(params);
- }
- }
- }
- }
-
- @Override
- public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
- // Pick the action
- SolrParams params = req.getParams();
- CdcrParams.CdcrAction action = null;
- String a = params.get(CommonParams.ACTION);
- if (a != null) {
- action = CdcrParams.CdcrAction.get(a);
- }
- if (action == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
- }
-
- switch (action) {
- case START: {
- this.handleStartAction(req, rsp);
- break;
- }
- case STOP: {
- this.handleStopAction(req, rsp);
- break;
- }
- case STATUS: {
- this.handleStatusAction(req, rsp);
- break;
- }
- case COLLECTIONCHECKPOINT: {
- this.handleCollectionCheckpointAction(req, rsp);
- break;
- }
- case SHARDCHECKPOINT: {
- this.handleShardCheckpointAction(req, rsp);
- break;
- }
- case ENABLEBUFFER: {
- this.handleEnableBufferAction(req, rsp);
- break;
- }
- case DISABLEBUFFER: {
- this.handleDisableBufferAction(req, rsp);
- break;
- }
- case LASTPROCESSEDVERSION: {
- this.handleLastProcessedVersionAction(req, rsp);
- break;
- }
- case QUEUES: {
- this.handleQueuesAction(req, rsp);
- break;
- }
- case OPS: {
- this.handleOpsAction(req, rsp);
- break;
- }
- case ERRORS: {
- this.handleErrorsAction(req, rsp);
- break;
- }
- case BOOTSTRAP: {
- this.handleBootstrapAction(req, rsp);
- break;
- }
- case BOOTSTRAP_STATUS: {
- this.handleBootstrapStatus(req, rsp);
- break;
- }
- case CANCEL_BOOTSTRAP: {
- this.handleCancelBootstrap(req, rsp);
- break;
- }
- default: {
- throw new RuntimeException("Unknown action: " + action);
- }
- }
-
- rsp.setHttpCaching(false);
- }
-
- @Override
- public void inform(SolrCore core) {
- this.core = core;
- collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- // Make sure that the core is ZKAware
- if (!core.getCoreContainer().isZooKeeperAware()) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Solr instance is not running in SolrCloud mode.");
- }
-
- // Make sure that the core is using the CdcrUpdateLog implementation
- if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Solr instance is not configured with the cdcr update log.");
- }
-
- // Find the registered path of the handler
- path = null;
- for (Map.Entry<String, PluginBag.PluginHolder<SolrRequestHandler>> entry : core.getRequestHandlers().getRegistry().entrySet()) {
- if (core.getRequestHandlers().isLoaded(entry.getKey()) && entry.getValue().get() == this) {
- path = entry.getKey();
- break;
- }
- }
- if (path == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "The CdcrRequestHandler is not registered with the current core.");
- }
- if (!path.startsWith("/")) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
- }
-
- // Initialisation phase
- // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
- // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
- // If the Solr cloud has already been initialised, and the core is reloaded (i.e., because a node died or a new node
- // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
- // in zookeeper.
-
- // Initialise the buffer state manager
- bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
- // Initialise the process state manager
- processStateManager = new CdcrProcessStateManager(core);
- // Initialise the leader state manager
- leaderStateManager = new CdcrLeaderStateManager(core);
-
- // Initialise the replicator states manager
- replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
- replicatorManager.setProcessStateManager(processStateManager);
- replicatorManager.setLeaderStateManager(leaderStateManager);
- // we need to inform it of a state event since the process and leader state
- // may have been synchronised during the initialisation
- replicatorManager.stateUpdate();
-
- // Initialise the update log synchronizer
- updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
- updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
- // we need to inform it of a state event since the leader state
- // may have been synchronised during the initialisation
- updateLogSynchronizer.stateUpdate();
-
- // Initialise the buffer manager
- bufferManager = new CdcrBufferManager(core);
- bufferManager.setLeaderStateManager(leaderStateManager);
- bufferManager.setBufferStateManager(bufferStateManager);
- // we need to inform it of a state event since the leader state
- // may have been synchronised during the initialisation
- bufferManager.stateUpdate();
-
- // register the close hook
- this.registerCloseHook(core);
- }
-
- /**
- * register a close hook to properly shutdown the state manager and scheduler
- */
- private void registerCloseHook(SolrCore core) {
- core.addCloseHook(new CloseHook() {
-
- @Override
- public void preClose(SolrCore core) {
- log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collection, shard);
-
- updateLogSynchronizer.shutdown();
- replicatorManager.shutdown();
- bufferStateManager.shutdown();
- processStateManager.shutdown();
- leaderStateManager.shutdown();
- }
-
- @Override
- public void postClose(SolrCore core) {
- }
-
- });
- }
-
- /**
- * <p>
- * Update and synchronize the process state.
- * </p>
- * <p>
- * The process state manager must notify the replicator states manager of the change of state.
- * </p>
- */
- private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (processStateManager.getState() == CdcrParams.ProcessState.STOPPED) {
- processStateManager.setState(CdcrParams.ProcessState.STARTED);
- processStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (processStateManager.getState() == CdcrParams.ProcessState.STARTED) {
- processStateManager.setState(CdcrParams.ProcessState.STOPPED);
- processStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private NamedList getStatus() {
- NamedList status = new NamedList();
- status.add(CdcrParams.ProcessState.getParam(), processStateManager.getState().toLower());
- status.add(CdcrParams.BufferState.getParam(), bufferStateManager.getState().toLower());
- return status;
- }
-
- /**
- * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
- * This checkpoint is used on the source cluster to setup the
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader. <br/>
- * This method will execute in parallel one
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
- * then pick the lowest version number as checkpoint. Picking the lowest amongst all shards will ensure that we do not
- * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
- * updates to the target cluster while we are currently instantiating the
- * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
- * This solution only works in scenarios where the topology of the source and target clusters are identical.
- */
- private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
- throws IOException, SolrServerException {
- ZkController zkController = core.getCoreContainer().getZkController();
- try {
- zkController.getZkStateReader().forceUpdateCollection(collection);
- } catch (Exception e) {
- log.warn("Error when updating cluster state", e);
- }
- ClusterState cstate = zkController.getClusterState();
- DocCollection docCollection = cstate.getCollectionOrNull(collection);
- Collection<Slice> shards = docCollection == null? null : docCollection.getActiveSlices();
-
- ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("parallelCdcrExecutor"));
-
- long checkpoint = Long.MAX_VALUE;
- try {
- List<Callable<Long>> callables = new ArrayList<>();
- for (Slice shard : shards) {
- ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, shard.getName());
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
- callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
- }
-
- for (final Future<Long> future : parallelExecutor.invokeAll(callables)) {
- long version = future.get();
- if (version < checkpoint) { // we must take the lowest checkpoint from all the shards
- checkpoint = version;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while requesting shard's checkpoints", e);
- } catch (ExecutionException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while requesting shard's checkpoints", e);
- } finally {
- parallelExecutor.shutdown();
- }
-
- rsp.add(CdcrParams.CHECKPOINT, checkpoint);
- }
-
- /**
- * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}.
- */
- private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (!leaderStateManager.amILeader()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
- "' sent to non-leader replica");
- }
-
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- VersionInfo versionInfo = ulog.getVersionInfo();
- try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
- long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
- long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
- log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
- // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
- long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
- if (maxVersion == 0L) {
- maxVersion = -1;
- }
- rsp.add(CdcrParams.CHECKPOINT, maxVersion);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
- "' could not read max version");
- }
- }
-
- private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (bufferStateManager.getState() == CdcrParams.BufferState.DISABLED) {
- bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
- bufferStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- if (bufferStateManager.getState() == CdcrParams.BufferState.ENABLED) {
- bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
- bufferStateManager.synchronize();
- }
-
- rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
- }
-
- /**
- * <p>
- * We have to take care of four cases:
- * <ul>
- * <li>Replication & Buffering</li>
- * <li>Replication & No Buffering</li>
- * <li>No Replication & Buffering</li>
- * <li>No Replication & No Buffering</li>
- * </ul>
- * In the first three cases, at least one log reader should have been initialised. We should take the lowest
- * last processed version across all the initialised readers. In the last case, there isn't a log reader
- * initialised. We should instantiate one and get the version of the first entries.
- * </p>
- */
- private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-
- if (!leaderStateManager.amILeader()) {
- log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
- " sent to non-leader replica");
- }
-
- // take care of the first three cases
- // first check the log readers from the replicator states
- long lastProcessedVersion = Long.MAX_VALUE;
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- long version = Long.MAX_VALUE;
- if (state.getLogReader() != null) {
- version = state.getLogReader().getLastVersion();
- }
- lastProcessedVersion = Math.min(lastProcessedVersion, version);
- }
-
- // next check the log reader of the buffer
- CdcrUpdateLog.CdcrLogReader bufferLogReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).getBufferToggle();
- if (bufferLogReader != null) {
- lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
- }
-
- // the fourth case: no cdc replication, no buffering: all readers were null
- if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
- bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
- CdcrUpdateLog.CdcrLogReader logReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).newLogReader();
- try {
- // let the reader initialize lastVersion
- logReader.next();
- lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while fetching the last processed version", e);
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Error while fetching the last processed version", e);
- } finally {
- logReader.close();
- }
- }
-
- log.debug("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
- rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
- }
-
- private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList queueStats = new NamedList();
-
- CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
- if (logReader == null) {
- String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
- log.warn("The log reader for target collection {} is not initialised @ {}:{}",
- state.getTargetCollection(), collectionName, shard);
- queueStats.add(CdcrParams.QUEUE_SIZE, -1l);
- } else {
- queueStats.add(CdcrParams.QUEUE_SIZE, logReader.getNumberOfRemainingRecords());
- }
- queueStats.add(CdcrParams.LAST_TIMESTAMP, state.getTimestampOfLastProcessedOperation());
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), queueStats);
- }
-
- rsp.add(CdcrParams.QUEUES, hosts);
- UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
- rsp.add(CdcrParams.TLOG_TOTAL_SIZE, updateLog.getTotalLogsSize());
- rsp.add(CdcrParams.TLOG_TOTAL_COUNT, updateLog.getTotalLogsNumber());
- rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER,
- updateLogSynchronizer.isStarted() ? CdcrParams.ProcessState.STARTED.toLower() : CdcrParams.ProcessState.STOPPED.toLower());
- }
-
- private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList ops = new NamedList();
- ops.add(CdcrParams.COUNTER_ALL, state.getBenchmarkTimer().getOperationsPerSecond());
- ops.add(CdcrParams.COUNTER_ADDS, state.getBenchmarkTimer().getAddsPerSecond());
- ops.add(CdcrParams.COUNTER_DELETES, state.getBenchmarkTimer().getDeletesPerSecond());
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), ops);
- }
-
- rsp.add(CdcrParams.OPERATIONS_PER_SECOND, hosts);
- }
-
- private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
- NamedList hosts = new NamedList();
-
- for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
- NamedList errors = new NamedList();
-
- errors.add(CdcrParams.CONSECUTIVE_ERRORS, state.getConsecutiveErrors());
- errors.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
- errors.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));
-
- NamedList lastErrors = new NamedList();
- for (String[] lastError : state.getLastErrors()) {
- lastErrors.add(lastError[0], lastError[1]);
- }
- errors.add(CdcrParams.LAST, lastErrors);
-
- if (hosts.get(state.getZkHost()) == null) {
- hosts.add(state.getZkHost(), new NamedList());
- }
- ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), errors);
- }
-
- rsp.add(CdcrParams.ERRORS, hosts);
- }
-
- private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
- String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
- if (!leaderStateManager.amILeader()) {
- log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
- " sent to non-leader replica");
- }
- CountDownLatch latch = new CountDownLatch(1); // latch to make sure BOOTSTRAP_STATUS gives correct response
-
- Runnable runnable = () -> {
- Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
- boolean locked = recoveryLock.tryLock();
- SolrCoreState coreState = core.getSolrCoreState();
- try {
- if (!locked) {
- handleCancelBootstrap(req, rsp);
- } else if (leaderStateManager.amILeader()) {
- coreState.setCdcrBootstrapRunning(true);
- latch.countDown(); // free the latch as current bootstrap is executing
- //running.set(true);
- String masterUrl = req.getParams().get(ReplicationHandler.MASTER_URL);
- BootstrapCallable bootstrapCallable = new BootstrapCallable(masterUrl, core);
- coreState.setCdcrBootstrapCallable(bootstrapCallable);
- Future<Boolean> bootstrapFuture = core.getCoreContainer().getUpdateShardHandler().getRecoveryExecutor()
- .submit(bootstrapCallable);
- coreState.setCdcrBootstrapFuture(bootstrapFuture);
- try {
- bootstrapFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Bootstrap was interrupted", e);
- } catch (ExecutionException e) {
- log.error("Bootstrap operation failed", e);
- }
- } else {
- log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
- }
- } finally {
- if (locked) {
- coreState.setCdcrBootstrapRunning(false);
- recoveryLock.unlock();
- } else {
- latch.countDown(); // free the latch as current bootstrap is executing
- }
- }
- };
-
- try {
- core.getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
- rsp.add(RESPONSE_STATUS, "submitted");
- latch.await(10000, TimeUnit.MILLISECONDS); // put the latch for current bootstrap command
- } catch (RejectedExecutionException ree) {
- // no problem, we're probably shutting down
- rsp.add(RESPONSE_STATUS, "failed");
- }
- }
-
- private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
- BootstrapCallable callable = (BootstrapCallable)core.getSolrCoreState().getCdcrBootstrapCallable();
- IOUtils.closeQuietly(callable);
- rsp.add(RESPONSE_STATUS, "cancelled");
- }
-
- private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
- SolrCoreState coreState = core.getSolrCoreState();
- if (coreState.getCdcrBootstrapRunning()) {
- rsp.add(RESPONSE_STATUS, RUNNING);
- return;
- }
-
- Future<Boolean> future = coreState.getCdcrBootstrapFuture();
- BootstrapCallable callable = (BootstrapCallable)coreState.getCdcrBootstrapCallable();
- if (future == null) {
- rsp.add(RESPONSE_STATUS, "notfound");
- rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
- } else if (future.isCancelled() || callable.isClosed()) {
- rsp.add(RESPONSE_STATUS, "cancelled");
- } else if (future.isDone()) {
- // could be a normal termination or an exception
- try {
- Boolean result = future.get();
- if (result) {
- rsp.add(RESPONSE_STATUS, COMPLETED);
- } else {
- rsp.add(RESPONSE_STATUS, FAILED);
- }
- } catch (InterruptedException e) {
- // should not happen?
- } catch (ExecutionException e) {
- rsp.add(RESPONSE_STATUS, FAILED);
- rsp.add(RESPONSE, e);
- } catch (CancellationException ce) {
- rsp.add(RESPONSE_STATUS, FAILED);
- rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
- }
- } else {
- rsp.add(RESPONSE_STATUS, RUNNING);
- }
- }
-
- static class BootstrapCallable implements Callable<Boolean>, Closeable {
- private final String masterUrl;
- private final SolrCore core;
- private volatile boolean closed = false;
-
- BootstrapCallable(String masterUrl, SolrCore core) {
- this.masterUrl = masterUrl;
- this.core = core;
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
- replicationHandler.abortFetch();
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public Boolean call() throws Exception {
- boolean success = false;
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- // we start buffering updates as a safeguard however we do not expect
- // to receive any updates from the source during bootstrap
- ulog.bufferUpdates();
- try {
- commitOnLeader(masterUrl);
- // use rep handler directly, so we can do this sync rather than async
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
- if (replicationHandler == null) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
- }
-
- ModifiableSolrParams solrParams = new ModifiableSolrParams();
- solrParams.set(ReplicationHandler.MASTER_URL, masterUrl);
- // we do not want the raw tlog files from the source
- solrParams.set(ReplicationHandler.TLOG_FILES, false);
-
- success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-
- // this is required because this callable can race with HttpSolrCall#destroy
- // which clears the request info.
- // Applying buffered updates fails without the following line because LogReplayer
- // also tries to set request info and fails with AssertionError
- SolrRequestInfo.clearRequestInfo();
-
- Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
- if (future == null) {
- // no replay needed
- log.info("No replay needed.");
- } else {
- log.info("Replaying buffered documents.");
- // wait for replay
- UpdateLog.RecoveryInfo report = future.get();
- if (report.failed) {
- SolrException.log(log, "Replay failed");
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
- }
- }
- return success;
- } finally {
- if (closed || !success) {
- // we cannot apply the buffer in this case because it will introduce newer versions in the
- // update log and then the source cluster will get those versions via collectioncheckpoint
- // causing the versions in between to be completely missed
- boolean dropped = ulog.dropBufferedUpdates();
- assert dropped;
- }
- }
- }
-
- private void commitOnLeader(String leaderUrl) throws SolrServerException,
- IOException {
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
- .withConnectionTimeout(30000)
- .build()) {
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
- ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
- client);
- }
- }
- }
-
- @Override
- public String getDescription() {
- return "Manage Cross Data Center Replication";
- }
-
- @Override
- public Category getCategory() {
- return Category.REPLICATION;
- }
-
- /**
- * A thread subclass for executing a single
- * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} action.
- */
- private static final class SliceCheckpointCallable implements Callable<Long> {
-
- final String baseUrl;
- final String cdcrPath;
-
- SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
- this.baseUrl = baseUrl;
- this.cdcrPath = cdcrPath;
- }
-
- @Override
- public Long call() throws Exception {
- try (HttpSolrClient server = new HttpSolrClient.Builder(baseUrl)
- .withConnectionTimeout(15000)
- .withSocketTimeout(60000)
- .build()) {
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());
-
- SolrRequest request = new QueryRequest(params);
- request.setPath(cdcrPath);
-
- NamedList response = server.request(request);
- return (Long) response.get(CdcrParams.CHECKPOINT);
- }
- }
-
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
deleted file mode 100644
index 151615e..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A state manager which implements an observer pattern to notify observers
- * of a state change.
- * <p>
- * Observers are notified synchronously, on the thread that calls {@link #callback()},
- * in registration order. This class performs no synchronization of its own.
- */
-abstract class CdcrStateManager {
-
-  /** Registered observers, in registration order; the list instance is never replaced. */
-  private final List<CdcrStateObserver> observers = new ArrayList<>();
-
-  /**
-   * Registers an observer to be notified on every subsequent {@link #callback()}.
-   *
-   * @param observer the observer to add; registering the same instance twice notifies it twice
-   */
-  void register(CdcrStateObserver observer) {
-    this.observers.add(observer);
-  }
-
-  /** Notifies every registered observer of a state change by invoking its {@code stateUpdate()}. */
-  void callback() {
-    for (CdcrStateObserver observer : observers) {
-      observer.stateUpdate();
-    }
-  }
-
-  /** Receiver of state-change notifications from a {@link CdcrStateManager}. */
-  interface CdcrStateObserver {
-
-    /** Invoked by {@link CdcrStateManager#callback()} when the managed state changes. */
-    void stateUpdate();
-
-  }
-
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java b/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
deleted file mode 100644
index 80f27ce..0000000
--- a/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * Periodically synchronizes the update log of non-leader nodes with their leader.
- * </p>
- * <p>
- * Non-leader nodes must always buffer updates in case of leader failures. They have to periodically
- * synchronize their update logs with their leader to remove old transaction logs that will never be used anymore.
- * This is performed by a background thread that is scheduled with a fixed delay. The background thread sends
- * the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
- * the lowest last version number processed. This version is then used to move forward the buffer log reader.
- * </p>
- */
-class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
-
-  private CdcrLeaderStateManager leaderStateManager;
-  private ScheduledExecutorService scheduler;
-
-  private final SolrCore core;
-  private final String collection;
-  private final String shardId;
-  private final String path;
-
-  private int timeSchedule = DEFAULT_TIME_SCHEDULE;
-
-  private static final int DEFAULT_TIME_SCHEDULE = 60000; // by default, every minute
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  /**
-   * @param core the core whose update log is synchronized; its cloud descriptor supplies the
-   *             collection name and shard id
-   * @param path handler path the LASTPROCESSEDVERSION action is sent to on the leader
-   * @param updateLogSynchonizerConfiguration optional settings (may be {@code null});
-   *             {@link CdcrParams#SCHEDULE_PARAM} is the delay in milliseconds between runs
-   *             (default 60000)
-   */
-  CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchonizerConfiguration) {
-    this.core = core;
-    this.path = path;
-    this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-    if (updateLogSynchonizerConfiguration != null) {
-      this.timeSchedule = updateLogSynchonizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
-    }
-  }
-
-  /** Stores the leader state manager and registers this synchronizer as one of its observers. */
-  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
-    this.leaderStateManager = leaderStateManager;
-    this.leaderStateManager.register(this);
-  }
-
-  /**
-   * Starts the periodic synchronization when this replica is not the leader, and stops it when it is.
-   * Repeated notifications while not the leader keep the already-running scheduler instead of creating
-   * (and leaking) a second one; the leader URL is re-resolved on every run anyway.
-   */
-  @Override
-  public void stateUpdate() {
-    // If I am not the leader, I need to synchronise periodically my update log with my leader.
-    if (!leaderStateManager.amILeader()) {
-      if (!isStarted()) {
-        scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-update-log-synchronizer"));
-        scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
-      }
-      return;
-    }
-
-    this.shutdown();
-  }
-
-  /** Returns {@code true} while a synchronization scheduler exists. */
-  boolean isStarted() {
-    return scheduler != null;
-  }
-
-  /** Stops the synchronization scheduler, if any, interrupting a run in progress. */
-  void shutdown() {
-    if (scheduler != null) {
-      // interrupts are often dangerous in Lucene / Solr code, but the
-      // test for this will leak threads without
-      scheduler.shutdownNow();
-      scheduler = null;
-    }
-  }
-
-  /** Task run periodically by the scheduler on non-leader replicas. */
-  private class UpdateLogSynchronisation implements Runnable {
-
-    /** Returns the core URL of the current shard leader, or {@code null} if no leader is known yet. */
-    private String getLeaderUrl() {
-      ZkController zkController = core.getCoreContainer().getZkController();
-      ClusterState cstate = zkController.getClusterState();
-      DocCollection docCollection = cstate.getCollection(collection);
-      ZkNodeProps leaderProps = docCollection.getLeader(shardId);
-      if (leaderProps == null) { // we might not have a leader yet, returns null
-        return null;
-      }
-      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-      return nodeProps.getCoreUrl();
-    }
-
-    @Override
-    public void run() {
-      try {
-        String leaderUrl = getLeaderUrl();
-        if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
-          return;
-        }
-
-        Long lastVersion = fetchLastProcessedVersion(leaderUrl);
-        // null: the leader could not be queried or reported no version; try again on the next run.
-        // -1: the log reader on the leader has not yet started to read log entries; do nothing.
-        if (lastVersion == null || lastVersion == -1L) {
-          return;
-        }
-
-        advanceBufferReader(lastVersion);
-      } catch (Exception e) {
-        // Do not rethrow: an exception escaping a task scheduled with scheduleWithFixedDelay
-        // suppresses all of its subsequent executions, which would silently stop tlog cleanup.
-        log.warn("Caught unexpected exception", e);
-      } catch (Error e) {
-        log.warn("Caught unexpected exception", e);
-        throw e;
-      }
-    }
-
-    /**
-     * Asks the leader for its last processed {@code _version_} using the LASTPROCESSEDVERSION action.
-     *
-     * @return the version reported by the leader, or {@code null} if the request failed or the
-     *         response did not contain {@link CdcrParams#LAST_PROCESSED_VERSION}
-     */
-    private Long fetchLastProcessedVersion(String leaderUrl) {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
-
-      QueryRequest request = new QueryRequest(params);
-      request.setPath(path);
-
-      HttpSolrClient server = new HttpSolrClient.Builder(leaderUrl)
-          .withConnectionTimeout(15000)
-          .withSocketTimeout(60000)
-          .build();
-      try {
-        NamedList<Object> response = server.request(request);
-        Long lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
-        if (lastVersion == null) {
-          log.warn("Leader {} did not report a last processed version", leaderUrl);
-          return null;
-        }
-        log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
-            core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
-        return lastVersion;
-      } catch (IOException | SolrServerException e) {
-        log.warn("Couldn't get last processed version from leader {}: {}", leaderUrl, e.getMessage());
-        return null;
-      } finally {
-        // Closed manually rather than with try-with-resources so that a failure to close only
-        // logs a warning and never discards a version that was already fetched.
-        try {
-          server.close();
-        } catch (IOException ioe) {
-          log.warn("Caught exception trying to close server {}: {}", leaderUrl, ioe.getMessage());
-        }
-      }
-    }
-
-    /** Moves the replica's buffering tlog reader forward to {@code lastVersion} so old tlogs can be removed. */
-    private void advanceBufferReader(long lastVersion) {
-      try {
-        CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
-        if (ulog.isBuffering()) {
-          log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
-          ulog.getBufferToggle().seek(lastVersion);
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
-      } catch (IOException e) {
-        log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
-      }
-    }
-  }
-
-}
-