You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC

svn commit: r1681186 [1/5] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...

Author: erick
Date: Fri May 22 18:58:29 2015
New Revision: 1681186

URL: http://svn.apache.org/r1681186
Log:
SOLR-6273: Cross Data Center Replication

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java   (with props)
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java   (with props)
    lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml   (with props)
    lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri May 22 18:58:29 2015
@@ -59,9 +59,37 @@ Other Changes
 
 * SOLR-7355: Switch from Google's ConcurrentLinkedHashMap to Caffeine.  Only
   affects HDFS support. (Ben Manes via Shawn Heisey)
+  
 
 ==================  5.3.0 ==================
-(No Changes)
+Versions of Major Components
+---------------------
+(no changes)
+
+Upgrading from Solr 5.2
+-----------------------
+(no changes)
+
+Detailed Change List
+----------------------
+
+New Features
+----------------------
+
+* SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
+  SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
+
+Bug Fixes
+----------------------
+(no changes)
+
+Optimizations
+----------------------
+(no changes)
+
+Other Changes
+----------------------
+(no changes)
 
 ==================  5.2.0 ==================
 

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,76 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This manager is responsible in enabling or disabling the buffering of the update logs. Currently, buffer
+ * is always activated for non-leader nodes. For leader nodes, it is enabled only if the user explicitly
+ * enabled it with the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER}.
+ */
+class CdcrBufferManager implements CdcrStateManager.CdcrStateObserver {
+
+  private CdcrLeaderStateManager leaderStateManager;
+  private CdcrBufferStateManager bufferStateManager;
+
+  private final SolrCore core;
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrBufferManager.class);
+
+  CdcrBufferManager(SolrCore core) {
+    this.core = core;
+  }
+
+  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+    this.leaderStateManager = leaderStateManager;
+    this.leaderStateManager.register(this);
+  }
+
+  void setBufferStateManager(final CdcrBufferStateManager bufferStateManager) {
+    this.bufferStateManager = bufferStateManager;
+    this.bufferStateManager.register(this);
+  }
+
+  /**
+   * This method is synchronised as it can both be called by the leaderStateManager and the bufferStateManager.
+   */
+  @Override
+  public synchronized void stateUpdate() {
+    CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+
+    // If I am not the leader, I should always buffer my updates
+    if (!leaderStateManager.amILeader()) {
+      ulog.enableBuffer();
+      return;
+    }
+    // If I am the leader, I should buffer my updates only if buffer is enabled
+    else if (bufferStateManager.getState().equals(CdcrParams.BufferState.ENABLED)) {
+      ulog.enableBuffer();
+      return;
+    }
+
+    // otherwise, disable the buffer
+    ulog.disableBuffer();
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,171 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * Manage the state of the update log buffer. It is responsible for synchronising the state
+ * through Zookeeper. The state of the buffer is stored in the zk node defined by {@link #getZnodePath()}.
+ */
+class CdcrBufferStateManager extends CdcrStateManager {
+
+  private CdcrParams.BufferState state = DEFAULT_STATE;
+
+  private BufferStateWatcher wrappedWatcher;
+  private Watcher watcher;
+
+  private SolrCore core;
+
+  static CdcrParams.BufferState DEFAULT_STATE = CdcrParams.BufferState.ENABLED;
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrBufferStateManager.class);
+
+  /**
+   * Ensures the state znode exists, applies the configured default state, then loads the current
+   * state from Zookeeper and registers a watcher on the state znode.
+   *
+   * @param core                the core this manager belongs to
+   * @param bufferConfiguration the buffer configuration; when null, {@link #DEFAULT_STATE} is kept
+   */
+  CdcrBufferStateManager(final SolrCore core, SolrParams bufferConfiguration) {
+    this.core = core;
+
+    // Ensure that the state znode exists
+    this.createStateNode();
+
+    // Apply the configured default state through setState so that observers are notified when it
+    // differs from the built-in default; an unrecognised value is ignored by setState.
+    if (bufferConfiguration != null) {
+      byte[] defaultState = bufferConfiguration.get(
+          CdcrParams.DEFAULT_STATE_PARAM, DEFAULT_STATE.toLower()).getBytes(Charset.forName("UTF-8"));
+      this.setState(CdcrParams.BufferState.get(defaultState));
+    }
+
+    // Startup and register the watcher at startup
+    try {
+      SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+      watcher = this.initWatcher(zkClient);
+      this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+    } catch (KeeperException e) {
+      log.warn("Failed fetching initial state", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+      log.warn("Failed fetching initial state", e);
+    }
+  }
+
+  /**
+   * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+   * if we do not wrap the watcher - see SOLR-6621.
+   */
+  private Watcher initWatcher(SolrZkClient zkClient) {
+    wrappedWatcher = new BufferStateWatcher();
+    return zkClient.wrapWatcher(wrappedWatcher);
+  }
+
+  /**
+   * Returns the znode under which the CDCR states of this core's collection are stored.
+   */
+  private String getZnodeBase() {
+    return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
+  }
+
+  /**
+   * Returns the znode holding the buffer state.
+   */
+  private String getZnodePath() {
+    return getZnodeBase() + "/buffer";
+  }
+
+  /**
+   * Updates the local state and notifies the observers when it changes. A null state (a value that
+   * could not be parsed, e.g. unexpected znode content) is ignored and the current state is kept.
+   */
+  void setState(CdcrParams.BufferState state) {
+    if (state == null) {
+      log.warn("Ignoring unrecognised CDCR buffer state, keeping current state {}", this.state);
+      return;
+    }
+    if (this.state != state) {
+      this.state = state;
+      this.callback(); // notify the observers of a state change
+    }
+  }
+
+  CdcrParams.BufferState getState() {
+    return state;
+  }
+
+  /**
+   * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
+   * action.
+   */
+  void synchronize() {
+    SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+    try {
+      zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
+      // check if nobody changed it in the meantime, and set a new watcher
+      this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+    } catch (KeeperException e) {
+      log.warn("Failed synchronising new state", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+      log.warn("Failed synchronising new state", e);
+    }
+  }
+
+  /**
+   * Creates the buffer state znode (and its parent path) with {@link #DEFAULT_STATE} if it does not
+   * exist yet. Several replicas may race to create it; losing that race is not an error.
+   */
+  private void createStateNode() {
+    SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+    try {
+      if (!zkClient.exists(this.getZnodePath(), true)) {
+        if (!zkClient.exists(this.getZnodeBase(), true)) {
+          zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+        }
+        zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
+        log.info("Created znode {}", this.getZnodePath());
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      // another replica created the node between our exists() check and create(): nothing to do
+      log.debug("Znode {} was created concurrently", this.getZnodePath());
+    } catch (KeeperException e) {
+      log.warn("Failed to create CDCR buffer state node", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+      log.warn("Failed to create CDCR buffer state node", e);
+    }
+  }
+
+  void shutdown() {
+    if (wrappedWatcher != null) {
+      wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+    }
+  }
+
+  /**
+   * Reloads the buffer state from Zookeeper (re-registering the watcher) whenever the state znode changes.
+   */
+  private class BufferStateWatcher implements Watcher {
+
+    // written by shutdown() and read by the thread delivering Zookeeper events
+    private volatile boolean isCancelled = false;
+
+    /**
+     * Cancel the watcher to avoid spurious warn messages during shutdown.
+     */
+    void cancel() {
+      isCancelled = true;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (isCancelled) return; // if the watcher is cancelled, do nothing.
+      String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+      String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+      log.info("The CDCR buffer state has changed: {} @ {}:{}", event, collectionName, shard);
+      if (Event.EventType.None.equals(event.getType())) {
+        return;
+      }
+      SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+      try {
+        CdcrParams.BufferState state = CdcrParams.BufferState.get(zkClient.getData(CdcrBufferStateManager.this.getZnodePath(), watcher, null, true));
+        log.info("Received new CDCR buffer state from watcher: {} @ {}:{}", state, collectionName, shard);
+        CdcrBufferStateManager.this.setState(state);
+      } catch (KeeperException e) {
+        log.warn("Failed synchronising new state @ " + collectionName + ":" + shard, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve the interrupt status of the event thread
+        log.warn("Failed synchronising new state @ " + collectionName + ":" + shard, e);
+      }
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,158 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Manage the leader state of the CDCR nodes.
+ * </p>
+ * <p>
+ * It takes care of notifying the {@link CdcrReplicatorManager} in case
+ * of a leader state change.
+ * </p>
+ */
+class CdcrLeaderStateManager extends CdcrStateManager {
+
+  private boolean amILeader = false;
+
+  private LeaderStateWatcher wrappedWatcher;
+  private Watcher watcher;
+
+  private SolrCore core;
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrLeaderStateManager.class);
+
+  /**
+   * Registers a watcher on this shard's leader znode and, if a leader is already registered,
+   * initialises the leader state from it.
+   */
+  CdcrLeaderStateManager(final SolrCore core) {
+    this.core = core;
+
+    // Fetch leader state and register the watcher at startup
+    try {
+      SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+      ClusterState clusterState = core.getCoreDescriptor().getCoreContainer().getZkController().getClusterState();
+
+      watcher = this.initWatcher(zkClient);
+      // if the node does not exist, it means that the leader was not yet registered. This can happen
+      // when the cluster is starting up. The core is not yet fully loaded, and the leader election process
+      // is waiting for it.
+      if (this.isLeaderRegistered(zkClient, clusterState)) {
+        this.checkIfIAmLeader();
+      }
+    } catch (KeeperException e) {
+      log.warn("Failed fetching initial leader state and setting watch", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+      log.warn("Failed fetching initial leader state and setting watch", e);
+    }
+  }
+
+  /**
+   * Checks if the leader is registered. If it is not registered, we are probably at the
+   * initialisation phase of the cluster. In this case, we must attach a watcher to
+   * be notified when the leader is registered.
+   *
+   * @return true if the leader znode exists; the watcher is registered on the znode in either case
+   */
+  private boolean isLeaderRegistered(SolrZkClient zkClient, ClusterState clusterState)
+      throws KeeperException, InterruptedException {
+    // First check if the znode exists, and register the watcher at the same time
+    return zkClient.exists(this.getZnodePath(), watcher, true) != null;
+  }
+
+  /**
+   * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+   * if we do not wrap the watcher - see SOLR-6621.
+   */
+  private Watcher initWatcher(SolrZkClient zkClient) {
+    wrappedWatcher = new LeaderStateWatcher();
+    return zkClient.wrapWatcher(wrappedWatcher);
+  }
+
+  /**
+   * Reads the leader znode and updates the leader state depending on whether its "core" property
+   * names this core.
+   */
+  private void checkIfIAmLeader() throws KeeperException, InterruptedException {
+    SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+    ZkNodeProps props = ZkNodeProps.load(zkClient.getData(this.getZnodePath(), null, null, true));
+    if (props != null) {
+      // compare from the non-null side: a leader node without a "core" property means "not me"
+      this.setAmILeader(core.getName().equals(props.get("core")));
+    }
+  }
+
+  /**
+   * Returns the znode path of the leader of this core's shard.
+   */
+  private String getZnodePath() {
+    String myShardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+    String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    return "/collections/" + myCollection + "/leaders/" + myShardId;
+  }
+
+  void setAmILeader(boolean amILeader) {
+    if (this.amILeader != amILeader) {
+      this.amILeader = amILeader;
+      this.callback(); // notify the observers of a state change
+    }
+  }
+
+  boolean amILeader() {
+    return amILeader;
+  }
+
+  void shutdown() {
+    if (wrappedWatcher != null) {
+      wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+    }
+  }
+
+  /**
+   * Re-evaluates the leader state (and re-registers itself) whenever the leader znode changes.
+   */
+  private class LeaderStateWatcher implements Watcher {
+
+    // written by shutdown() and read by the thread delivering Zookeeper events
+    private volatile boolean isCancelled = false;
+
+    /**
+     * Cancel the watcher to avoid spurious warn messages during shutdown.
+     */
+    void cancel() {
+      isCancelled = true;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (isCancelled) return; // if the watcher is cancelled, do nothing.
+      String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+      String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+      log.debug("The leader state has changed: {} @ {}:{}", event, collectionName, shard);
+      if (Event.EventType.None.equals(event.getType())) {
+        return;
+      }
+
+      try {
+        log.info("Received new leader state @ {}:{}", collectionName, shard);
+        SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+        ClusterState clusterState = core.getCoreDescriptor().getCoreContainer().getZkController().getClusterState();
+        if (CdcrLeaderStateManager.this.isLeaderRegistered(zkClient, clusterState)) {
+          CdcrLeaderStateManager.this.checkIfIAmLeader();
+        }
+      } catch (KeeperException e) {
+        log.warn("Failed updating leader state and setting watch @ " + collectionName + ":" + shard, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve the interrupt status of the event thread
+        log.warn("Failed updating leader state and setting watch @ " + collectionName + ":" + shard, e);
+      }
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java Fri May 22 18:58:29 2015
@@ -0,0 +1,249 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.charset.Charset;
+import java.util.Locale;
+
+/**
+ * Constants for the CDCR configuration parameters and API response keys, together with the enums
+ * describing the CDCR actions and the process and buffer states.
+ */
+public class CdcrParams {
+
+  /**
+   * The definition of a replica configuration.
+   */
+  public static final String REPLICA_PARAM = "replica";
+
+  /**
+   * The source collection of a replica.
+   */
+  public static final String SOURCE_COLLECTION_PARAM = "source";
+
+  /**
+   * The target collection of a replica.
+   */
+  public static final String TARGET_COLLECTION_PARAM = "target";
+
+  /**
+   * The Zookeeper host of the target cluster hosting the replica.
+   */
+  public static final String ZK_HOST_PARAM = "zkHost";
+
+  /**
+   * The definition of the {@link org.apache.solr.handler.CdcrReplicatorScheduler} configuration.
+   */
+  public static final String REPLICATOR_PARAM = "replicator";
+
+  /**
+   * The thread pool size of the replicator.
+   */
+  public static final String THREAD_POOL_SIZE_PARAM = "threadPoolSize";
+
+  /**
+   * The time schedule (in ms) of the replicator.
+   */
+  public static final String SCHEDULE_PARAM = "schedule";
+
+  /**
+   * The batch size of the replicator.
+   */
+  public static final String BATCH_SIZE_PARAM = "batchSize";
+
+  /**
+   * The definition of the {@link org.apache.solr.handler.CdcrUpdateLogSynchronizer} configuration.
+   */
+  public static final String UPDATE_LOG_SYNCHRONIZER_PARAM = "updateLogSynchronizer";
+
+  /**
+   * The definition of the {@link org.apache.solr.handler.CdcrBufferManager} configuration.
+   */
+  public static final String BUFFER_PARAM = "buffer";
+
+  /**
+   * The default state at startup of the buffer.
+   */
+  public static final String DEFAULT_STATE_PARAM = "defaultState";
+
+  /**
+   * The latest update checkpoint on a target cluster.
+   */
+  public final static String CHECKPOINT = "checkpoint";
+
+  /**
+   * The last processed version on a source cluster.
+   */
+  public final static String LAST_PROCESSED_VERSION = "lastProcessedVersion";
+
+  /**
+   * A list of replica queues on a source cluster.
+   */
+  public final static String QUEUES = "queues";
+
+  /**
+   * The size of a replica queue on a source cluster.
+   */
+  public final static String QUEUE_SIZE = "queueSize";
+
+  /**
+   * The timestamp of the last processed operation in a replica queue.
+   */
+  public final static String LAST_TIMESTAMP = "lastTimestamp";
+
+  /**
+   * A list of qps statistics per collection.
+   */
+  public final static String OPERATIONS_PER_SECOND = "operationsPerSecond";
+
+  /**
+   * Overall counter.
+   */
+  public final static String COUNTER_ALL = "all";
+
+  /**
+   * Counter for Adds.
+   */
+  public final static String COUNTER_ADDS = "adds";
+
+  /**
+   * Counter for Deletes.
+   */
+  public final static String COUNTER_DELETES = "deletes";
+
+  /**
+   * A list of errors per target collection.
+   */
+  public final static String ERRORS = "errors";
+
+  /**
+   * Counter for consecutive errors encountered by a replicator thread.
+   */
+  public final static String CONSECUTIVE_ERRORS = "consecutiveErrors";
+
+  /**
+   * A list of the last errors encountered by a replicator thread.
+   */
+  public final static String LAST = "last";
+
+  /**
+   * Total size of transaction logs.
+   */
+  public final static String TLOG_TOTAL_SIZE = "tlogTotalSize";
+
+  /**
+   * Total count of transaction logs.
+   */
+  public final static String TLOG_TOTAL_COUNT = "tlogTotalCount";
+
+  /**
+   * The state of the update log synchronizer.
+   */
+  public final static String UPDATE_LOG_SYNCHRONIZER = "updateLogSynchronizer";
+
+  /**
+   * Charset used to (de)serialise the states stored in Zookeeper.
+   */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  /**
+   * Parses a constant name of the given enum type, ignoring case and surrounding whitespace.
+   *
+   * @return the matching constant, or {@code null} if {@code value} is null or names no constant
+   */
+  private static <E extends Enum<E>> E parseEnum(Class<E> type, String value) {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return Enum.valueOf(type, value.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      // unknown constant name: callers interpret null as "unrecognised value"
+      return null;
+    }
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a string, returning {@code null} for {@code null} input.
+   */
+  private static String decode(byte[] bytes) {
+    return bytes == null ? null : new String(bytes, UTF8);
+  }
+
+  /**
+   * The actions supported by the CDCR API
+   */
+  public enum CdcrAction {
+    START,
+    STOP,
+    STATUS,
+    COLLECTIONCHECKPOINT,
+    SHARDCHECKPOINT,
+    ENABLEBUFFER,
+    DISABLEBUFFER,
+    LASTPROCESSEDVERSION,
+    QUEUES,
+    OPS,
+    ERRORS;
+
+    /**
+     * @return the action named by {@code p} (case-insensitive), or {@code null} if unknown or null
+     */
+    public static CdcrAction get(String p) {
+      return parseEnum(CdcrAction.class, p);
+    }
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+  }
+
+  /**
+   * The possible states of the CDCR process
+   */
+  public enum ProcessState {
+    STARTED,
+    STOPPED;
+
+    /**
+     * @return the state encoded as UTF-8 in {@code state} (case-insensitive), or {@code null} if
+     * unknown or null
+     */
+    public static ProcessState get(byte[] state) {
+      return parseEnum(ProcessState.class, decode(state));
+    }
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * @return the lower-case name of this state encoded as UTF-8
+     */
+    public byte[] getBytes() {
+      return toLower().getBytes(UTF8);
+    }
+
+    public static String getParam() {
+      return "process";
+    }
+
+  }
+
+  /**
+   * The possible states of the CDCR buffer
+   */
+  public enum BufferState {
+    ENABLED,
+    DISABLED;
+
+    /**
+     * @return the state encoded as UTF-8 in {@code state} (case-insensitive), or {@code null} if
+     * unknown or null
+     */
+    public static BufferState get(byte[] state) {
+      return parseEnum(BufferState.class, decode(state));
+    }
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * @return the lower-case name of this state encoded as UTF-8
+     */
+    public byte[] getBytes() {
+      return toLower().getBytes(UTF8);
+    }
+
+    public static String getParam() {
+      return "buffer";
+    }
+
+  }
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,170 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Manage the life-cycle state of the CDCR process. It is responsible for synchronising the state
+ * through Zookeeper. The state of the CDCR process is stored in the zk node defined by {@link #getZnodePath()}.
+ * </p>
+ * <p>
+ * It takes care of notifying the {@link CdcrReplicatorManager} in case
+ * of a process state change.
+ * </p>
+ */
+class CdcrProcessStateManager extends CdcrStateManager {
+
+  private CdcrParams.ProcessState state = DEFAULT_STATE;
+
+  private ProcessStateWatcher wrappedWatcher;
+  private Watcher watcher;
+
+  private final SolrCore core;
+
+  /**
+   * The default state must be STOPPED. It is both the initial in-memory state and the content written
+   * to the state znode when {@link #createStateNode()} has to create it.
+   */
+  static final CdcrParams.ProcessState DEFAULT_STATE = CdcrParams.ProcessState.STOPPED;
+
+  protected static final Logger log = LoggerFactory.getLogger(CdcrProcessStateManager.class);
+
+  /**
+   * Creates the state znode if needed, registers a watcher on it and loads the current state from it.
+   * {@link KeeperException}s and {@link InterruptedException}s are logged, not propagated.
+   */
+  CdcrProcessStateManager(final SolrCore core) {
+    this.core = core;
+
+    // Ensure that the status znode exists
+    this.createStateNode();
+
+    // Register the watcher at startup
+    try {
+      SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+      watcher = this.initWatcher(zkClient);
+      this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed fetching initial state", e);
+      restoreInterrupt(e);
+    }
+  }
+
+  /**
+   * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+   * if we do not wrap the watcher - see SOLR-6621.
+   */
+  private Watcher initWatcher(SolrZkClient zkClient) {
+    wrappedWatcher = new ProcessStateWatcher();
+    return zkClient.wrapWatcher(wrappedWatcher);
+  }
+
+  /**
+   * Catching an {@link InterruptedException} clears the thread's interrupt flag; set it again so that
+   * callers up the stack can still observe the interruption.
+   */
+  private static void restoreInterrupt(Exception e) {
+    if (e instanceof InterruptedException) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private String getZnodeBase() {
+    return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
+  }
+
+  private String getZnodePath() {
+    return getZnodeBase() + "/process";
+  }
+
+  /**
+   * Updates the in-memory state and, if it actually changed, notifies the registered observers.
+   */
+  void setState(CdcrParams.ProcessState state) {
+    if (this.state != state) {
+      this.state = state;
+      this.callback(); // notify the observers of a state change
+    }
+  }
+
+  /**
+   * @return the current in-memory state; may be {@code null} if the znode held an unrecognised value
+   */
+  CdcrParams.ProcessState getState() {
+    return state;
+  }
+
+  /**
+   * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
+   * action.
+   */
+  void synchronize() {
+    SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+    try {
+      zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
+      // check if nobody changed it in the meantime, and set a new watcher
+      this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed synchronising new state", e);
+      restoreInterrupt(e);
+    }
+  }
+
+  /**
+   * Creates the state znode (and its parent path) holding {@link #DEFAULT_STATE} if it does not exist yet.
+   */
+  private void createStateNode() {
+    SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+    try {
+      if (!zkClient.exists(this.getZnodePath(), true)) {
+        if (!zkClient.exists(this.getZnodeBase(), true)) {
+          zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+        }
+        zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
+        log.info("Created znode {}", this.getZnodePath());
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      // Someone else (presumably another replica) created the node between our exists() check and
+      // create(): the node is there, which is all we need.
+      log.debug("Znode {} already exists", e.getPath());
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to create CDCR process state node", e);
+      restoreInterrupt(e);
+    }
+  }
+
+  void shutdown() {
+    if (wrappedWatcher != null) {
+      wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+    }
+  }
+
+  /**
+   * Reloads the state from the znode on every ZooKeeper notification and re-registers itself.
+   */
+  private class ProcessStateWatcher implements Watcher {
+
+    // volatile: cancel() is called from shutdown() while process() is invoked by the ZooKeeper client,
+    // normally on a different thread
+    private volatile boolean isCancelled = false;
+
+    /**
+     * Cancel the watcher to avoid spurious warn messages during shutdown.
+     */
+    void cancel() {
+      isCancelled = true;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (isCancelled) return; // if the watcher is cancelled, do nothing.
+      String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+      String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+      log.info("The CDCR process state has changed: {} @ {}:{}", event, collectionName, shard);
+      if (Event.EventType.None.equals(event.getType())) {
+        return; // connection-state event, not a change of the znode
+      }
+      SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+      try {
+        CdcrParams.ProcessState state = CdcrParams.ProcessState.get(zkClient.getData(CdcrProcessStateManager.this.getZnodePath(), watcher, null, true));
+        log.info("Received new CDCR process state from watcher: {} @ {}:{}", state, collectionName, shard);
+        CdcrProcessStateManager.this.setState(state);
+      } catch (KeeperException | InterruptedException e) {
+        log.warn("Failed synchronising new state @ " + collectionName + ":" + shard, e);
+        restoreInterrupt(e);
+      }
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java Fri May 22 18:58:29 2015
@@ -0,0 +1,222 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.processor.CdcrUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * The replication logic. Given a {@link org.apache.solr.handler.CdcrReplicatorState}, it reads all the new entries
+ * in the update log and forwards them to the target cluster. If an error occurs, the replication is stopped and
+ * will be tried again later.
+ */
+public class CdcrReplicator implements Runnable {
+
+  private final CdcrReplicatorState state;
+  private final int batchSize;
+
+  protected static final Logger log = LoggerFactory.getLogger(CdcrReplicator.class);
+
+  /**
+   * @param state     the replication state of one target (log reader, client, error counters)
+   * @param batchSize maximum number of update log entries read during one {@link #run()}
+   */
+  public CdcrReplicator(CdcrReplicatorState state, int batchSize) {
+    this.state = state;
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Reads up to {@code batchSize} entries from the update log and forwards them to the target. Adds are
+   * accumulated into a single request; each delete is sent in its own request, after flushing the pending
+   * adds, so that deletes are not reordered with respect to the surrounding adds. The main log reader is
+   * only advanced after a request has been sent successfully, so a failed batch is read again on the next run.
+   */
+  @Override
+  public void run() {
+    CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
+    CdcrUpdateLog.CdcrLogReader subReader = null;
+    if (logReader == null) {
+      log.warn("Log reader for target {} is not initialised, it will be ignored.", state.getTargetCollection());
+      return;
+    }
+
+    try {
+      // create update request
+      UpdateRequest req = new UpdateRequest();
+      // Add the param to indicate the {@link CdcrUpdateProcessor} to keep the provided version number
+      req.setParam(CdcrUpdateProcessor.CDCR_UPDATE, "");
+
+      // Start the benchmark timer
+      state.getBenchmarkTimer().start();
+
+      long counter = 0;
+      subReader = logReader.getSubReader();
+
+      for (int i = 0; i < batchSize; i++) {
+        Object o = subReader.next();
+        if (o == null) break; // we have reached the end of the update logs, we should close the batch
+
+        if (isDelete(o)) {
+
+          /*
+          * Deletes are sent one at a time.
+          */
+
+          // First send out current batch of SolrInputDocument, the non-deletes.
+          List<SolrInputDocument> docs = req.getDocuments();
+
+          if (docs != null && docs.size() > 0) {
+            subReader.resetToLastPosition(); // Push back the delete for now.
+            this.sendRequest(req); // Send the batch update request
+            logReader.forwardSeek(subReader); // Advance the main reader to just before the delete.
+            o = subReader.next(); // Read the delete again
+            counter += docs.size();
+            req.clear();
+          }
+
+          // Process Delete
+          this.processUpdate(o, req);
+          this.sendRequest(req);
+          logReader.forwardSeek(subReader);
+          counter++;
+          req.clear();
+
+        } else {
+
+          this.processUpdate(o, req);
+
+        }
+      }
+
+      //Send the final batch out.
+      List<SolrInputDocument> docs = req.getDocuments();
+
+      if ((docs != null && docs.size() > 0)) {
+        this.sendRequest(req);
+        counter += docs.size();
+      }
+
+      // we might have read a single commit operation and reached the end of the update logs
+      logReader.forwardSeek(subReader);
+
+      log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
+    } catch (Exception e) {
+      // report error and update error stats
+      this.handleException(e);
+    } finally {
+      // stop the benchmark timer
+      state.getBenchmarkTimer().stop();
+      // ensure that the subreader is closed and the associated pointer is removed
+      if (subReader != null) subReader.close();
+    }
+  }
+
+  /**
+   * Sends the request to the target and resets the consecutive error counter on success.
+   *
+   * @throws CdcrReplicatorException if the target answered with a non-zero status
+   */
+  private void sendRequest(UpdateRequest req) throws IOException, SolrServerException, CdcrReplicatorException {
+    UpdateResponse rsp = req.process(state.getClient());
+    if (rsp.getStatus() != 0) {
+      throw new CdcrReplicatorException(req, rsp);
+    }
+    state.resetConsecutiveErrors();
+  }
+
+  /**
+   * @return true if the update log entry (a List whose first element holds the operation and flags) is a
+   *         delete-by-id or a delete-by-query
+   */
+  private boolean isDelete(Object o) {
+    List entry = (List) o;
+    int operationAndFlags = (Integer) entry.get(0);
+    int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
+    return oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE;
+  }
+
+  /**
+   * Logs the failure and records it in the replicator state: a rejected request or a routing failure is
+   * reported as {@code BAD_REQUEST}, anything else as {@code INTERNAL}.
+   */
+  private void handleException(Exception e) {
+    if (e instanceof CdcrReplicatorException) {
+      UpdateRequest req = ((CdcrReplicatorException) e).req;
+      UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
+      log.warn("Failed to forward update request {}. Got response {}", req, rsp);
+      state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+    } else if (e instanceof CloudSolrClient.RouteException) {
+      log.warn("Failed to forward update request", e);
+      state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+    } else {
+      log.warn("Failed to forward update request", e);
+      state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
+    }
+  }
+
+  /**
+   * Adds the given update log entry to {@code req} and records the operation in the benchmark timer.
+   *
+   * @return {@code req}, or {@code null} for a commit entry, which leaves {@code req} untouched
+   * @throws SolrException if the operation is unknown
+   */
+  private UpdateRequest processUpdate(Object o, UpdateRequest req) {
+
+    // should currently be a List<Oper,Ver,Doc/Id>
+    List entry = (List) o;
+
+    int operationAndFlags = (Integer) entry.get(0);
+    int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
+    long version = (Long) entry.get(1);
+
+    // record the operation in the benchmark timer
+    state.getBenchmarkTimer().incrementCounter(oper);
+
+    switch (oper) {
+      case UpdateLog.ADD: {
+        // the version is already attached to the document
+        SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+        req.add(sdoc);
+        return req;
+      }
+      case UpdateLog.DELETE: {
+        byte[] idBytes = (byte[]) entry.get(2);
+        req.deleteById(new String(idBytes, Charset.forName("UTF-8")));
+        req.setParam(DistributedUpdateProcessor.VERSION_FIELD, Long.toString(version));
+        return req;
+      }
+
+      case UpdateLog.DELETE_BY_QUERY: {
+        String query = (String) entry.get(2);
+        req.deleteByQuery(query);
+        req.setParam(DistributedUpdateProcessor.VERSION_FIELD, Long.toString(version));
+        return req;
+      }
+
+      case UpdateLog.COMMIT: {
+        return null;
+      }
+
+      default:
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+    }
+  }
+
+  /**
+   * Exception to catch update request issues with the target cluster. It is a static nested class so that
+   * it does not hold a hidden reference to the replicator that created it.
+   */
+  public static class CdcrReplicatorException extends Exception {
+
+    private final UpdateRequest req;
+    private final UpdateResponse rsp;
+
+    public CdcrReplicatorException(UpdateRequest req, UpdateResponse rsp) {
+      super("Target cluster rejected the update request" + (rsp == null ? "" : ", status " + rsp.getStatus()));
+      this.req = req;
+      this.rsp = rsp;
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,170 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
+
+  private List<CdcrReplicatorState> replicatorStates;
+
+  private final CdcrReplicatorScheduler scheduler;
+  private CdcrProcessStateManager processStateManager;
+  private CdcrLeaderStateManager leaderStateManager;
+
+  private SolrCore core;
+  private String path;
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrReplicatorManager.class);
+
+  CdcrReplicatorManager(final SolrCore core, String path,
+                        SolrParams replicatorConfiguration,
+                        Map<String, List<SolrParams>> replicasConfiguration) {
+    this.core = core;
+    this.path = path;
+
+    // create states
+    replicatorStates = new ArrayList<>();
+    String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    List<SolrParams> targets = replicasConfiguration.get(myCollection);
+    if (targets != null) {
+      for (SolrParams params : targets) {
+        String zkHost = params.get(CdcrParams.ZK_HOST_PARAM);
+        String targetCollection = params.get(CdcrParams.TARGET_COLLECTION_PARAM);
+
+        CloudSolrClient client = new CloudSolrClient(zkHost, true);
+        client.setDefaultCollection(targetCollection);
+        replicatorStates.add(new CdcrReplicatorState(targetCollection, zkHost, client));
+      }
+    }
+
+    this.scheduler = new CdcrReplicatorScheduler(this, replicatorConfiguration);
+  }
+
+  void setProcessStateManager(final CdcrProcessStateManager processStateManager) {
+    this.processStateManager = processStateManager;
+    this.processStateManager.register(this);
+  }
+
+  void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+    this.leaderStateManager = leaderStateManager;
+    this.leaderStateManager.register(this);
+  }
+
+  /**
+   * <p>
+   * Inform the replicator manager of a change of state, and tell him to update its own state.
+   * </p>
+   * <p>
+   * If we are the leader and the process state is STARTED, we need to initialise the log readers and start the
+   * scheduled thread poll.
+   * Otherwise, if the process state is STOPPED or if we are not the leader, we need to close the log readers and stop
+   * the thread pool.
+   * </p>
+   * <p>
+   * This method is synchronised as it can both be called by the leaderStateManager and the processStateManager.
+   * </p>
+   */
+  @Override
+  public synchronized void stateUpdate() {
+    if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
+      this.initLogReaders();
+      this.scheduler.start();
+      return;
+    }
+
+    this.scheduler.shutdown();
+    this.closeLogReaders();
+  }
+
+  List<CdcrReplicatorState> getReplicatorStates() {
+    return replicatorStates;
+  }
+
+  void initLogReaders() {
+    String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+    CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+
+    for (CdcrReplicatorState state : replicatorStates) {
+      state.closeLogReader();
+      try {
+        long checkpoint = this.getCheckpoint(state);
+        log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
+            checkpoint, collectionName, shard);
+        CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+        reader.seek(checkpoint);
+        state.init(reader);
+      } catch (IOException | SolrServerException | SolrException e) {
+        log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
+      } catch (InterruptedException e) {
+        log.warn("Thread interrupted while instantiate the log reader for target collection " + state.getTargetCollection(), e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private long getCheckpoint(CdcrReplicatorState state) throws IOException, SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
+
+    SolrRequest request = new QueryRequest(params);
+    request.setPath(path);
+
+    NamedList response = state.getClient().request(request);
+    return (Long) response.get(CdcrParams.CHECKPOINT);
+  }
+
+  void closeLogReaders() {
+    for (CdcrReplicatorState state : replicatorStates) {
+      state.closeLogReader();
+    }
+  }
+
+  /**
+   * Shutdown all the {@link org.apache.solr.handler.CdcrReplicatorState} by closing their
+   * {@link org.apache.solr.client.solrj.impl.CloudSolrClient} and
+   * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
+   */
+  void shutdown() {
+    this.scheduler.shutdown();
+    for (CdcrReplicatorState state : replicatorStates) {
+      state.shutdown();
+    }
+    replicatorStates.clear();
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java Fri May 22 18:58:29 2015
@@ -0,0 +1,118 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * Schedule the execution of the {@link org.apache.solr.handler.CdcrReplicator} threads at
+ * regular time interval. It relies on a queue of {@link org.apache.solr.handler.CdcrReplicatorState} in
+ * order to avoid that one {@link org.apache.solr.handler.CdcrReplicatorState} is used by two threads at the same
+ * time.
+ */
+class CdcrReplicatorScheduler {
+
+  private boolean isStarted = false;
+
+  private ScheduledExecutorService scheduler;
+  private ExecutorService replicatorsPool;
+
+  private final CdcrReplicatorManager replicatorManager;
+  private final ConcurrentLinkedQueue<CdcrReplicatorState> statesQueue;
+
+  private int poolSize = DEFAULT_POOL_SIZE;
+  private int timeSchedule = DEFAULT_TIME_SCHEDULE;
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private static final int DEFAULT_POOL_SIZE = 2;
+  private static final int DEFAULT_TIME_SCHEDULE = 10;
+  private static final int DEFAULT_BATCH_SIZE = 128;
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrReplicatorScheduler.class);
+
+  CdcrReplicatorScheduler(final CdcrReplicatorManager replicatorStatesManager, final SolrParams replicatorConfiguration) {
+    this.replicatorManager = replicatorStatesManager;
+    this.statesQueue = new ConcurrentLinkedQueue<>(replicatorManager.getReplicatorStates());
+    if (replicatorConfiguration != null) {
+      poolSize = replicatorConfiguration.getInt(CdcrParams.THREAD_POOL_SIZE_PARAM, DEFAULT_POOL_SIZE);
+      timeSchedule = replicatorConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
+      batchSize = replicatorConfiguration.getInt(CdcrParams.BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
+    }
+  }
+
+  void start() {
+    if (!isStarted) {
+      scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-scheduler"));
+      //replicatorsPool = Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
+      replicatorsPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
+
+      // the scheduler thread is executed every second and submits one replication task
+      // per available state in the queue
+      scheduler.scheduleWithFixedDelay(new Runnable() {
+
+        @Override
+        public void run() {
+          int nCandidates = statesQueue.size();
+          for (int i = 0; i < nCandidates; i++) {
+            // a thread that pool one state from the queue, execute the replication task, and push back
+            // the state in the queue when the task is completed
+            replicatorsPool.execute(new Runnable() {
+
+              @Override
+              public void run() {
+                CdcrReplicatorState state = statesQueue.poll();
+                try {
+                  new CdcrReplicator(state, batchSize).run();
+                } finally {
+                  statesQueue.offer(state);
+                }
+              }
+
+            });
+
+          }
+        }
+
+      }, 0, timeSchedule, TimeUnit.MILLISECONDS);
+      isStarted = true;
+    }
+  }
+
+  void shutdown() {
+    if (isStarted) {
+      replicatorsPool.shutdown();
+      try {
+        replicatorsPool.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Thread interrupted while waiting for CDCR replicator threadpool close.");
+        Thread.currentThread().interrupt();
+      } finally {
+        scheduler.shutdownNow();
+        isStarted = false;
+      }
+    }
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java Fri May 22 18:58:29 2015
@@ -0,0 +1,270 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.schema.TrieDateField;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The state of the replication with a target cluster.
+ */
+class CdcrReplicatorState {
+
+  private final String targetCollection;
+  private final String zkHost;
+  private final CloudSolrClient targetClient;
+
+  private CdcrUpdateLog.CdcrLogReader logReader;
+
+  private long consecutiveErrors = 0;
+  private final Map<ErrorType, Long> errorCounters = new HashMap<>();
+  private final FixedQueue<ErrorQueueEntry> errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
+
+  private BenchmarkTimer benchmarkTimer;
+
+  private static Logger log = LoggerFactory.getLogger(CdcrReplicatorState.class);
+
+  CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
+    this.targetCollection = targetCollection;
+    this.targetClient = targetClient;
+    this.zkHost = zkHost;
+    this.benchmarkTimer = new BenchmarkTimer();
+  }
+
+  /**
+   * Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
+   * that is positioned at the last target cluster checkpoint.
+   */
+  void init(final CdcrUpdateLog.CdcrLogReader logReader) {
+    this.logReader = logReader;
+  }
+
+  void closeLogReader() {
+    if (logReader != null) {
+      logReader.close();
+      logReader = null;
+    }
+  }
+
+  CdcrUpdateLog.CdcrLogReader getLogReader() {
+    return logReader;
+  }
+
+  String getTargetCollection() {
+    return targetCollection;
+  }
+
+  String getZkHost() {
+    return zkHost;
+  }
+
+  CloudSolrClient getClient() {
+    return targetClient;
+  }
+
+  void shutdown() {
+    try {
+      targetClient.close();
+    } catch (IOException ioe) {
+      log.warn("Caught exception trying to close server: ", ioe.getMessage());
+    }
+    this.closeLogReader();
+  }
+
+  void reportError(ErrorType error) {
+    if (!errorCounters.containsKey(error)) {
+      errorCounters.put(error, 0l);
+    }
+    errorCounters.put(error, errorCounters.get(error) + 1);
+    errorsQueue.add(new ErrorQueueEntry(error, System.currentTimeMillis()));
+    consecutiveErrors++;
+  }
+
+  void resetConsecutiveErrors() {
+    consecutiveErrors = 0;
+  }
+
+  /**
+   * Returns the number of consecutive errors encountered while trying to forward updates to the target.
+   */
+  long getConsecutiveErrors() {
+    return consecutiveErrors;
+  }
+
+  /**
+   * Gets the number of errors of a particular type.
+   */
+  long getErrorCount(ErrorType type) {
+    if (errorCounters.containsKey(type)) {
+      return errorCounters.get(type);
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Gets the last errors ordered by timestamp (most recent first)
+   */
+  List<String[]> getLastErrors() {
+    List<String[]> lastErrors = new ArrayList<>();
+    synchronized (errorsQueue) {
+      Iterator<ErrorQueueEntry> it = errorsQueue.iterator();
+      while (it.hasNext()) {
+        ErrorQueueEntry entry = it.next();
+        lastErrors.add(new String[]{TrieDateField.formatExternal(new Date(entry.timestamp)), entry.type.toLower()});
+      }
+    }
+    return lastErrors;
+  }
+
+  /**
+   * Return the timestamp of the last processed operations
+   */
+  String getTimestampOfLastProcessedOperation() {
+    if (logReader != null && logReader.getLastVersion() != -1) {
+      // Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
+      return TrieDateField.formatExternal(new Date(logReader.getLastVersion() >> 20));
+    }
+    return new String();
+  }
+
+  /**
+   * Gets the benchmark timer.
+   */
+  BenchmarkTimer getBenchmarkTimer() {
+    return this.benchmarkTimer;
+  }
+
+  enum ErrorType {
+    INTERNAL,
+    BAD_REQUEST;
+
+    public String toLower() {
+      return toString().toLowerCase(Locale.ROOT);
+    }
+
+  }
+
+  class BenchmarkTimer {
+
+    private long startTime;
+    private long runTime = 0;
+    private Map<Integer, Long> opCounters = new HashMap<>();
+
+    /**
+     * Start recording time.
+     */
+    void start() {
+      startTime = System.nanoTime();
+    }
+
+    /**
+     * Stop recording time.
+     */
+    void stop() {
+      runTime += System.nanoTime() - startTime;
+      startTime = -1;
+    }
+
+    void incrementCounter(final int operationType) {
+      switch (operationType) {
+        case UpdateLog.ADD:
+        case UpdateLog.DELETE:
+        case UpdateLog.DELETE_BY_QUERY: {
+          if (!opCounters.containsKey(operationType)) {
+            opCounters.put(operationType, 0l);
+          }
+          opCounters.put(operationType, opCounters.get(operationType) + 1);
+          return;
+        }
+
+        default:
+          return;
+      }
+    }
+
+    long getRunTime() {
+      long totalRunTime = runTime;
+      if (startTime != -1) { // we are currently recording the time
+        totalRunTime += System.nanoTime() - startTime;
+      }
+      return totalRunTime;
+    }
+
+    double getOperationsPerSecond() {
+      long total = 0;
+      for (long counter : opCounters.values()) {
+        total += counter;
+      }
+      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+      return total / elapsedTimeInSeconds;
+    }
+
+    double getAddsPerSecond() {
+      long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
+      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+      return total / elapsedTimeInSeconds;
+    }
+
+    double getDeletesPerSecond() {
+      long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
+      total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
+      double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+      return total / elapsedTimeInSeconds;
+    }
+
+  }
+
+  private class ErrorQueueEntry {
+
+    private ErrorType type;
+    private long timestamp;
+
+    private ErrorQueueEntry(ErrorType type, long timestamp) {
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private class FixedQueue<E> extends LinkedList<E> {
+
+    private int maxSize;
+
+    public FixedQueue(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    @Override
+    public synchronized boolean add(E e) {
+      super.addFirst(e);
+      if (size() > maxSize) {
+        removeLast();
+      }
+      return true;
+    }
+  }
+
+}
+