Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC

svn commit: r1681186 [2/5] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java Fri May 22 18:58:29 2015
@@ -0,0 +1,615 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.PluginBag;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * This request handler implements the CDCR API and is responsible for the execution of the
+ * {@link CdcrReplicator} threads.
+ * </p>
+ * <p>
+ * It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
+ * {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
+ * to synchronise the state of the CDCR across all the nodes.
+ * </p>
+ * <p>
+ * The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the
+ * actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is the leader and the process
+ * state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
+ * start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
+ * {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
+ * </p>
+ * <p>
+ * The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
+ * files. Such a mode can be enabled or disabled using the actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
+ * {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
+ * </p>
+ * <p>
+ * Known limitations: The source and target clusters must have the same topology. Replication between clusters
+ * with a different number of shards will likely result in an inconsistent index.
+ * </p>
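+ * <p>
+ * As an illustrative sketch (host, port and collection name are placeholders), an action is
+ * selected through the {@code action} request parameter ({@link org.apache.solr.common.params.CommonParams#ACTION})
+ * of the registered handler path:
+ * <pre>
+ * curl 'http://localhost:8983/solr/&lt;collection&gt;/cdcr?action=START'
+ * curl 'http://localhost:8983/solr/&lt;collection&gt;/cdcr?action=STATUS'
+ * curl 'http://localhost:8983/solr/&lt;collection&gt;/cdcr?action=STOP'
+ * </pre>
+ * </p>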
+ */
+public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrRequestHandler.class);
+
+  private SolrCore core;
+  private String collection;
+  private String path;
+
+  private SolrParams updateLogSynchronizerConfiguration;
+  private SolrParams replicatorConfiguration;
+  private SolrParams bufferConfiguration;
+  private Map<String, List<SolrParams>> replicasConfiguration;
+
+  private CdcrProcessStateManager processStateManager;
+  private CdcrBufferStateManager bufferStateManager;
+  private CdcrReplicatorManager replicatorManager;
+  private CdcrLeaderStateManager leaderStateManager;
+  private CdcrUpdateLogSynchronizer updateLogSynchronizer;
+  private CdcrBufferManager bufferManager;
+
+  @Override
+  public void init(NamedList args) {
+    super.init(args);
+
+    if (args != null) {
+      // Configuration of the Update Log Synchronizer
+      Object updateLogSynchronizerParam = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
+      if (updateLogSynchronizerParam instanceof NamedList) {
+        updateLogSynchronizerConfiguration = SolrParams.toSolrParams((NamedList) updateLogSynchronizerParam);
+      }
+
+      // Configuration of the Replicator
+      Object replicatorParam = args.get(CdcrParams.REPLICATOR_PARAM);
+      if (replicatorParam instanceof NamedList) {
+        replicatorConfiguration = SolrParams.toSolrParams((NamedList) replicatorParam);
+      }
+
+      // Configuration of the Buffer
+      Object bufferParam = args.get(CdcrParams.BUFFER_PARAM);
+      if (bufferParam instanceof NamedList) {
+        bufferConfiguration = SolrParams.toSolrParams((NamedList) bufferParam);
+      }
+
+      // Configuration of the Replicas
+      replicasConfiguration = new HashMap<>();
+      List replicas = args.getAll(CdcrParams.REPLICA_PARAM);
+      for (Object replica : replicas) {
+        if (replica instanceof NamedList) {
+          SolrParams params = SolrParams.toSolrParams((NamedList) replica);
+          if (!replicasConfiguration.containsKey(params.get(CdcrParams.SOURCE_COLLECTION_PARAM))) {
+            replicasConfiguration.put(params.get(CdcrParams.SOURCE_COLLECTION_PARAM), new ArrayList<SolrParams>());
+          }
+          replicasConfiguration.get(params.get(CdcrParams.SOURCE_COLLECTION_PARAM)).add(params);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    // Pick the action
+    SolrParams params = req.getParams();
+    CdcrParams.CdcrAction action = null;
+    String a = params.get(CommonParams.ACTION);
+    if (a != null) {
+      action = CdcrParams.CdcrAction.get(a);
+    }
+    if (action == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+    }
+
+    switch (action) {
+      case START: {
+        this.handleStartAction(req, rsp);
+        break;
+      }
+      case STOP: {
+        this.handleStopAction(req, rsp);
+        break;
+      }
+      case STATUS: {
+        this.handleStatusAction(req, rsp);
+        break;
+      }
+      case COLLECTIONCHECKPOINT: {
+        this.handleCollectionCheckpointAction(req, rsp);
+        break;
+      }
+      case SHARDCHECKPOINT: {
+        this.handleShardCheckpointAction(req, rsp);
+        break;
+      }
+      case ENABLEBUFFER: {
+        this.handleEnableBufferAction(req, rsp);
+        break;
+      }
+      case DISABLEBUFFER: {
+        this.handleDisableBufferAction(req, rsp);
+        break;
+      }
+      case LASTPROCESSEDVERSION: {
+        this.handleLastProcessedVersionAction(req, rsp);
+        break;
+      }
+      case QUEUES: {
+        this.handleQueuesAction(req, rsp);
+        break;
+      }
+      case OPS: {
+        this.handleOpsAction(req, rsp);
+        break;
+      }
+      case ERRORS: {
+        this.handleErrorsAction(req, rsp);
+        break;
+      }
+      default: {
+        throw new RuntimeException("Unknown action: " + action);
+      }
+    }
+
+    rsp.setHttpCaching(false);
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    this.core = core;
+    collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+
+    // Make sure that the core is ZKAware
+    if (!core.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Solr instance is not running in SolrCloud mode.");
+    }
+
+    // Make sure that the core is using the CdcrUpdateLog implementation
+    if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Solr instance is not configured with the cdcr update log.");
+    }
+
+    // Find the registered path of the handler
+    path = null;
+    for (Map.Entry<String, PluginBag.PluginHolder<SolrRequestHandler>> entry : core.getRequestHandlers().getRegistry().entrySet()) {
+      if (core.getRequestHandlers().isLoaded(entry.getKey()) && entry.getValue().get() == this) {
+        path = entry.getKey();
+        break;
+      }
+    }
+    if (path == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The CdcrRequestHandler is not registered with the current core.");
+    }
+    if (!path.startsWith("/")) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
+    }
+
+    // Initialisation phase
+    // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
+    // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
+    // If the Solr cloud has already been initialised, and the core is reloaded (e.g., because a node died or a new node
+    // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
+    // in zookeeper.
+
+    // Initialise the buffer state manager
+    bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
+    // Initialise the process state manager
+    processStateManager = new CdcrProcessStateManager(core);
+    // Initialise the leader state manager
+    leaderStateManager = new CdcrLeaderStateManager(core);
+
+    // Initialise the replicator states manager
+    replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
+    replicatorManager.setProcessStateManager(processStateManager);
+    replicatorManager.setLeaderStateManager(leaderStateManager);
+    // we need to inform it of a state event since the process and leader state
+    // may have been synchronised during the initialisation
+    replicatorManager.stateUpdate();
+
+    // Initialise the update log synchronizer
+    updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
+    updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
+    // we need to inform it of a state event since the leader state
+    // may have been synchronised during the initialisation
+    updateLogSynchronizer.stateUpdate();
+
+    // Initialise the buffer manager
+    bufferManager = new CdcrBufferManager(core);
+    bufferManager.setLeaderStateManager(leaderStateManager);
+    bufferManager.setBufferStateManager(bufferStateManager);
+    // we need to inform it of a state event since the leader state
+    // may have been synchronised during the initialisation
+    bufferManager.stateUpdate();
+
+    // register the close hook
+    this.registerCloseHook(core);
+  }
+
+  /**
+   * Register a close hook to properly shut down the state managers and the scheduler.
+   */
+  private void registerCloseHook(SolrCore core) {
+    core.addCloseHook(new CloseHook() {
+
+      @Override
+      public void preClose(SolrCore core) {
+        String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+        log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collectionName, shard);
+
+        updateLogSynchronizer.shutdown();
+        replicatorManager.shutdown();
+        bufferStateManager.shutdown();
+        processStateManager.shutdown();
+        leaderStateManager.shutdown();
+      }
+
+      @Override
+      public void postClose(SolrCore core) {
+      }
+
+    });
+  }
+
+  /**
+   * <p>
+   * Update and synchronize the process state.
+   * </p>
+   * <p>
+   * The process state manager must notify the replicator states manager of the change of state.
+   * </p>
+   */
+  private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (processStateManager.getState() == CdcrParams.ProcessState.STOPPED) {
+      processStateManager.setState(CdcrParams.ProcessState.STARTED);
+      processStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (processStateManager.getState() == CdcrParams.ProcessState.STARTED) {
+      processStateManager.setState(CdcrParams.ProcessState.STOPPED);
+      processStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  private NamedList getStatus() {
+    NamedList status = new NamedList();
+    status.add(CdcrParams.ProcessState.getParam(), processStateManager.getState().toLower());
+    status.add(CdcrParams.BufferState.getParam(), bufferStateManager.getState().toLower());
+    return status;
+  }
+
+  /**
+   * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
+   * This checkpoint is used on the source cluster to set up the
+   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader. <br/>
+   * This method will execute in parallel one
+   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
+   * then pick the lowest version number as the checkpoint. Picking the lowest amongst all shards ensures that we do not
+   * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
+   * updates to the target cluster while we are currently instantiating the
+   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
+   * This solution only works in scenarios where the topologies of the source and target clusters are identical.
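+   * <p>
+   * A sketch of the request (collection name is a placeholder); the response carries the selected
+   * minimum under the {@link CdcrParams#CHECKPOINT} key:
+   * <pre>
+   * curl 'http://localhost:8983/solr/&lt;collection&gt;/cdcr?action=COLLECTIONCHECKPOINT'
+   * </pre>
+   * </p>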
+   */
+  private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
+      throws IOException, SolrServerException {
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    try {
+      zkController.getZkStateReader().updateClusterState(true);
+    } catch (Exception e) {
+      log.warn("Error when updating cluster state", e);
+    }
+    ClusterState cstate = zkController.getClusterState();
+    Collection<Slice> shards = cstate.getActiveSlices(collection);
+
+    ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new DefaultSolrThreadFactory("parallelCdcrExecutor"));
+
+    long checkpoint = Long.MAX_VALUE;
+    try {
+      List<Callable<Long>> callables = new ArrayList<>();
+      for (Slice shard : shards) {
+        ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, shard.getName());
+        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+        callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
+      }
+
+      for (final Future<Long> future : parallelExecutor.invokeAll(callables)) {
+        long version = future.get();
+        if (version < checkpoint) { // we must take the lowest checkpoint from all the shards
+          checkpoint = version;
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while requesting shard's checkpoints", e);
+    } catch (ExecutionException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while requesting shard's checkpoints", e);
+    } finally {
+      parallelExecutor.shutdown();
+    }
+
+    rsp.add(CdcrParams.CHECKPOINT, checkpoint);
+  }
+
+  /**
+   * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}.
+   */
+  private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (!leaderStateManager.amILeader()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
+          "' sent to non-leader replica");
+    }
+
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
+    List<Long> versions = recentUpdates.getVersions(1);
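+    // versions of delete operations are recorded as negative numbers in the update log,
+    // hence the absolute value below; -1 signals an empty update log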
+    long lastVersion = versions.isEmpty() ? -1 : Math.abs(versions.get(0));
+    rsp.add(CdcrParams.CHECKPOINT, lastVersion);
+    recentUpdates.close();
+  }
+
+  private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (bufferStateManager.getState() == CdcrParams.BufferState.DISABLED) {
+      bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
+      bufferStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    if (bufferStateManager.getState() == CdcrParams.BufferState.ENABLED) {
+      bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
+      bufferStateManager.synchronize();
+    }
+
+    rsp.add(CdcrParams.CdcrAction.STATUS.toLower(), this.getStatus());
+  }
+
+  /**
+   * <p>
+   * We have to take care of four cases:
+   * <ul>
+   * <li>Replication & Buffering</li>
+   * <li>Replication & No Buffering</li>
+   * <li>No Replication & Buffering</li>
+   * <li>No Replication & No Buffering</li>
+   * </ul>
+   * In the first three cases, at least one log reader should have been initialised. We should take the lowest
+   * last processed version across all the initialised readers. In the last case, no log reader has been
+   * initialised. We should instantiate one and get the version of its first entries.
+   * </p>
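+   * <p>
+   * For example (illustrative numbers): with two replicator log readers whose last processed versions
+   * are 120 and 130, and a buffer log reader at version 115, the action returns 115.
+   * </p>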
+   */
+  private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+    if (!leaderStateManager.amILeader()) {
+      log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
+          " sent to non-leader replica");
+    }
+
+    // take care of the first three cases
+    // first check the log readers from the replicator states
+    long lastProcessedVersion = Long.MAX_VALUE;
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      long version = Long.MAX_VALUE;
+      if (state.getLogReader() != null) {
+        version = state.getLogReader().getLastVersion();
+      }
+      lastProcessedVersion = Math.min(lastProcessedVersion, version);
+    }
+
+    // next check the log reader of the buffer
+    CdcrUpdateLog.CdcrLogReader bufferLogReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).getBufferToggle();
+    if (bufferLogReader != null) {
+      lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
+    }
+
+    // the fourth case: no cdcr replication, no buffering: all readers were null
+    if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
+        bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
+      CdcrUpdateLog.CdcrLogReader logReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).newLogReader();
+      try {
+        // let the reader initialize lastVersion
+        logReader.next();
+        lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Error while fetching the last processed version", e);
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Error while fetching the last processed version", e);
+      } finally {
+        logReader.close();
+      }
+    }
+
+    log.info("Returning the lowest last processed version {}  @ {}:{}", lastProcessedVersion, collectionName, shard);
+    rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
+  }
+
+  private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
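+    // the response groups queue statistics per target zk host, then per target collection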
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList queueStats = new NamedList();
+
+      CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
+      if (logReader == null) {
+        String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+        String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+        log.warn("The log reader for target collection {} is not initialised @ {}:{}",
+            state.getTargetCollection(), collectionName, shard);
+        queueStats.add(CdcrParams.QUEUE_SIZE, -1L);
+      } else {
+        queueStats.add(CdcrParams.QUEUE_SIZE, logReader.getNumberOfRemainingRecords());
+      }
+      queueStats.add(CdcrParams.LAST_TIMESTAMP, state.getTimestampOfLastProcessedOperation());
+
+      if (hosts.get(state.getZkHost()) == null) {
+        hosts.add(state.getZkHost(), new NamedList());
+      }
+      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), queueStats);
+    }
+
+    rsp.add(CdcrParams.QUEUES, hosts);
+    UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
+    rsp.add(CdcrParams.TLOG_TOTAL_SIZE, updateLog.getTotalLogsSize());
+    rsp.add(CdcrParams.TLOG_TOTAL_COUNT, updateLog.getTotalLogsNumber());
+    rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER,
+        updateLogSynchronizer.isStarted() ? CdcrParams.ProcessState.STARTED.toLower() : CdcrParams.ProcessState.STOPPED.toLower());
+  }
+
+  private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList ops = new NamedList();
+      ops.add(CdcrParams.COUNTER_ALL, state.getBenchmarkTimer().getOperationsPerSecond());
+      ops.add(CdcrParams.COUNTER_ADDS, state.getBenchmarkTimer().getAddsPerSecond());
+      ops.add(CdcrParams.COUNTER_DELETES, state.getBenchmarkTimer().getDeletesPerSecond());
+
+      if (hosts.get(state.getZkHost()) == null) {
+        hosts.add(state.getZkHost(), new NamedList());
+      }
+      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), ops);
+    }
+
+    rsp.add(CdcrParams.OPERATIONS_PER_SECOND, hosts);
+  }
+
+  private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+    NamedList hosts = new NamedList();
+
+    for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
+      NamedList errors = new NamedList();
+
+      errors.add(CdcrParams.CONSECUTIVE_ERRORS, state.getConsecutiveErrors());
+      errors.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
+      errors.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(), state.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));
+
+      NamedList lastErrors = new NamedList();
+      for (String[] lastError : state.getLastErrors()) {
+        lastErrors.add(lastError[0], lastError[1]);
+      }
+      errors.add(CdcrParams.LAST, lastErrors);
+
+      if (hosts.get(state.getZkHost()) == null) {
+        hosts.add(state.getZkHost(), new NamedList());
+      }
+      ((NamedList) hosts.get(state.getZkHost())).add(state.getTargetCollection(), errors);
+    }
+
+    rsp.add(CdcrParams.ERRORS, hosts);
+  }
+
+  @Override
+  public String getDescription() {
+    return "Manage Cross Data Center Replication";
+  }
+
+  /**
+   * A callable for executing a single
+   * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} action against a shard leader.
+   */
+  private static final class SliceCheckpointCallable implements Callable<Long> {
+
+    final String baseUrl;
+    final String cdcrPath;
+
+    SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
+      this.baseUrl = baseUrl;
+      this.cdcrPath = cdcrPath;
+    }
+
+    @Override
+    public Long call() throws Exception {
+      HttpSolrClient server = new HttpSolrClient(baseUrl);
+      try {
+        server.setConnectionTimeout(15000);
+        server.setSoTimeout(60000);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());
+
+        SolrRequest request = new QueryRequest(params);
+        request.setPath(cdcrPath);
+
+        NamedList response = server.request(request);
+        return (Long) response.get(CdcrParams.CHECKPOINT);
+      } finally {
+        server.close();
+      }
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,48 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A state manager which implements an observer pattern to notify observers
+ * of a state change.
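+ * Concrete managers invoke {@link #callback()} whenever their state changes, which in turn calls
+ * {@link CdcrStateObserver#stateUpdate()} on every registered observer (this is, for example, how
+ * {@link CdcrUpdateLogSynchronizer} reacts to leader state changes).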
+ */
+abstract class CdcrStateManager {
+
+  private List<CdcrStateObserver> observers = new ArrayList<>();
+
+  void register(CdcrStateObserver observer) {
+    this.observers.add(observer);
+  }
+
+  void callback() {
+    for (CdcrStateObserver observer : observers) {
+      observer.stateUpdate();
+    }
+  }
+
+  interface CdcrStateObserver {
+
+    void stateUpdate();
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java Fri May 22 18:58:29 2015
@@ -0,0 +1,183 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Periodically synchronise the update log of non-leader nodes with their leader.
+ * </p>
+ * <p>
+ * Non-leader nodes must always buffer updates in case of a leader failure. They have to periodically
+ * synchronise their update log with their leader to remove old transaction logs that are no longer needed.
+ * This is performed by a background thread scheduled with a fixed delay. The background thread sends the
+ * action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
+ * the lowest last processed version number. This version is then used to advance the buffer log reader.
+ * </p>
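+ * <p>
+ * The delay between two runs is read from the {@link CdcrParams#SCHEDULE_PARAM} parameter of the
+ * handler configuration and defaults to one minute.
+ * </p>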
+ */
+class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
+
+  private CdcrLeaderStateManager leaderStateManager;
+  private ScheduledExecutorService scheduler;
+
+  private final SolrCore core;
+  private final String collection;
+  private final String shardId;
+  private final String path;
+
+  private int timeSchedule = DEFAULT_TIME_SCHEDULE;
+
+  private static final int DEFAULT_TIME_SCHEDULE = 60000;  // by default, every minute
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrUpdateLogSynchronizer.class);
+
+  CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchronizerConfiguration) {
+    this.core = core;
+    this.path = path;
+    this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+    if (updateLogSynchronizerConfiguration != null) {
+      this.timeSchedule = updateLogSynchronizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
+    }
+  }
+
+  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+    this.leaderStateManager = leaderStateManager;
+    this.leaderStateManager.register(this);
+  }
+
+  @Override
+  public void stateUpdate() {
+    // If I am not the leader, I need to periodically synchronise my update log with my leader.
+    if (!leaderStateManager.amILeader()) {
+      scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-update-log-synchronizer"));
+      scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
+      return;
+    }
+
+    this.shutdown();
+  }
+
+  boolean isStarted() {
+    return scheduler != null;
+  }
+
+  void shutdown() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      scheduler = null;
+    }
+  }
+
+  private class UpdateLogSynchronisation implements Runnable {
+
+    private String getLeaderUrl() {
+      ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+      ClusterState cstate = zkController.getClusterState();
+      ZkNodeProps leaderProps = cstate.getLeader(collection, shardId);
+      if (leaderProps == null) { // we might not have a leader yet; return null
+        return null;
+      }
+      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+      return nodeProps.getCoreUrl();
+    }
+
+    @Override
+    public void run() {
+      try {
+        String leaderUrl = getLeaderUrl();
+        if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
+          return;
+        }
+
+        HttpSolrClient server = new HttpSolrClient(leaderUrl);
+        server.setConnectionTimeout(15000);
+        server.setSoTimeout(60000);
+
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
+
+        SolrRequest request = new QueryRequest(params);
+        request.setPath(path);
+
+        long lastVersion;
+        try {
+          NamedList response = server.request(request);
+          lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
+          log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
+              core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+        } catch (IOException | SolrServerException e) {
+          log.warn("Couldn't get last processed version from leader {}: {}", leaderUrl, e.getMessage());
+          return;
+        } finally {
+          try {
+            server.close();
+          } catch (IOException ioe) {
+            log.warn("Caught exception trying to close server: ", leaderUrl, ioe.getMessage());
+          }
+        }
+
+        // if we received -1, it means that the log reader on the leader has not yet started to read log entries
+        // do nothing
+        if (lastVersion == -1) {
+          return;
+        }
+
+        try {
+          CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+          if (ulog.isBuffering()) {
+            log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
+            ulog.getBufferToggle().seek(lastVersion);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
+        } catch (IOException e) {
+          log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): {}", lastVersion, e.getMessage());
+        }
+      } catch (Throwable e) {
+        log.warn("Caught unexpected exception", e);
+        throw e;
+      }
+    }
+  }
+
+}
+

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Fri May 22 18:58:29 2015
@@ -29,7 +29,6 @@ import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,7 +45,6 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.Adler32;
@@ -78,11 +76,12 @@ import org.apache.solr.core.DirectoryFac
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler.FileInfo;
+import org.apache.solr.handler.ReplicationHandler.*;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
 import org.apache.solr.util.PropertiesInputStream;
@@ -111,6 +110,8 @@ import static org.apache.solr.handler.Re
 import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
 import static org.apache.solr.handler.ReplicationHandler.OFFSET;
 import static org.apache.solr.handler.ReplicationHandler.SIZE;
+import static org.apache.solr.handler.ReplicationHandler.TLOG_FILE;
+import static org.apache.solr.handler.ReplicationHandler.TLOG_FILES;
 
 /**
  * <p> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
@@ -138,14 +139,18 @@ public class IndexFetcher {
 
   private volatile List<Map<String, Object>> confFilesToDownload;
 
+  private volatile List<Map<String, Object>> tlogFilesToDownload;
+
   private volatile List<Map<String, Object>> filesDownloaded;
 
   private volatile List<Map<String, Object>> confFilesDownloaded;
 
+  private volatile List<Map<String, Object>> tlogFilesDownloaded;
+
   private volatile Map<String, Object> currentFile;
 
   private volatile DirectoryFileFetcher dirFileFetcher;
-  
+
   private volatile LocalFsFileFetcher localFileFetcher;
 
   private volatile ExecutorService fsyncService;
@@ -180,7 +185,7 @@ public class IndexFetcher {
       LOG.warn("'masterUrl' must be specified without the /replication suffix");
     }
     this.masterUrl = masterUrl;
-    
+
     this.replicationHandler = handler;
     String compress = (String) initArgs.get(COMPRESSION);
     useInternal = INTERNAL.equals(compress);
@@ -207,7 +212,7 @@ public class IndexFetcher {
     try (HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient)) {
       client.setSoTimeout(60000);
       client.setConnectionTimeout(15000);
-      
+
       return client.request(req);
     } catch (SolrServerException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
@@ -243,6 +248,10 @@ public class IndexFetcher {
       if (files != null)
         confFilesToDownload = Collections.synchronizedList(files);
 
+      files = (List<Map<String, Object>>) response.get(TLOG_FILES);
+      if (files != null) {
+        tlogFilesToDownload = Collections.synchronizedList(files);
+      }
     } catch (SolrServerException e) {
       throw new IOException(e);
     }
@@ -251,18 +260,18 @@ public class IndexFetcher {
   boolean fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
     return fetchLatestIndex(forceReplication, false);
   }
-  
+
   /**
    * This command downloads all the necessary files from master to install a index commit point. Only changed files are
    * downloaded. It also downloads the conf files (if they are modified).
    *
-   * @param forceReplication force a replication in all cases 
+   * @param forceReplication force a replication in all cases
    * @param forceCoreReload force a core reload in all cases
    * @return true on success, false if slave is already in sync
    * @throws IOException if an exception occurs
    */
   boolean fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
-    
+
     boolean cleanupDone = false;
     boolean successfulInstall = false;
     replicationStartTime = System.currentTimeMillis();
@@ -271,14 +280,14 @@ public class IndexFetcher {
     Directory indexDir = null;
     String indexDirPath;
     boolean deleteTmpIdxDir = true;
-    
+
     if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
       // if the last replication was not a success, we force a full replication
       // when we are a bit more confident we may want to try a partial replication
       // if the error is connection related or something, but we have to be careful
       forceReplication = true;
     }
-    
+
     try {
       //get the current 'replicateable' index version in the master
       NamedList response;
@@ -323,12 +332,12 @@ public class IndexFetcher {
           SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
           solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
         }
-        
+
         //there is nothing to be replicated
         successfulInstall = true;
         return true;
       }
-      
+
       if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
         //master and slave are already in sync just return
         LOG.info("Slave in sync with master.");
@@ -345,6 +354,9 @@ public class IndexFetcher {
         return false;
       }
       LOG.info("Number of files in latest index in master: " + filesToDownload.size());
+      if (tlogFilesToDownload != null) {
+        LOG.info("Number of tlog files in master: " + tlogFilesToDownload.size());
+      }
 
       // Create the sync service
       fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
@@ -356,11 +368,12 @@ public class IndexFetcher {
           .getCommitTimestamp(commit) >= latestVersion
           || commit.getGeneration() >= latestGeneration || forceReplication;
 
-      String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+      String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+      String tmpIdxDirName = "index." + timestamp;
       tmpIndex = solrCore.getDataDir() + tmpIdxDirName;
 
       tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
-      
+
       // cindex dir...
       indexDirPath = solrCore.getIndexDir();
       indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
@@ -372,7 +385,7 @@ public class IndexFetcher {
         if (!isFullCopyNeeded && isIndexStale(indexDir)) {
           isFullCopyNeeded = true;
         }
-        
+
         if (!isFullCopyNeeded) {
           // a searcher might be using some flushed but not committed segments
           // because of soft commits (which open a searcher on IW's data)
@@ -410,13 +423,16 @@ public class IndexFetcher {
           solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
         }
         boolean reloadCore = false;
-        
+
         try {
           LOG.info("Starting download to " + tmpIndexDir + " fullCopy="
               + isFullCopyNeeded);
           successfulInstall = false;
-          
+
           downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
+          if (tlogFilesToDownload != null) {
+            downloadTlogFiles(timestamp, latestGeneration);
+          }
           LOG.info("Total time taken for download : "
               + ((System.currentTimeMillis() - replicationStartTime) / 1000)
               + " secs");
@@ -440,7 +456,7 @@ public class IndexFetcher {
                   solrCore.getDirectoryFactory().remove(indexDir);
                 }
               }
-              
+
               LOG.info("Configuration files are modified, core will be reloaded");
               logReplicationTimeAndConfFiles(modifiedConfFiles,
                   successfulInstall);// write to a file time of replication and
@@ -464,7 +480,7 @@ public class IndexFetcher {
             solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
           }
         }
-        
+
         // we must reload the core after we open the IW back up
        if (successfulInstall && (reloadCore || forceCoreReload)) {
           LOG.info("Reloading SolrCore {}", solrCore.getName());
@@ -484,10 +500,10 @@ public class IndexFetcher {
           if (isFullCopyNeeded) {
             solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
           }
-          
+
           openNewSearcherAndUpdateCommitPoint();
         }
-        
+
         if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
           cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
           cleanupDone = true;
@@ -497,7 +513,7 @@ public class IndexFetcher {
               reloadCore);
           successfulInstall = fetchLatestIndex(true, reloadCore);
         }
-        
+
         replicationStartTime = 0;
         return successfulInstall;
       } catch (ReplicationHandlerException e) {
@@ -605,7 +621,7 @@ public class IndexFetcher {
     Directory dir = null;
     try {
       dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
-      
+
       int indexCount = 1, confFilesCount = 1;
       if (props.containsKey(TIMES_INDEX_REPLICATED)) {
         indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
@@ -696,7 +712,7 @@ public class IndexFetcher {
   private void openNewSearcherAndUpdateCommitPoint() throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
         new ModifiableSolrParams());
-    
+
     RefCounted<SolrIndexSearcher> searcher = null;
     IndexCommit commitPoint;
     try {
@@ -719,7 +735,7 @@ public class IndexFetcher {
 
     // update the commit point in replication handler
     replicationHandler.indexCommitPoint = commitPoint;
-    
+
   }
 
   private void reloadCore() {
@@ -756,7 +772,7 @@ public class IndexFetcher {
       }
       for (Map<String, Object> file : confFilesToDownload) {
         String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
-        localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
+        localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, CONF_FILE_SHORT, latestGeneration);
         currentFile = file;
         localFileFetcher.fetchFile();
         confFilesDownloaded.add(new HashMap<>(file));
@@ -770,6 +786,34 @@ public class IndexFetcher {
     }
   }
 
+  private void downloadTlogFiles(String timestamp, long latestGeneration) throws Exception {
+    UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
+
+    LOG.info("Starting download of tlog files from master: " + tlogFilesToDownload);
+    tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+    File tmpTlogDir = new File(ulog.getLogDir(), "tlog." + getDateAsStr(new Date()));
+    try {
+      boolean status = tmpTlogDir.mkdirs();
+      if (!status) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Failed to create temporary tlog folder: " + tmpTlogDir.getName());
+      }
+      for (Map<String, Object> file : tlogFilesToDownload) {
+        String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
+        localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
+        currentFile = file;
+        localFileFetcher.fetchFile();
+        tlogFilesDownloaded.add(new HashMap<>(file));
+      }
+      // this is called before copying the files to the original tlog dir
+      // so that if there is an exception we avoid corrupting the original files.
+      terminateAndWaitFsyncService();
+      copyTmpTlogFiles2Tlog(tmpTlogDir, timestamp);
+    } finally {
+      delTree(tmpTlogDir);
+    }
+  }
+
   /**
    * Download the index files. If a new index is needed, download all the files.
    *
@@ -790,7 +834,7 @@ public class IndexFetcher {
       if (!compareResult.equal || downloadCompleteIndex
           || filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult)) {
         dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
-            (String) file.get(NAME), false, latestGeneration);
+            (String) file.get(NAME), FILE, latestGeneration);
         currentFile = file;
         dirFileFetcher.fetchFile();
         filesDownloaded.add(new HashMap<>(file));
@@ -829,10 +873,10 @@ public class IndexFetcher {
             LOG.warn("Could not retrieve checksum from file.", e);
           }
         }
-        
+
         if (!compareResult.checkSummed) {
           // we don't have checksums to compare
-          
+
           if (indexFileLen == backupIndexFileLen) {
             compareResult.equal = true;
             return compareResult;
@@ -843,9 +887,9 @@ public class IndexFetcher {
             return compareResult;
           }
         }
-        
+
         // we have checksums to compare
-        
+
         if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) {
           compareResult.equal = true;
           return compareResult;
@@ -878,7 +922,7 @@ public class IndexFetcher {
     } catch (NoSuchFileException | FileNotFoundException e) {
       return false;
     }
-  }  
+  }
 
   /**
    * All the files which are common between master and slave must have same size and same checksum else we assume
@@ -969,7 +1013,7 @@ public class IndexFetcher {
   }
 
   /**
-   * Make file list 
+   * Make file list
    */
   private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
     File[] files = dir.listFiles();
@@ -982,7 +1026,7 @@ public class IndexFetcher {
     }
     return fileList;
   }
-  
+
   /**
    * The conf files are copied to the tmp dir to the conf dir. A backup of the old file is maintained
    */
@@ -1021,6 +1065,30 @@ public class IndexFetcher {
     }
   }
 
+  /**
+   * The tlog files are copied from the tmp dir to the tlog dir by renaming the directory if possible.
+   * A backup of the old tlog directory is maintained.
+   */
+  private void copyTmpTlogFiles2Tlog(File tmpTlogDir, String timestamp) {
+    File tlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
+    File backupTlogDir = new File(tlogDir.getParent(), UpdateLog.TLOG_NAME + "." + timestamp);
+
+    try {
+      org.apache.commons.io.FileUtils.moveDirectory(tlogDir, backupTlogDir);
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
+    }
+
+    try {
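+      // tmpTlogDir was created inside the original tlog directory, so the move above relocated it
+      // under backupTlogDir; resolve its new path before moving it into place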
+      tmpTlogDir = new File(backupTlogDir, tmpTlogDir.getName());
+      org.apache.commons.io.FileUtils.moveDirectory(tmpTlogDir, tlogDir);
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Unable to rename: " + tmpTlogDir + " to: " + tlogDir, e);
+    }
+  }
+
   private String getDateAsStr(Date d) {
     return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
   }
@@ -1114,10 +1182,10 @@ public class IndexFetcher {
     }
     return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
   }
-  
-  /** 
+
+  /**
    * This simulates File.delete exception-wise, since this class has some strange behavior with it.
-   * The only difference is it returns null on success, throws SecurityException on SecurityException, 
+   * The only difference is it returns null on success, throws SecurityException on SecurityException,
    * otherwise returns Throwable preventing deletion (instead of false), for additional information.
    */
   static Throwable delete(File file) {
@@ -1130,7 +1198,7 @@ public class IndexFetcher {
       return other;
     }
   }
-  
+
   static boolean delTree(File dir) {
     try {
       org.apache.lucene.util.IOUtils.rm(dir.toPath());
@@ -1159,6 +1227,20 @@ public class IndexFetcher {
     return timeElapsed;
   }
 
+  List<Map<String, Object>> getTlogFilesToDownload() {
+    // make a copy first because the field can be reset to null later
+    List<Map<String, Object>> tmp = tlogFilesToDownload;
+    // create a new instance, or else the iterator may fail
+    return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
+  }
+
+  List<Map<String, Object>> getTlogFilesDownloaded() {
+    //make a copy first because it can be null later
+    List<Map<String, Object>> tmp = tlogFilesDownloaded;
+    // NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
+    return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
+  }
+
   List<Map<String, Object>> getConfFilesToDownload() {
     //make a copy first because it can be null later
     List<Map<String, Object>> tmp = confFilesToDownload;
@@ -1219,7 +1301,7 @@ public class IndexFetcher {
     private boolean includeChecksum = true;
     private String fileName;
     private String saveAs;
-    private boolean isConf;
+    private String solrParamOutput;
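+    // name of the request parameter (FILE, CONF_FILE_SHORT or TLOG_FILE) under which
+    // this file is requested from the master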
     private Long indexGen;
 
     private long size;
@@ -1230,11 +1312,11 @@ public class IndexFetcher {
     private boolean aborted = false;
 
     FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
-                boolean isConf, long latestGen) throws IOException {
+                String solrParamOutput, long latestGen) throws IOException {
       this.file = file;
       this.fileName = (String) fileDetails.get(NAME);
       this.size = (Long) fileDetails.get(SIZE);
-      this.isConf = isConf;
+      this.solrParamOutput = solrParamOutput;
       this.saveAs = saveAs;
       indexGen = latestGen;
       if (includeChecksum)
@@ -1404,12 +1486,7 @@ public class IndexFetcher {
       params.set(GENERATION, Long.toString(indexGen));
       params.set(CommonParams.QT, "/replication");
       //add the version to download. This is used to reserve the download
-      if (isConf) {
-        //set cf instead of file for config file
-        params.set(CONF_FILE_SHORT, fileName);
-      } else {
-        params.set(FILE, fileName);
-      }
+      params.set(solrParamOutput, fileName);
       if (useInternal) {
         params.set(COMPRESSION, "true");
       }
@@ -1478,8 +1555,8 @@ public class IndexFetcher {
 
   private class DirectoryFileFetcher extends FileFetcher {
     DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
-                boolean isConf, long latestGen) throws IOException {
-      super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+                         String solrParamOutput, long latestGen) throws IOException {
+      super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
     }
   }
 
@@ -1527,8 +1604,8 @@ public class IndexFetcher {
 
   private class LocalFsFileFetcher extends FileFetcher {
     LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
-                boolean isConf, long latestGen) throws IOException {
-      super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
+                       String solrParamOutput, long latestGen) throws IOException {
+      super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
     }
   }
 

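For illustration, a minimal sketch of how the generalized FileFetcher constructor might now be invoked: instead of the removed isConf boolean, callers pass the name of the request parameter under which the server expects the file name. The call sites below are hypothetical (the directory variables are assumed to be in scope); the constants FILE, CONF_FILE_SHORT and TLOG_FILE come from ReplicationHandler.

    // Hypothetical call sites (a sketch, not the committed code):
    // index files are requested under the "file" parameter,
    new DirectoryFileFetcher(tmpIndexDir, fileDetails, saveAs, FILE, latestGen);
    // conf files under the "cf" parameter,
    new LocalFsFileFetcher(confDir, fileDetails, saveAs, CONF_FILE_SHORT, latestGen);
    // and tlog files under the new "tlogFile" parameter.
    new LocalFsFileFetcher(tlogDir, fileDetails, saveAs, TLOG_FILE, latestGen);
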
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Fri May 22 18:58:29 2015
@@ -81,7 +81,9 @@ import org.apache.solr.core.SolrEventLis
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.CdcrUpdateLog;
 import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.NumberUtils;
 import org.apache.solr.util.PropertiesInputStream;
@@ -121,8 +123,8 @@ public class ReplicationHandler extends
       version = v;
     }
     /**
-     * builds a CommitVersionInfo data for the specified IndexCommit.  
-     * Will never be null, ut version and generation may be zero if 
+     * builds CommitVersionInfo data for the specified IndexCommit.
+     * Will never be null, but version and generation may be zero if
      * there are problems extracting them from the commit data
      */
     public static CommitVersionInfo build(IndexCommit commit) {
@@ -512,8 +514,11 @@ public class ReplicationHandler extends
     rawParams.set(CommonParams.WT, FILE_STREAM);
 
     String cfileName = solrParams.get(CONF_FILE_SHORT);
+    String tlogFileName = solrParams.get(TLOG_FILE);
     if (cfileName != null) {
-      rsp.add(FILE_STREAM, new LocalFsFileStream(solrParams));
+      rsp.add(FILE_STREAM, new LocalFsConfFileStream(solrParams));
+    } else if (tlogFileName != null) {
+      rsp.add(FILE_STREAM, new LocalFsTlogFileStream(solrParams));
     } else {
       rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
     }
@@ -589,6 +594,14 @@ public class ReplicationHandler extends
       }
     }
     rsp.add(CMD_GET_FILE_LIST, result);
+
+    // fetch the list of tlog files only if CDCR is activated
+    if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+      List<Map<String, Object>> tlogfiles = getTlogFileList();
+      LOG.info("Adding tlog files to list: " + tlogfiles);
+      rsp.add(TLOG_FILES, tlogfiles);
+    }
+
     if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
       return;
     LOG.debug("Adding config files to list: " + includeConfFiles);
@@ -596,6 +609,19 @@ public class ReplicationHandler extends
     rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
   }
 
+  List<Map<String, Object>> getTlogFileList() {
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    String[] logList = ulog.getLogList(new File(ulog.getLogDir()));
+    List<Map<String, Object>> tlogFiles = new ArrayList<>();
+    for (String fileName : logList) {
+      Map<String, Object> fileMeta = new HashMap<>();
+      fileMeta.put(NAME, fileName);
+      fileMeta.put(SIZE, new File(ulog.getLogDir(), fileName).length());
+      tlogFiles.add(fileMeta);
+    }
+    return tlogFiles;
+  }
+
   /**
   * For configuration files, the checksum of the file is included because, unlike index files, they may have the same content
    * but different timestamps.
@@ -1247,11 +1273,11 @@ public class ReplicationHandler extends
           ***/
         }
         if (snapshoot) {
-          try {            
+          try {
             int numberToKeep = numberBackupsToKeep;
             if (numberToKeep < 1) {
               numberToKeep = Integer.MAX_VALUE;
-            }            
+            }
             SnapShooter snapShooter = new SnapShooter(core, null, null);
             snapShooter.validateCreateSnapshot();
             snapShooter.createSnapAsync(currentCommitPoint, numberToKeep, ReplicationHandler.this);
@@ -1284,6 +1310,7 @@ public class ReplicationHandler extends
 
     protected String fileName;
     protected String cfileName;
+    protected String tlogFileName;
     protected String sOffset;
     protected String sLen;
     protected String compress;
@@ -1304,6 +1331,7 @@ public class ReplicationHandler extends
 
       fileName = params.get(FILE);
       cfileName = params.get(CONF_FILE_SHORT);
+      tlogFileName = params.get(TLOG_FILE);
       sOffset = params.get(OFFSET);
       sLen = params.get(LEN);
       compress = params.get(COMPRESSION);
@@ -1320,7 +1348,7 @@ public class ReplicationHandler extends
     protected void initWrite() throws IOException {
       if (sOffset != null) offset = Long.parseLong(sOffset);
       if (sLen != null) len = Integer.parseInt(sLen);
-      if (fileName == null && cfileName == null) {
+      if (fileName == null && cfileName == null && tlogFileName == null) {
         // no filename, do nothing
         writeNothingAndFlush();
       }
@@ -1370,7 +1398,7 @@ public class ReplicationHandler extends
         in = dir.openInput(fileName, IOContext.READONCE);
         // if offset is mentioned move the pointer to that point
         if (offset != -1) in.seek(offset);
-        
+
         long filelen = dir.fileLength(fileName);
         long maxBytesBeforePause = 0;
 
@@ -1425,12 +1453,17 @@ public class ReplicationHandler extends
 
  /**This is used to write local files (e.g. conf or tlog files).
    */
-  private class LocalFsFileStream extends DirectoryFileStream {
+  private abstract class LocalFsFileStream extends DirectoryFileStream {
+
+    private File file;
 
     public LocalFsFileStream(SolrParams solrParams) {
       super(solrParams);
+      this.file = this.initFile();
     }
 
+    protected abstract File initFile();
+
     @Override
     public void write(OutputStream out) throws IOException {
       createOutputStream(out);
@@ -1438,9 +1471,6 @@ public class ReplicationHandler extends
       try {
         initWrite();
 
-        //if if is a conf file read from config directory
-        File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
-
         if (file.exists() && file.canRead()) {
           inputStream = new FileInputStream(file);
           FileChannel channel = inputStream.getChannel();
@@ -1478,6 +1508,32 @@ public class ReplicationHandler extends
     }
   }
 
+  private class LocalFsTlogFileStream extends LocalFsFileStream {
+
+    public LocalFsTlogFileStream(SolrParams solrParams) {
+      super(solrParams);
+    }
+
+    protected File initFile() {
+      // if it is a tlog file, read it from the tlog directory
+      return new File(core.getUpdateHandler().getUpdateLog().getLogDir(), tlogFileName);
+    }
+
+  }
+
+  private class LocalFsConfFileStream extends LocalFsFileStream {
+
+    public LocalFsConfFileStream(SolrParams solrParams) {
+      super(solrParams);
+    }
+
+    protected File initFile() {
+      // if it is a conf file, read it from the config directory
+      return new File(core.getResourceLoader().getConfigDir(), cfileName);
+    }
+
+  }
+
   static Integer readInterval(String interval) {
     if (interval == null)
       return null;
@@ -1568,6 +1624,8 @@ public class ReplicationHandler extends
 
   public static final String CONF_FILE_SHORT = "cf";
 
+  public static final String TLOG_FILE = "tlogFile";
+
   public static final String CHECKSUM = "checksum";
 
   public static final String ALIAS = "alias";
@@ -1576,6 +1634,8 @@ public class ReplicationHandler extends
 
   public static final String CONF_FILES = "confFiles";
 
+  public static final String TLOG_FILES = "tlogFiles";
+
   public static final String REPLICATE_AFTER = "replicateAfter";
 
   public static final String FILE_STREAM = "filestream";
@@ -1601,15 +1661,15 @@ public class ReplicationHandler extends
   public static final String OK_STATUS = "OK";
 
   public static final String NEXT_EXECUTION_AT = "nextExecutionAt";
-  
+
   public static final String NUMBER_BACKUPS_TO_KEEP_REQUEST_PARAM = "numberToKeep";
-  
+
   public static final String NUMBER_BACKUPS_TO_KEEP_INIT_PARAM = "maxNumberOfBackups";
 
-  /** 
-   * Boolean param for tests that can be specified when using 
-   * {@link #CMD_FETCH_INDEX} to force the current request to block until 
-   * the fetch is complete.  <b>NOTE:</b> This param is not advised for 
+  /**
+   * Boolean param for tests that can be specified when using
+   * {@link #CMD_FETCH_INDEX} to force the current request to block until
+   * the fetch is complete.  <b>NOTE:</b> This param is not advised for
   * non-test code, since the duration of the fetch for non-trivial
   * indexes will likely cause the request to time out.
    *

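For illustration, a minimal SolrJ sketch of how a client might exercise the new parameters: first ask the handler for its file list (which now includes tlogFiles when the core uses a CdcrUpdateLog), then stream one tlog file with wt=filestream and the new tlogFile parameter. The client instance and the generation value are assumptions; COMMAND, CMD_GET_FILE_LIST, CMD_GET_FILE, GENERATION and FILE_STREAM are the handler's existing constants.

    // A sketch under the above assumptions, not part of this commit.
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/replication");
    params.set(COMMAND, CMD_GET_FILE_LIST);
    params.set(GENERATION, "2"); // hypothetical generation
    NamedList<Object> rsp = client.request(new QueryRequest(params));
    List<Map<String, Object>> tlogFiles = (List<Map<String, Object>>) rsp.get(TLOG_FILES);

    // Stream the first returned tlog file.
    params = new ModifiableSolrParams();
    params.set(CommonParams.QT, "/replication");
    params.set(COMMAND, CMD_GET_FILE);
    params.set(GENERATION, "2");
    params.set(CommonParams.WT, FILE_STREAM);
    params.set(TLOG_FILE, (String) tlogFiles.get(0).get(NAME));
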
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java Fri May 22 18:58:29 2015
@@ -0,0 +1,249 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.util.Collection;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+
+/**
+ * Extends {@link org.apache.solr.update.TransactionLog} to:
+ * <ul>
+ * <li>automatically reopen the output stream if its reference count has reached 0. This is achieved by extending
+ * the methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
+ * <li>encode the number of records in the tlog file in the last commit record. The number of records will be
+ * decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
+ * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.</li>
+ * </ul>
+ */
+public class CdcrTransactionLog extends TransactionLog {
+
+  private boolean isReplaying;
+  long startVersion; // (absolute) version of the first element of this transaction log
+
+  CdcrTransactionLog(File tlogFile, Collection<String> globalStrings) {
+    super(tlogFile, globalStrings);
+
+    // The starting version number will be used to seek tlogs more efficiently
+    String filename = tlogFile.getName();
+    startVersion = Math.abs(Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)));
+
+    isReplaying = false;
+  }
+
+  CdcrTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+    super(tlogFile, globalStrings, openExisting);
+
+    // The starting version number will be used to seek tlogs more efficiently
+    String filename = tlogFile.getName();
+    startVersion = Math.abs(Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)));
+
+    numRecords = openExisting ? this.readNumRecords() : 0;
+    // if we reopen an existing tlog file and the number of records is equal to 0, then we are replaying
+    // the log and will append a commit
+    if (openExisting && numRecords == 0) {
+      isReplaying = true;
+    }
+  }
+
+  /**
+   * Returns the number of records in the log (currently includes the header and an optional commit).
+   */
+  public int numRecords() {
+    return super.numRecords();
+  }
+
+  /**
+   * The last record of the transaction log file is expected to be a commit with a 4-byte integer that encodes the
+   * number of records in the file.
+   */
+  private int readNumRecords() {
+    try {
+      if (endsWithCommit()) {
+        long size = fos.size();
+        // 4 bytes for the record size, the length of the end message + 1 byte for its value tag,
+        // and 4 bytes for the number of records
+        long pos = size - 4 - END_MESSAGE.length() - 1 - 4;
+        if (pos < 0) return 0;
+
+        ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
+        return is.readInt();
+      }
+    } catch (IOException e) {
+      log.error("Error while reading number of records in tlog " + this, e);
+    }
+    return 0;
+  }
+
+  @Override
+  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+    LogCodec codec = new LogCodec(resolver);
+    synchronized (this) {
+      try {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+
+        if (pos == 0) {
+          writeLogHeader(codec);
+          pos = fos.size();
+        }
+        codec.init(fos);
+        codec.writeTag(JavaBinCodec.ARR, 4);
+        codec.writeInt(UpdateLog.COMMIT | flags);  // should just take one byte
+        codec.writeLong(cmd.getVersion());
+        codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
+        fos.writeInt(numRecords + 1); // the number of records in the file; +1 accounts for this commit record being written
+        codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
+
+        endRecord(pos);
+
+        fos.flush();  // flush since this will be the last record in a log file
+        assert fos.size() == channel.size();
+
+        isReplaying = false; // we have replayed and appended a commit record with the number of records in the file
+
+        return pos;
+      } catch (IOException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  /**
+   * Returns a reader that can be used while a log is still in use.
+   * Currently only *one* LogReader may be outstanding, and that log may only
+   * be used from a single thread.
+   */
+  @Override
+  public LogReader getReader(long startingPos) {
+    return new CdcrLogReader(startingPos);
+  }
+
+  public class CdcrLogReader extends LogReader {
+
+    private int numRecords = 1; // start at 1 to account for the header record
+
+    public CdcrLogReader(long startingPos) {
+      super(startingPos);
+    }
+
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      Object o = super.next();
+      if (o != null) {
+        this.numRecords++;
+        // We are replaying the log. We need to update the number of records for the writeCommit.
+        if (isReplaying) {
+          synchronized (CdcrTransactionLog.this) {
+            CdcrTransactionLog.this.numRecords = this.numRecords;
+          }
+        }
+      }
+      return o;
+    }
+
+  }
+
+  @Override
+  public void incref() {
+    // if the refcount is 0, we need to reopen the output stream
+    if (refcount.getAndIncrement() == 0) {
+      reopenOutputStream(); // synchronised with this
+    }
+  }
+
+  /**
+   * Modified to act like {@link #incref()} in order to be compatible with {@link UpdateLog#recoverFromLog()}.
+   * Otherwise, we would have to duplicate the method {@link UpdateLog#recoverFromLog()} in
+   * {@link org.apache.solr.update.CdcrUpdateLog} and change the call
+   * {@code if (!ll.try_incref()) continue; } to {@code incref(); }.
+   */
+  @Override
+  public boolean try_incref() {
+    this.incref();
+    return true;
+  }
+
+  @Override
+  protected void close() {
+    try {
+      if (debug) {
+        log.debug("Closing tlog" + this);
+      }
+
+      synchronized (this) {
+        if (fos != null) {
+          fos.flush();
+          fos.close();
+
+          // dereference these variables for GC
+          fos = null;
+          os = null;
+          channel = null;
+          raf = null;
+        }
+      }
+
+      if (deleteOnClose) {
+        try {
+          Files.deleteIfExists(tlogFile.toPath());
+        } catch (IOException e) {
+          // TODO: should this class care if a file couldn't be deleted?
+          // this just emulates previous behavior, where only SecurityException would be handled.
+        }
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } finally {
+      assert ObjectReleaseTracker.release(this);
+    }
+  }
+
+  /**
+   * Re-open the output stream of the tlog and position
+   * the file pointer at the end of the file. It assumes
+   * that the tlog is non-empty and that the tlog's header
+   * has already been read.
+   */
+  synchronized void reopenOutputStream() {
+    try {
+      if (debug) {
+        log.debug("Re-opening tlog's output stream: " + this);
+      }
+
+      raf = new RandomAccessFile(this.tlogFile, "rw");
+      channel = raf.getChannel();
+      long start = raf.length();
+      raf.seek(start);
+      os = Channels.newOutputStream(channel);
+      fos = new FastOutputStream(os, new byte[65536], 0);
+      fos.setWritten(start);    // reflect that we aren't starting at the beginning
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+}
+
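For reference, a sketch of the tail layout that readNumRecords() decodes, reading backwards from the end of the tlog file; the byte counts below mirror the arithmetic in that method.

    // | ... | INT tag (1) | record count (4) | STR tag (1) | END_MESSAGE | record length (4) | EOF
    //
    // so the plain integer holding the record count starts at:
    long pos = size
        - 4                      // trailing record length written by endRecord(pos)
        - END_MESSAGE.length()   // the end-message payload
        - 1                      // the JavaBinCodec value tag preceding it
        - 4;                     // the 4-byte record count itself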