You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC
svn commit: r1681186 [2/5] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java Fri May 22 18:58:29 2015
@@ -0,0 +1,615 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.PluginBag;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * This request handler implements the CDCR API and is responsible for the execution of the
+ * {@link CdcrReplicator} threads.
+ * </p>
+ * <p>
+ * It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
+ * {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
+ * to synchronise the state of the CDCR across all the nodes.
+ * </p>
+ * <p>
+ * The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or
+ * {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the actions
+ * {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and
+ * {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is leader and the process
+ * state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
+ * start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
+ * {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
+ * </p>
+ * <p>
+ * The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
+ * files. Such a mode can be enabled or disabled using the actions
+ * {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
+ * {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
+ * </p>
+ * <p>
+ * Known limitations: The source and target clusters must have the same topology. Replication between clusters
+ * with a different number of shards will likely result in an inconsistent index.
+ * </p>
+ */
+public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrRequestHandler.class);
+
+  /** The core this handler is attached to; assigned in {@link #inform(SolrCore)}. */
+  private SolrCore core;
+  /** Collection name of {@link #core}; assigned in {@link #inform(SolrCore)}. */
+  private String collection;
+  /** Path this handler is registered under (typically {@code /cdcr}); assigned in {@link #inform(SolrCore)}. */
+  private String path;
+
+  // Configuration sections read in init(); each stays null when its section is absent from the init args.
+  private SolrParams updateLogSynchronizerConfiguration;
+  private SolrParams replicatorConfiguration;
+  private SolrParams bufferConfiguration;
+  /** Replica configurations grouped by the value of their source collection parameter. */
+  private Map<String, List<SolrParams>> replicasConfiguration;
+
+  private CdcrProcessStateManager processStateManager;
+  private CdcrBufferStateManager bufferStateManager;
+  private CdcrReplicatorManager replicatorManager;
+  private CdcrLeaderStateManager leaderStateManager;
+  private CdcrUpdateLogSynchronizer updateLogSynchronizer;
+  private CdcrBufferManager bufferManager;
+
+  /**
+   * Reads the handler's init arguments: the optional update log synchronizer, replicator and buffer
+   * sections, and every replica section (grouped by source collection).
+   *
+   * @param args the init arguments; when {@code null}, no CDCR configuration is read
+   */
+  @Override
+  public void init(NamedList args) {
+    super.init(args);
+
+    if (args != null) {
+      // Configuration of the Update Log Synchronizer (instanceof is false for null values)
+      Object updateLogSynchronizerParam = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
+      if (updateLogSynchronizerParam instanceof NamedList) {
+        updateLogSynchronizerConfiguration = SolrParams.toSolrParams((NamedList) updateLogSynchronizerParam);
+      }
+
+      // Configuration of the Replicator
+      Object replicatorParam = args.get(CdcrParams.REPLICATOR_PARAM);
+      if (replicatorParam instanceof NamedList) {
+        replicatorConfiguration = SolrParams.toSolrParams((NamedList) replicatorParam);
+      }
+
+      // Configuration of the Buffer
+      Object bufferParam = args.get(CdcrParams.BUFFER_PARAM);
+      if (bufferParam instanceof NamedList) {
+        bufferConfiguration = SolrParams.toSolrParams((NamedList) bufferParam);
+      }
+
+      // Configuration of the Replicas, grouped by source collection
+      replicasConfiguration = new HashMap<>();
+      List replicas = args.getAll(CdcrParams.REPLICA_PARAM);
+      if (replicas != null) {
+        for (Object replica : replicas) {
+          if (replica instanceof NamedList) {
+            SolrParams params = SolrParams.toSolrParams((NamedList) replica);
+            String sourceCollection = params.get(CdcrParams.SOURCE_COLLECTION_PARAM);
+            List<SolrParams> sourceReplicas = replicasConfiguration.get(sourceCollection);
+            if (sourceReplicas == null) {
+              sourceReplicas = new ArrayList<>();
+              replicasConfiguration.put(sourceCollection, sourceReplicas);
+            }
+            sourceReplicas.add(params);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Dispatches the request to the handler of the action named by the {@code action} parameter.
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if the action parameter is missing or unknown
+   */
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    // Pick the action
+    SolrParams params = req.getParams();
+    CdcrParams.CdcrAction action = null;
+    String a = params.get(CommonParams.ACTION);
+    if (a != null) {
+      action = CdcrParams.CdcrAction.get(a);
+    }
+    if (action == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+    }
+
+    switch (action) {
+      case START: {
+        this.handleStartAction(req, rsp);
+        break;
+      }
+      case STOP: {
+        this.handleStopAction(req, rsp);
+        break;
+      }
+      case STATUS: {
+        this.handleStatusAction(req, rsp);
+        break;
+      }
+      case COLLECTIONCHECKPOINT: {
+        this.handleCollectionCheckpointAction(req, rsp);
+        break;
+      }
+      case SHARDCHECKPOINT: {
+        this.handleShardCheckpointAction(req, rsp);
+        break;
+      }
+      case ENABLEBUFFER: {
+        this.handleEnableBufferAction(req, rsp);
+        break;
+      }
+      case DISABLEBUFFER: {
+        this.handleDisableBufferAction(req, rsp);
+        break;
+      }
+      case LASTPROCESSEDVERSION: {
+        this.handleLastProcessedVersionAction(req, rsp);
+        break;
+      }
+      case QUEUES: {
+        this.handleQueuesAction(req, rsp);
+        break;
+      }
+      case OPS: {
+        this.handleOpsAction(req, rsp);
+        break;
+      }
+      case ERRORS: {
+        this.handleErrorsAction(req, rsp);
+        break;
+      }
+      default: {
+        // Only reachable if an action constant was added without a matching case above.
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown action: " + action);
+      }
+    }
+
+    rsp.setHttpCaching(false);
+  }
+
+  /**
+   * Validates the environment and wires up the CDCR state managers, replicator manager, update log
+   * synchronizer and buffer manager for {@code core}, then registers a close hook that shuts them down.
+   *
+   * @throws SolrException if the instance is not in SolrCloud mode, if the update log is not a
+   *     {@link CdcrUpdateLog}, or if this handler is not registered under a path starting with {@code /}
+   */
+  @Override
+  public void inform(SolrCore core) {
+    this.core = core;
+
+    // Make sure that the core is ZKAware. This check must come before reading the cloud descriptor,
+    // which is presumably null outside SolrCloud mode and would otherwise fail with a
+    // NullPointerException instead of this explicit error.
+    if (!core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Solr instance is not running in SolrCloud mode.");
+    }
+    collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+
+    // Make sure that the core is using the CdcrUpdateLog implementation
+    if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Solr instance is not configured with the cdcr update log.");
+    }
+
+    // Find the registered path of the handler
+    path = null;
+    for (Map.Entry<String, PluginBag.PluginHolder<SolrRequestHandler>> entry : core.getRequestHandlers().getRegistry().entrySet()) {
+      if (core.getRequestHandlers().isLoaded(entry.getKey()) && entry.getValue().get() == this) {
+        path = entry.getKey();
+        break;
+      }
+    }
+    if (path == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The CdcrRequestHandler is not registered with the current core.");
+    }
+    if (!path.startsWith("/")) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
+    }
+
+    // Initialisation phase
+    // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
+    // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
+    // If the Solr cloud has already been initialised, and the core is reloaded (i.e., because a node died or a new node
+    // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
+    // in zookeeper.
+
+    // Initialise the buffer state manager
+    bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
+    // Initialise the process state manager
+    processStateManager = new CdcrProcessStateManager(core);
+    // Initialise the leader state manager
+    leaderStateManager = new CdcrLeaderStateManager(core);
+
+    // Initialise the replicator states manager
+    replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
+    replicatorManager.setProcessStateManager(processStateManager);
+    replicatorManager.setLeaderStateManager(leaderStateManager);
+    // we need to inform it of a state event since the process and leader state
+    // may have been synchronised during the initialisation
+    replicatorManager.stateUpdate();
+
+    // Initialise the update log synchronizer
+    updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
+    updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
+    // we need to inform it of a state event since the leader state
+    // may have been synchronised during the initialisation
+    updateLogSynchronizer.stateUpdate();
+
+    // Initialise the buffer manager
+    bufferManager = new CdcrBufferManager(core);
+    bufferManager.setLeaderStateManager(leaderStateManager);
+    bufferManager.setBufferStateManager(bufferStateManager);
+    // we need to inform it of a state event since the leader state
+    // may have been synchronised during the initialisation
+    bufferManager.stateUpdate();
+
+    // register the close hook
+    this.registerCloseHook(core);
+  }
+
+  /**
+   * Registers a close hook that, before the core closes, shuts down the update log synchronizer,
+   * the replicator manager and the buffer, process and leader state managers.
+   */
+  private void registerCloseHook(SolrCore core) {
+    core.addCloseHook(new CloseHook() {
+
+      @Override
+      public void preClose(SolrCore core) {
+        String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+        log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collectionName, shard);
+
+        updateLogSynchronizer.shutdown();
+        replicatorManager.shutdown();
+        bufferStateManager.shutdown();
+        processStateManager.shutdown();
+        leaderStateManager.shutdown();
+      }
+
+      @Override
+      public void postClose(SolrCore core) {
+      }
+
+    });
+  }
+
+  /**
+   * <p>
+   * Update and synchronize the process state.
+   * </p>
+   * <p>
+   * The process state manager must notify the replicator states manager of the change of state.
+   * </p>
+   */
+  private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (processStateManager.getState() == CdcrParams.ProcessState.STOPPED) {
+      processStateManager.setState(CdcrParams.ProcessState.STARTED);
+      processStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /** Switches the process state to STOPPED (if it was STARTED), then reports the status. */
+  private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (processStateManager.getState() == CdcrParams.ProcessState.STARTED) {
+      processStateManager.setState(CdcrParams.ProcessState.STOPPED);
+      processStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /** Reports the current process and buffer states. */
+  private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /** Builds the status entry: the lower-cased process state and buffer state. */
+  private NamedList getStatus() {
+    NamedList status = new NamedList();
+    status.add(CdcrParams.ProcessState.getParam(), processStateManager.getState().toLower());
+    status.add(CdcrParams.BufferState.getParam(), bufferStateManager.getState().toLower());
+    return status;
+  }
+
+  /**
+   * <p>
+   * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
+   * This checkpoint is used on the source cluster to setup the
+   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader.
+   * </p>
+   * <p>
+   * This method will execute in parallel one
+   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
+   * then pick the lowest version number as checkpoint. Picking the lowest amongst all shards will ensure that we do not
+   * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
+   * updates to the target cluster while we are currently instantiating the
+   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
+   * This solution only works in scenarios where the topology of the source and target clusters are identical.
+   * </p>
+   *
+   * @throws SolrException if the collection has no entry in the cluster state, or if a shard request fails
+   */
+  private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
+      throws IOException, SolrServerException {
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    try {
+      zkController.getZkStateReader().updateClusterState(true);
+    } catch (Exception e) {
+      log.warn("Error when updating cluster state", e);
+    }
+    ClusterState cstate = zkController.getClusterState();
+    Collection<Slice> shards = cstate.getActiveSlices(collection);
+    if (shards == null) {
+      // Fail explicitly (and before creating the executor) rather than with an NPE in the loop below.
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "No active slices found for collection '" + collection + "' in the cluster state");
+    }
+
+    ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("parallelCdcrExecutor"));
+
+    long checkpoint = Long.MAX_VALUE;
+    try {
+      List<Callable<Long>> callables = new ArrayList<>();
+      for (Slice shard : shards) {
+        ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, shard.getName());
+        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+        callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
+      }
+
+      for (final Future<Long> future : parallelExecutor.invokeAll(callables)) {
+        long version = future.get();
+        if (version < checkpoint) { // we must take the lowest checkpoint from all the shards
+          checkpoint = version;
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while requesting shard's checkpoints", e);
+    } catch (ExecutionException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while requesting shard's checkpoints", e);
+    } finally {
+      parallelExecutor.shutdown();
+    }
+
+    rsp.add(CdcrParams.CHECKPOINT, checkpoint);
+  }
+
+  /**
+   * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}
+   * ({@code -1} when it has no recent entry).
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if this replica is not the shard leader
+   */
+  private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (!leaderStateManager.amILeader()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
+          "' sent to non-leader replica");
+    }
+
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+    try {
+      List<Long> versions = recentUpdates.getVersions(1);
+      long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
+      rsp.add(CdcrParams.CHECKPOINT, lastVersion);
+    } finally {
+      // release the recent updates even if reading the versions failed
+      recentUpdates.close();
+    }
+  }
+
+  /** Switches the buffer state to ENABLED (if it was DISABLED), then reports the status. */
+  private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (bufferStateManager.getState() == CdcrParams.BufferState.DISABLED) {
+      bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
+      bufferStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /** Switches the buffer state to DISABLED (if it was ENABLED), then reports the status. */
+  private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (bufferStateManager.getState() == CdcrParams.BufferState.ENABLED) {
+      bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
+      bufferStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /**
+   * <p>
+   * We have to take care of four cases:
+   * </p>
+   * <ul>
+   * <li>Replication &amp; Buffering</li>
+   * <li>Replication &amp; No Buffering</li>
+   * <li>No Replication &amp; Buffering</li>
+   * <li>No Replication &amp; No Buffering</li>
+   * </ul>
+   * <p>
+   * In the first three cases, at least one log reader should have been initialised. We should take the lowest
+   * last processed version across all the initialised readers. In the last case, there isn't a log reader
+   * initialised. We should instantiate one and get the version of the first entries.
+   * If no reader contributes a version, {@link Long#MAX_VALUE} is returned.
+   * </p>
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if this replica is not the shard leader
+   */
+  private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+    if (!leaderStateManager.amILeader()) {
+      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
+          " sent to non-leader replica");
+    }
+
+    CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+
+    // take care of the first three cases
+    // first check the log readers from the replicator states
+    long lastProcessedVersion = Long.MAX_VALUE;
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      long version = Long.MAX_VALUE;
+      if (state.getLogReader() != null) {
+        version = state.getLogReader().getLastVersion();
+      }
+      lastProcessedVersion = Math.min(lastProcessedVersion, version);
+    }
+
+    // next check the log reader of the buffer
+    CdcrUpdateLog.CdcrLogReader bufferLogReader = ulog.getBufferToggle();
+    if (bufferLogReader != null) {
+      lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
+    }
+
+    // the fourth case: no cdc replication, no buffering: all readers were null
+    if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
+        bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
+      CdcrUpdateLog.CdcrLogReader logReader = ulog.newLogReader();
+      try {
+        // let the reader initialize lastVersion
+        logReader.next();
+        lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Error while fetching the last processed version", e);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Error while fetching the last processed version", e);
+      } finally {
+        logReader.close();
+      }
+    }
+
+    log.info("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
+    rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
+  }
+
+  /**
+   * Reports, per ZooKeeper host and target collection, the number of remaining records of the log reader
+   * ({@code -1} if it is not initialised) and the timestamp of the last processed operation, followed by
+   * transaction log totals and the update log synchronizer state.
+   */
+  private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList queueStats = new NamedList();
+
+      CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
+      if (logReader == null) {
+        String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+        log.warn("The log reader for target collection {} is not initialised @ {}:{}",
+            state.getTargetCollection(), collectionName, shard);
+        queueStats.add(CdcrParams.QUEUE_SIZE, -1L);
+      } else {
+        queueStats.add(CdcrParams.QUEUE_SIZE, logReader.getNumberOfRemainingRecords());
+      }
+      queueStats.add(CdcrParams.LAST_TIMESTAMP, state.getTimestampOfLastProcessedOperation());
+
+      addToHost(hosts, state, queueStats);
+    }
+
+    rsp.add(CdcrParams.QUEUES, hosts);
+    UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
+    rsp.add(CdcrParams.TLOG_TOTAL_SIZE, updateLog.getTotalLogsSize());
+    rsp.add(CdcrParams.TLOG_TOTAL_COUNT, updateLog.getTotalLogsNumber());
+    rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER,
+        updateLogSynchronizer.isStarted() ? CdcrParams.ProcessState.STARTED.toLower() : CdcrParams.ProcessState.STOPPED.toLower());
+  }
+
+  /** Reports, per ZooKeeper host and target collection, the all/adds/deletes per-second rates. */
+  private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList ops = new NamedList();
+      ops.add(CdcrParams.COUNTER_ALL, state.getBenchmarkTimer().getOperationsPerSecond());
+      ops.add(CdcrParams.COUNTER_ADDS, state.getBenchmarkTimer().getAddsPerSecond());
+      ops.add(CdcrParams.COUNTER_DELETES, state.getBenchmarkTimer().getDeletesPerSecond());
+
+      addToHost(hosts, state, ops);
+    }
+
+    rsp.add(CdcrParams.OPERATIONS_PER_SECOND, hosts);
+  }
+
+  /**
+   * Reports, per ZooKeeper host and target collection, the consecutive error count, the bad-request and
+   * internal error counts, and the last recorded errors.
+   */
+  private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList errors = new NamedList();
+
+      errors.add(CdcrParams.CONSECUTIVE_ERRORS, state.getConsecutiveErrors());
+      errors.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
+      errors.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));
+
+      NamedList lastErrors = new NamedList();
+      for (String[] lastError : state.getLastErrors()) {
+        lastErrors.add(lastError[0], lastError[1]);
+      }
+      errors.add(CdcrParams.LAST, lastErrors);
+
+      addToHost(hosts, state, errors);
+    }
+
+    rsp.add(CdcrParams.ERRORS, hosts);
+  }
+
+  /**
+   * Adds {@code stats} under the target collection of {@code state}, inside the entry of {@code hosts}
+   * named after the state's ZooKeeper host; the host entry is created on first use.
+   */
+  private static void addToHost(NamedList hosts, CdcrReplicatorState state, NamedList stats) {
+    String zkHost = state.getZkHost();
+    NamedList host = (NamedList) hosts.get(zkHost);
+    if (host == null) {
+      host = new NamedList();
+      hosts.add(zkHost, host);
+    }
+    host.add(state.getTargetCollection(), stats);
+  }
+
+  @Override
+  public String getDescription() {
+    return "Manage Cross Data Center Replication";
+  }
+
+  /**
+   * A {@link Callable} that sends a single
+   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request to one shard leader and
+   * returns the checkpoint found in its response.
+   */
+  private static final class SliceCheckpointCallable implements Callable<Long> {
+
+    final String baseUrl;
+    final String cdcrPath;
+
+    SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
+      this.baseUrl = baseUrl;
+      this.cdcrPath = cdcrPath;
+    }
+
+    @Override
+    public Long call() throws Exception {
+      HttpSolrClient server = new HttpSolrClient(baseUrl);
+      try {
+        server.setConnectionTimeout(15000);
+        server.setSoTimeout(60000);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());
+
+        SolrRequest request = new QueryRequest(params);
+        request.setPath(cdcrPath);
+
+        NamedList response = server.request(request);
+        return (Long) response.get(CdcrParams.CHECKPOINT);
+      } finally {
+        server.close();
+      }
+    }
+
+  }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,48 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class of the CDCR state managers. Observers added through {@link #register(CdcrStateObserver)}
+ * are notified, in registration order, every time {@link #callback()} is invoked.
+ */
+abstract class CdcrStateManager {
+
+  /** Registered observers, kept in registration order. */
+  private final List<CdcrStateObserver> stateObservers = new ArrayList<>();
+
+  /**
+   * Adds an observer that will be notified by every subsequent {@link #callback()}.
+   */
+  void register(CdcrStateObserver observer) {
+    stateObservers.add(observer);
+  }
+
+  /**
+   * Notifies each registered observer, in registration order, that the state has changed.
+   */
+  void callback() {
+    for (CdcrStateObserver listener : stateObservers) {
+      listener.stateUpdate();
+    }
+  }
+
+  /**
+   * Receives a notification each time the state manager it is registered with calls {@link #callback()}.
+   */
+  interface CdcrStateObserver {
+
+    void stateUpdate();
+
+  }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java Fri May 22 18:58:29 2015
@@ -0,0 +1,183 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Synchronize periodically the update log of non-leader nodes with their leaders.
+ * </p>
+ * <p>
+ * Non-leader nodes must always buffer updates in case of leader failures. They have to periodically
+ * synchronize their update logs with their leader to remove old transaction logs that will never be used anymore.
+ * This is performed by a background thread that is scheduled with a fixed delay. The background thread is sending
+ * the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
+ * the lowest last version number processed. This version is then used to move forward the buffer log reader.
+ * </p>
+ */
+class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
+
+ private CdcrLeaderStateManager leaderStateManager;
+ private ScheduledExecutorService scheduler;
+
+ private final SolrCore core;
+ private final String collection;
+ private final String shardId;
+ private final String path;
+
+ private int timeSchedule = DEFAULT_TIME_SCHEDULE;
+
+ private static final int DEFAULT_TIME_SCHEDULE = 60000; // by default, every minute
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrUpdateLogSynchronizer.class);
+
+ CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchonizerConfiguration) {
+ this.core = core;
+ this.path = path;
+ this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+ if (updateLogSynchonizerConfiguration != null) {
+ this.timeSchedule = updateLogSynchonizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
+ }
+ }
+
+ void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+ this.leaderStateManager = leaderStateManager;
+ this.leaderStateManager.register(this);
+ }
+
+ @Override
+ public void stateUpdate() {
+ // If I am not the leader, I need to synchronise periodically my update log with my leader.
+ if (!leaderStateManager.amILeader()) {
+ scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-update-log-synchronizer"));
+ scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ this.shutdown();
+ }
+
+ boolean isStarted() {
+ return scheduler != null;
+ }
+
+ void shutdown() {
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ scheduler = null;
+ }
+ }
+
+ private class UpdateLogSynchronisation implements Runnable {
+
+ private String getLeaderUrl() {
+ ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+ ClusterState cstate = zkController.getClusterState();
+ ZkNodeProps leaderProps = cstate.getLeader(collection, shardId);
+ if (leaderProps == null) { // we might not have a leader yet, returns null
+ return null;
+ }
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+ return nodeProps.getCoreUrl();
+ }
+
+ @Override
+ public void run() {
+ try {
+ String leaderUrl = getLeaderUrl();
+ if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
+ return;
+ }
+
+ HttpSolrClient server = new HttpSolrClient(leaderUrl);
+ server.setConnectionTimeout(15000);
+ server.setSoTimeout(60000);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
+
+ SolrRequest request = new QueryRequest(params);
+ request.setPath(path);
+
+ long lastVersion;
+ try {
+ NamedList response = server.request(request);
+ lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
+ log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
+ core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+ } catch (IOException | SolrServerException e) {
+ log.warn("Couldn't get last processed version from leader {}: {}", leaderUrl, e.getMessage());
+ return;
+ } finally {
+ try {
+ server.close();
+ } catch (IOException ioe) {
+ log.warn("Caught exception trying to close server: ", leaderUrl, ioe.getMessage());
+ }
+ }
+
+ // if we received -1, it means that the log reader on the leader has not yet started to read log entries
+ // do nothing
+ if (lastVersion == -1) {
+ return;
+ }
+
+ try {
+ CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+ if (ulog.isBuffering()) {
+ log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
+ ulog.getBufferToggle().seek(lastVersion);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
+ } catch (IOException e) {
+ log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
+ }
+ } catch (Throwable e) {
+ log.warn("Caught unexpected exception", e);
+ throw e;
+ }
+ }
+ }
+
+}
+
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Fri May 22 18:58:29 2015
@@ -29,7 +29,6 @@ import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -46,7 +45,6 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Adler32;
@@ -78,11 +76,12 @@ import org.apache.solr.core.DirectoryFac
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler.FileInfo;
+import org.apache.solr.handler.ReplicationHandler.*;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
@@ -111,6 +110,8 @@ import static org.apache.solr.handler.Re
import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
+import static org.apache.solr.handler.ReplicationHandler.TLOG_FILE;
+import static org.apache.solr.handler.ReplicationHandler.TLOG_FILES;
/**
* <p> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
@@ -138,14 +139,18 @@ public class IndexFetcher {
private volatile List<Map<String, Object>> confFilesToDownload;
+ private volatile List<Map<String, Object>> tlogFilesToDownload;
+
private volatile List<Map<String, Object>> filesDownloaded;
private volatile List<Map<String, Object>> confFilesDownloaded;
+ private volatile List<Map<String, Object>> tlogFilesDownloaded;
+
private volatile Map<String, Object> currentFile;
private volatile DirectoryFileFetcher dirFileFetcher;
-
+
private volatile LocalFsFileFetcher localFileFetcher;
private volatile ExecutorService fsyncService;
@@ -180,7 +185,7 @@ public class IndexFetcher {
LOG.warn("'masterUrl' must be specified without the /replication suffix");
}
this.masterUrl = masterUrl;
-
+
this.replicationHandler = handler;
String compress = (String) initArgs.get(COMPRESSION);
useInternal = INTERNAL.equals(compress);
@@ -207,7 +212,7 @@ public class IndexFetcher {
try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient)) {
client.setSoTimeout(60000);
client.setConnectionTimeout(15000);
-
+
return client.request(req);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
@@ -243,6 +248,10 @@ public class IndexFetcher {
if (files != null)
confFilesToDownload = Collections.synchronizedList(files);
+ files = (List<Map<String, Object>>) response.get(TLOG_FILES);
+ if (files != null) {
+ tlogFilesToDownload = Collections.synchronizedList(files);
+ }
} catch (SolrServerException e) {
throw new IOException(e);
}
@@ -251,18 +260,18 @@ public class IndexFetcher {
boolean fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
return fetchLatestIndex(forceReplication, false);
}
-
+
/**
* This command downloads all the necessary files from master to install a index commit point. Only changed files are
* downloaded. It also downloads the conf files (if they are modified).
*
- * @param forceReplication force a replication in all cases
+ * @param forceReplication force a replication in all cases
* @param forceCoreReload force a core reload in all cases
* @return true on success, false if slave is already in sync
* @throws IOException if an exception occurs
*/
boolean fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
-
+
boolean cleanupDone = false;
boolean successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
@@ -271,14 +280,14 @@ public class IndexFetcher {
Directory indexDir = null;
String indexDirPath;
boolean deleteTmpIdxDir = true;
-
+
if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
// if the last replication was not a success, we force a full replication
// when we are a bit more confident we may want to try a partial replication
// if the error is connection related or something, but we have to be careful
forceReplication = true;
}
-
+
try {
//get the current 'replicateable' index version in the master
NamedList response;
@@ -323,12 +332,12 @@ public class IndexFetcher {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
}
-
+
//there is nothing to be replicated
successfulInstall = true;
return true;
}
-
+
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
//master and slave are already in sync just return
LOG.info("Slave in sync with master.");
@@ -345,6 +354,9 @@ public class IndexFetcher {
return false;
}
LOG.info("Number of files in latest index in master: " + filesToDownload.size());
+ if (tlogFilesToDownload != null) {
+ LOG.info("Number of tlog files in master: " + tlogFilesToDownload.size());
+ }
// Create the sync service
fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
@@ -356,11 +368,12 @@ public class IndexFetcher {
.getCommitTimestamp(commit) >= latestVersion
|| commit.getGeneration() >= latestGeneration || forceReplication;
- String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+ String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+ String tmpIdxDirName = "index." + timestamp;
tmpIndex = solrCore.getDataDir() + tmpIdxDirName;
tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
-
+
// cindex dir...
indexDirPath = solrCore.getIndexDir();
indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
@@ -372,7 +385,7 @@ public class IndexFetcher {
if (!isFullCopyNeeded && isIndexStale(indexDir)) {
isFullCopyNeeded = true;
}
-
+
if (!isFullCopyNeeded) {
// a searcher might be using some flushed but not committed segments
// because of soft commits (which open a searcher on IW's data)
@@ -410,13 +423,16 @@ public class IndexFetcher {
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
}
boolean reloadCore = false;
-
+
try {
LOG.info("Starting download to " + tmpIndexDir + " fullCopy="
+ isFullCopyNeeded);
successfulInstall = false;
-
+
downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
+ if (tlogFilesToDownload != null) {
+ downloadTlogFiles(timestamp, latestGeneration);
+ }
LOG.info("Total time taken for download : "
+ ((System.currentTimeMillis() - replicationStartTime) / 1000)
+ " secs");
@@ -440,7 +456,7 @@ public class IndexFetcher {
solrCore.getDirectoryFactory().remove(indexDir);
}
}
-
+
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);// write to a file time of replication and
@@ -464,7 +480,7 @@ public class IndexFetcher {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
}
}
-
+
// we must reload the core after we open the IW back up
if (successfulInstall && (reloadCore || forceCoreReload)) {
LOG.info("Reloading SolrCore {}", solrCore.getName());
@@ -484,10 +500,10 @@ public class IndexFetcher {
if (isFullCopyNeeded) {
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}
-
+
openNewSearcherAndUpdateCommitPoint();
}
-
+
if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
cleanupDone = true;
@@ -497,7 +513,7 @@ public class IndexFetcher {
reloadCore);
successfulInstall = fetchLatestIndex(true, reloadCore);
}
-
+
replicationStartTime = 0;
return successfulInstall;
} catch (ReplicationHandlerException e) {
@@ -605,7 +621,7 @@ public class IndexFetcher {
Directory dir = null;
try {
dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
-
+
int indexCount = 1, confFilesCount = 1;
if (props.containsKey(TIMES_INDEX_REPLICATED)) {
indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
@@ -696,7 +712,7 @@ public class IndexFetcher {
private void openNewSearcherAndUpdateCommitPoint() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
-
+
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
try {
@@ -719,7 +735,7 @@ public class IndexFetcher {
// update the commit point in replication handler
replicationHandler.indexCommitPoint = commitPoint;
-
+
}
private void reloadCore() {
@@ -756,7 +772,7 @@ public class IndexFetcher {
}
for (Map<String, Object> file : confFilesToDownload) {
String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
- localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
+ localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, CONF_FILE_SHORT, latestGeneration);
currentFile = file;
localFileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<>(file));
@@ -770,6 +786,34 @@ public class IndexFetcher {
}
}
+ /**
+  * Downloads every tlog file listed in {@code tlogFilesToDownload} into a temporary directory
+  * created inside the update log directory, then swaps the downloaded files in as the new
+  * update log directory. The temporary directory is always deleted afterwards.
+  *
+  * @param timestamp suffix used to name the backup of the current tlog directory
+  * @param latestGeneration index generation passed to each file fetcher request
+  * @throws Exception if the temporary directory cannot be created, or a download or the final move fails
+  */
+ private void downloadTlogFiles(String timestamp, long latestGeneration) throws Exception {
+   UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
+
+   LOG.info("Starting download of tlog files from master: " + tlogFilesToDownload);
+   tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+   File tmpTlogDir = new File(ulog.getLogDir(), "tlog." + getDateAsStr(new Date()));
+   try {
+     boolean status = tmpTlogDir.mkdirs();
+     if (!status) {
+       // report the full path: the bare directory name does not say where creation failed
+       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+           "Failed to create temporary tlog folder: " + tmpTlogDir.getAbsolutePath());
+     }
+     for (Map<String, Object> file : tlogFilesToDownload) {
+       String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
+       localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
+       currentFile = file;
+       localFileFetcher.fetchFile();
+       tlogFilesDownloaded.add(new HashMap<>(file));
+     }
+     // this is called before moving the files into the original tlog dir
+     // so that if there is an exception we avoid corrupting the original tlog files.
+     terminateAndWaitFsyncService();
+     copyTmpTlogFiles2Tlog(tmpTlogDir, timestamp);
+   } finally {
+     delTree(tmpTlogDir);
+   }
+ }
+
/**
* Download the index files. If a new index is needed, download all the files.
*
@@ -790,7 +834,7 @@ public class IndexFetcher {
if (!compareResult.equal || downloadCompleteIndex
|| filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult)) {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
- (String) file.get(NAME), false, latestGeneration);
+ (String) file.get(NAME), FILE, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
filesDownloaded.add(new HashMap<>(file));
@@ -829,10 +873,10 @@ public class IndexFetcher {
LOG.warn("Could not retrieve checksum from file.", e);
}
}
-
+
if (!compareResult.checkSummed) {
// we don't have checksums to compare
-
+
if (indexFileLen == backupIndexFileLen) {
compareResult.equal = true;
return compareResult;
@@ -843,9 +887,9 @@ public class IndexFetcher {
return compareResult;
}
}
-
+
// we have checksums to compare
-
+
if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) {
compareResult.equal = true;
return compareResult;
@@ -878,7 +922,7 @@ public class IndexFetcher {
} catch (NoSuchFileException | FileNotFoundException e) {
return false;
}
- }
+ }
/**
* All the files which are common between master and slave must have same size and same checksum else we assume
@@ -969,7 +1013,7 @@ public class IndexFetcher {
}
/**
- * Make file list
+ * Make file list
*/
private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
File[] files = dir.listFiles();
@@ -982,7 +1026,7 @@ public class IndexFetcher {
}
return fileList;
}
-
+
/**
* The conf files are copied to the tmp dir to the conf dir. A backup of the old file is maintained
*/
@@ -1021,6 +1065,30 @@ public class IndexFetcher {
}
}
+ /**
+  * Replaces the update log directory with the freshly downloaded tlog files.
+  * <p>
+  * The live tlog directory is first renamed to a sibling backup directory named
+  * {@code UpdateLog.TLOG_NAME + "." + timestamp}, which is kept. {@code tmpTlogDir} is expected
+  * to be a direct child of the live tlog directory, so it travels with that rename. It is then
+  * moved out of the backup into the live tlog directory's original location.
+  *
+  * @throws SolrException (SERVER_ERROR) if either rename fails
+  */
+ private void copyTmpTlogFiles2Tlog(File tmpTlogDir, String timestamp) {
+   File liveTlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
+   File backupDir = new File(liveTlogDir.getParent(), UpdateLog.TLOG_NAME + "." + timestamp);
+
+   moveTlogDirOrFail(liveTlogDir, backupDir);
+   moveTlogDirOrFail(new File(backupDir, tmpTlogDir.getName()), liveTlogDir);
+ }
+
+ /** Moves {@code source} to {@code target}, converting an I/O failure into a SERVER_ERROR. */
+ private static void moveTlogDirOrFail(File source, File target) {
+   try {
+     org.apache.commons.io.FileUtils.moveDirectory(source, target);
+   } catch (IOException e) {
+     throw new SolrException(ErrorCode.SERVER_ERROR,
+         "Unable to rename: " + source + " to: " + target, e);
+   }
+ }
+
private String getDateAsStr(Date d) {
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
}
@@ -1114,10 +1182,10 @@ public class IndexFetcher {
}
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
-
- /**
+
+ /**
* This simulates File.delete exception-wise, since this class has some strange behavior with it.
- * The only difference is it returns null on success, throws SecurityException on SecurityException,
+ * The only difference is it returns null on success, throws SecurityException on SecurityException,
* otherwise returns Throwable preventing deletion (instead of false), for additional information.
*/
static Throwable delete(File file) {
@@ -1130,7 +1198,7 @@ public class IndexFetcher {
return other;
}
}
-
+
static boolean delTree(File dir) {
try {
org.apache.lucene.util.IOUtils.rm(dir.toPath());
@@ -1159,6 +1227,20 @@ public class IndexFetcher {
return timeElapsed;
}
+ List<Map<String, Object>> getTlogFilesToDownload() {
+ //make a copy first because it can be null later
+ List<Map<String, Object>> tmp = tlogFilesToDownload;
+ //create a new instance. or else iterator may fail
+ return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
+ }
+
+ List<Map<String, Object>> getTlogFilesDownloaded() {
+ //make a copy first because it can be null later
+ List<Map<String, Object>> tmp = tlogFilesDownloaded;
+ // NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
+ return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
+ }
+
List<Map<String, Object>> getConfFilesToDownload() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesToDownload;
@@ -1219,7 +1301,7 @@ public class IndexFetcher {
private boolean includeChecksum = true;
private String fileName;
private String saveAs;
- private boolean isConf;
+ private String solrParamOutput;
private Long indexGen;
private long size;
@@ -1230,11 +1312,11 @@ public class IndexFetcher {
private boolean aborted = false;
FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
- boolean isConf, long latestGen) throws IOException {
+ String solrParamOutput, long latestGen) throws IOException {
this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
- this.isConf = isConf;
+ this.solrParamOutput = solrParamOutput;
this.saveAs = saveAs;
indexGen = latestGen;
if (includeChecksum)
@@ -1404,12 +1486,7 @@ public class IndexFetcher {
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, "/replication");
//add the version to download. This is used to reserve the download
- if (isConf) {
- //set cf instead of file for config file
- params.set(CONF_FILE_SHORT, fileName);
- } else {
- params.set(FILE, fileName);
- }
+ params.set(solrParamOutput, fileName);
if (useInternal) {
params.set(COMPRESSION, "true");
}
@@ -1478,8 +1555,8 @@ public class IndexFetcher {
private class DirectoryFileFetcher extends FileFetcher {
DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
- boolean isConf, long latestGen) throws IOException {
- super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+ String solrParamOutput, long latestGen) throws IOException {
+ super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
}
}
@@ -1527,8 +1604,8 @@ public class IndexFetcher {
private class LocalFsFileFetcher extends FileFetcher {
LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
- boolean isConf, long latestGen) throws IOException {
- super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
+ String solrParamOutput, long latestGen) throws IOException {
+ super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Fri May 22 18:58:29 2015
@@ -81,7 +81,9 @@ import org.apache.solr.core.SolrEventLis
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.PropertiesInputStream;
@@ -121,8 +123,8 @@ public class ReplicationHandler extends
version = v;
}
/**
- * builds a CommitVersionInfo data for the specified IndexCommit.
- * Will never be null, ut version and generation may be zero if
+ * builds a CommitVersionInfo data for the specified IndexCommit.
+ * Will never be null, but version and generation may be zero if
* there are problems extracting them from the commit data
*/
public static CommitVersionInfo build(IndexCommit commit) {
@@ -512,8 +514,11 @@ public class ReplicationHandler extends
rawParams.set(CommonParams.WT, FILE_STREAM);
String cfileName = solrParams.get(CONF_FILE_SHORT);
+ String tlogFileName = solrParams.get(TLOG_FILE);
if (cfileName != null) {
- rsp.add(FILE_STREAM, new LocalFsFileStream(solrParams));
+ rsp.add(FILE_STREAM, new LocalFsConfFileStream(solrParams));
+ } else if (tlogFileName != null) {
+ rsp.add(FILE_STREAM, new LocalFsTlogFileStream(solrParams));
} else {
rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
}
@@ -589,6 +594,14 @@ public class ReplicationHandler extends
}
}
rsp.add(CMD_GET_FILE_LIST, result);
+
+ // fetch list of tlog files only if cdcr is activated
+ if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+ List<Map<String, Object>> tlogfiles = getTlogFileList();
+ LOG.info("Adding tlog files to list: " + tlogfiles);
+ rsp.add(TLOG_FILES, tlogfiles);
+ }
+
if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
return;
LOG.debug("Adding config files to list: " + includeConfFiles);
@@ -596,6 +609,19 @@ public class ReplicationHandler extends
rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
}
+ /**
+  * Lists the transaction log files of this core's update log.
+  *
+  * @return one map per tlog file, holding its file name under {@code NAME} and its length in
+  *         bytes under {@code SIZE}, in the order returned by {@code UpdateLog#getLogList}
+  */
+ List<Map<String, Object>> getTlogFileList() {
+   UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
+   File logDirectory = new File(updateLog.getLogDir());
+   String[] names = updateLog.getLogList(logDirectory);
+
+   List<Map<String, Object>> entries = new ArrayList<>(names.length);
+   for (int i = 0; i < names.length; i++) {
+     String name = names[i];
+     Map<String, Object> entry = new HashMap<>();
+     entry.put(NAME, name);
+     entry.put(SIZE, new File(logDirectory, name).length());
+     entries.add(entry);
+   }
+   return entries;
+ }
+
/**
* For configuration files, checksum of the file is included because, unlike index files, they may have same content
* but different timestamps.
@@ -1247,11 +1273,11 @@ public class ReplicationHandler extends
***/
}
if (snapshoot) {
- try {
+ try {
int numberToKeep = numberBackupsToKeep;
if (numberToKeep < 1) {
numberToKeep = Integer.MAX_VALUE;
- }
+ }
SnapShooter snapShooter = new SnapShooter(core, null, null);
snapShooter.validateCreateSnapshot();
snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, ReplicationHandler.this);
@@ -1284,6 +1310,7 @@ public class ReplicationHandler extends
protected String fileName;
protected String cfileName;
+ protected String tlogFileName;
protected String sOffset;
protected String sLen;
protected String compress;
@@ -1304,6 +1331,7 @@ public class ReplicationHandler extends
fileName = params.get(FILE);
cfileName = params.get(CONF_FILE_SHORT);
+ tlogFileName = params.get(TLOG_FILE);
sOffset = params.get(OFFSET);
sLen = params.get(LEN);
compress = params.get(COMPRESSION);
@@ -1320,7 +1348,7 @@ public class ReplicationHandler extends
protected void initWrite() throws IOException {
if (sOffset != null) offset = Long.parseLong(sOffset);
if (sLen != null) len = Integer.parseInt(sLen);
- if (fileName == null && cfileName == null) {
+ if (fileName == null && cfileName == null && tlogFileName == null) {
// no filename do nothing
writeNothingAndFlush();
}
@@ -1370,7 +1398,7 @@ public class ReplicationHandler extends
in = dir.openInput(fileName, IOContext.READONCE);
// if offset is mentioned move the pointer to that point
if (offset != -1) in.seek(offset);
-
+
long filelen = dir.fileLength(fileName);
long maxBytesBeforePause = 0;
@@ -1425,12 +1453,17 @@ public class ReplicationHandler extends
/**This is used to write files in the conf directory.
*/
- private class LocalFsFileStream extends DirectoryFileStream {
+ private abstract class LocalFsFileStream extends DirectoryFileStream {
+
+ private File file;
public LocalFsFileStream(SolrParams solrParams) {
super(solrParams);
+ this.file = this.initFile();
}
+ protected abstract File initFile();
+
@Override
public void write(OutputStream out) throws IOException {
createOutputStream(out);
@@ -1438,9 +1471,6 @@ public class ReplicationHandler extends
try {
initWrite();
- //if if is a conf file read from config directory
- File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
-
if (file.exists() && file.canRead()) {
inputStream = new FileInputStream(file);
FileChannel channel = inputStream.getChannel();
@@ -1478,6 +1508,32 @@ public class ReplicationHandler extends
}
}
+ /** Streams a single file from the update log (tlog) directory. */
+ private class LocalFsTlogFileStream extends LocalFsFileStream {
+
+   public LocalFsTlogFileStream(SolrParams solrParams) {
+     super(solrParams);
+   }
+
+   @Override
+   protected File initFile() {
+     //if it is a tlog file read from tlog directory; tlogs live directly in it, so no sub-paths
+     return resolveRequestedFile(core.getUpdateHandler().getUpdateLog().getLogDir(), tlogFileName, false);
+   }
+
+ }
+
+ /** Streams a single file from the core's config directory. */
+ private class LocalFsConfFileStream extends LocalFsFileStream {
+
+   public LocalFsConfFileStream(SolrParams solrParams) {
+     super(solrParams);
+   }
+
+   @Override
+   protected File initFile() {
+     //if it is a conf file read from config directory
+     return resolveRequestedFile(core.getResourceLoader().getConfigDir(), cfileName, true);
+   }
+
+ }
+
+ /**
+  * Resolves a file name taken from request parameters against {@code dir}, refusing names that
+  * could reach outside {@code dir} (directory traversal): absolute paths and any {@code ..}
+  * path component are rejected.
+  *
+  * @param dir the directory the file must live in
+  * @param fileName the client-supplied file name
+  * @param allowSubdirectories whether {@code fileName} may contain (non-{@code ..}) sub-directories
+  * @return {@code new File(dir, fileName)} when the name is acceptable
+  * @throws org.apache.solr.common.SolrException with {@code FORBIDDEN} if the name is rejected
+  */
+ private static File resolveRequestedFile(String dir, String fileName, boolean allowSubdirectories) {
+   File requested = new File(fileName);
+   if (requested.isAbsolute()) {
+     throw new org.apache.solr.common.SolrException(org.apache.solr.common.SolrException.ErrorCode.FORBIDDEN,
+         "File name must be relative: " + fileName);
+   }
+   if (!allowSubdirectories && requested.getParent() != null) {
+     throw new org.apache.solr.common.SolrException(org.apache.solr.common.SolrException.ErrorCode.FORBIDDEN,
+         "File name must not contain a directory: " + fileName);
+   }
+   for (File part = requested; part != null; part = part.getParentFile()) {
+     if ("..".equals(part.getName())) {
+       throw new org.apache.solr.common.SolrException(org.apache.solr.common.SolrException.ErrorCode.FORBIDDEN,
+           "File name cannot contain '..': " + fileName);
+     }
+   }
+   return new File(dir, fileName);
+ }
+
static Integer readInterval(String interval) {
if (interval == null)
return null;
@@ -1568,6 +1624,8 @@ public class ReplicationHandler extends
public static final String CONF_FILE_SHORT = "cf";
+ public static final String TLOG_FILE = "tlogFile";
+
public static final String CHECKSUM = "checksum";
public static final String ALIAS = "alias";
@@ -1576,6 +1634,8 @@ public class ReplicationHandler extends
public static final String CONF_FILES = "confFiles";
+ /** NOTE(review): presumably the response/parameter key listing tlog files, the tlog counterpart of {@link #CONF_FILES} — confirm against its usages. */
+ public static final String TLOG_FILES = "tlogFiles";
+
public static final String REPLICATE_AFTER = "replicateAfter";
public static final String FILE_STREAM = "filestream";
@@ -1601,15 +1661,15 @@ public class ReplicationHandler extends
public static final String OK_STATUS = "OK";
public static final String NEXT_EXECUTION_AT = "nextExecutionAt";
-
+
public static final String NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM = "numberToKeep";
-
+
public static final String NUMBER_BACKUPS_TO_KEEP_INIT_PARAM = "maxNumberOfBackups";
- /**
- * Boolean param for tests that can be specified when using
- * {@link #CMD_FETCH_INDEX} to force the current request to block until
- * the fetch is complete. <b>NOTE:</b> This param is not advised for
+ /**
+ * Boolean param for tests that can be specified when using
+ * {@link #CMD_FETCH_INDEX} to force the current request to block until
+ * the fetch is complete. <b>NOTE:</b> This param is not advised for
* non-test code, since the the duration of the fetch for non-trivial
* indexes will likeley cause the request to time out.
*
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java Fri May 22 18:58:29 2015
@@ -0,0 +1,249 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.util.Collection;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+
+/**
+ * Extends {@link org.apache.solr.update.TransactionLog} to:
+ * <ul>
+ * <li>automatically reopen the output stream if its reference count reached 0. This is achieved by extending
+ * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
+ * <li>encode the number of records in the tlog file in the last commit record. The number of records will be
+ * decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
+ * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.</li>
+ * </ul>
+ */
+public class CdcrTransactionLog extends TransactionLog {
+
+  /**
+   * True while an existing tlog whose record count could not be decoded is being replayed. While set, readers
+   * keep {@code numRecords} up to date so that the next {@link #writeCommit} can encode it.
+   * Outside the constructors it is only read and written while holding the monitor of {@code this}.
+   */
+  private boolean isReplaying;
+  long startVersion; // (absolute) version of the first element of this transaction log
+
+  CdcrTransactionLog(File tlogFile, Collection<String> globalStrings) {
+    super(tlogFile, globalStrings);
+
+    // The starting version number will be used to seek tlogs more efficiently
+    startVersion = parseStartVersion(tlogFile);
+
+    isReplaying = false;
+  }
+
+  CdcrTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+    super(tlogFile, globalStrings, openExisting);
+
+    // The starting version number will be used to seek tlogs more efficiently
+    startVersion = parseStartVersion(tlogFile);
+
+    numRecords = openExisting ? this.readNumRecords() : 0;
+    // if we reopen an existing tlog file and the number of records is equal to 0, then we are replaying
+    // the log and we will append a commit
+    if (openExisting && numRecords == 0) {
+      isReplaying = true;
+    }
+  }
+
+  /**
+   * Extracts the starting version encoded after the last '.' of the tlog file name. The absolute value is
+   * returned, so a negative encoded version yields its positive counterpart.
+   *
+   * @throws NumberFormatException if the file name suffix is not a valid long
+   */
+  private static long parseStartVersion(File tlogFile) {
+    String filename = tlogFile.getName();
+    return Math.abs(Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)));
+  }
+
+  /**
+   * Returns the number of records in the log (currently includes the header and an optional commit).
+   */
+  @Override
+  public int numRecords() {
+    return super.numRecords();
+  }
+
+  /**
+   * The last record of the transaction log file is expected to be a commit with a 4 byte integer that encodes the
+   * number of records in the file.
+   *
+   * @return the decoded number of records, or 0 if the log does not end with a commit, is too short, or an
+   *         I/O error occurred (the error is logged)
+   */
+  private int readNumRecords() {
+    try {
+      if (endsWithCommit()) {
+        long size = fos.size();
+        // 4 bytes for the record size, the length of the end message + 1 byte for its value tag,
+        // and 4 bytes for the number of records
+        long pos = size - 4 - END_MESSAGE.length() - 1 - 4;
+        if (pos < 0) return 0;
+
+        // NOTE(review): the stream is intentionally not closed here; presumably closing it would close the
+        // shared channel — confirm against ChannelFastInputStream#close.
+        ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
+        return is.readInt();
+      }
+    } catch (IOException e) {
+      log.error("Error while reading number of records in tlog " + this, e);
+    }
+    return 0;
+  }
+
+  /**
+   * Appends a commit record that additionally encodes the number of records in the file (including this commit),
+   * writing the log header first if the file is empty. Flushes the stream and clears the replaying state.
+   *
+   * @return the file position at which the commit record starts
+   */
+  @Override
+  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+    LogCodec codec = new LogCodec(resolver);
+    synchronized (this) {
+      try {
+        long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
+
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 4);
+        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
+        fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written
+        codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
+
+        endRecord(pos);
+
+        fos.flush(); // flush since this will be the last record in a log file
+        assert fos.size() == channel.size();
+
+        isReplaying = false; // we have replayed and appended a commit record with the number of records in the file
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  /**
+   * Returns a reader that can be used while a log is still in use.
+   * Currently only *one* LogReader may be outstanding, and that log may only
+   * be used from a single thread.
+   */
+  @Override
+  public LogReader getReader(long startingPos) {
+    return new CdcrLogReader(startingPos);
+  }
+
+  /**
+   * Log reader that counts the records it returns and, while the enclosing log is being replayed, publishes
+   * that count to the enclosing log so it can be encoded by the next commit.
+   */
+  public class CdcrLogReader extends LogReader {
+
+    private int numRecords = 1; // start at 1 to account for the header record
+
+    public CdcrLogReader(long startingPos) {
+      super(startingPos);
+    }
+
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      Object o = super.next();
+      if (o != null) {
+        this.numRecords++;
+        // If we are replaying the log, we need to update the number of records for the writeCommit.
+        // isReplaying is cleared by writeCommit while holding the log's monitor, so it must be read under
+        // the same monitor; otherwise a stale 'true' could overwrite numRecords after the commit.
+        synchronized (CdcrTransactionLog.this) {
+          if (isReplaying) {
+            CdcrTransactionLog.this.numRecords = this.numRecords;
+          }
+        }
+      }
+      return o;
+    }
+
+  }
+
+  @Override
+  public void incref() {
+    // if the refcount is 0, we need to reopen the output stream
+    if (refcount.getAndIncrement() == 0) {
+      reopenOutputStream(); // synchronised with this
+    }
+  }
+
+  /**
+   * Modified to act like {@link #incref()} in order to be compatible with {@link UpdateLog#recoverFromLog()}.
+   * Otherwise, we would have to duplicate the method {@link UpdateLog#recoverFromLog()} in
+   * {@link org.apache.solr.update.CdcrUpdateLog} and change the call
+   * {@code if (!ll.try_incref()) continue; } to {@code incref(); }.
+   */
+  @Override
+  public boolean try_incref() {
+    this.incref();
+    return true;
+  }
+
+  /**
+   * Flushes and closes the output stream (if open) and drops the stream references so that a later
+   * {@link #incref()} can reopen it. Deletes the file when {@code deleteOnClose} is set (deletion failures are
+   * ignored).
+   *
+   * @throws SolrException wrapping any IOException raised while flushing or closing
+   */
+  @Override
+  protected void close() {
+    try {
+      if (debug) {
+        log.debug("Closing tlog: " + this);
+      }
+
+      synchronized (this) {
+        if (fos != null) {
+          try {
+            fos.flush();
+            fos.close();
+          } finally {
+            // Release the file handle even if flushing/closing the buffered stream failed.
+            // Closing an already-closed RandomAccessFile has no effect.
+            RandomAccessFile file = raf;
+
+            // dereference these variables for GC
+            fos = null;
+            os = null;
+            channel = null;
+            raf = null;
+
+            if (file != null) {
+              file.close();
+            }
+          }
+        }
+      }
+
+      if (deleteOnClose) {
+        try {
+          Files.deleteIfExists(tlogFile.toPath());
+        } catch (IOException e) {
+          // TODO: should this class care if a file couldn't be deleted?
+          // this just emulates previous behavior, where only SecurityException would be handled.
+        }
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } finally {
+      assert ObjectReleaseTracker.release(this);
+    }
+  }
+
+  /**
+   * Re-open the output stream of the tlog and position
+   * the file pointer at the end of the file. It assumes
+   * that the tlog is non-empty and that the tlog's header
+   * has been already read.
+   *
+   * @throws SolrException wrapping any IOException; the newly opened file is closed in that case
+   */
+  synchronized void reopenOutputStream() {
+    RandomAccessFile newRaf = null;
+    try {
+      if (debug) {
+        log.debug("Re-opening tlog's output stream: " + this);
+      }
+
+      newRaf = new RandomAccessFile(this.tlogFile, "rw");
+      long start = newRaf.length();
+      newRaf.seek(start);
+
+      // Only publish the handle once the fallible positioning above has succeeded.
+      raf = newRaf;
+      channel = raf.getChannel();
+      os = Channels.newOutputStream(channel);
+      fos = new FastOutputStream(os, new byte[65536], 0);
+      fos.setWritten(start); // reflect that we aren't starting at the beginning
+    } catch (IOException e) {
+      if (newRaf != null) {
+        try {
+          newRaf.close();
+        } catch (IOException suppressed) {
+          e.addSuppressed(suppressed);
+        }
+      }
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+}
+