You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC
svn commit: r1681186 [1/5] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...
Author: erick
Date: Fri May 22 18:58:29 2015
New Revision: 1681186
URL: http://svn.apache.org/r1681186
Log:
SOLR-6273: Cross Data Center Replication
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrStateManager.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrUpdateLogSynchronizer.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java (with props)
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java (with props)
lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml (with props)
lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri May 22 18:58:29 2015
@@ -59,9 +59,37 @@ Other Changes
* SOLR-7355: Switch from Google's ConcurrentLinkedHashMap to Caffeine. Only
affects HDFS support. (Ben Manes via Shawn Heisey)
+
================== 5.3.0 ==================
-(No Changes)
+Versions of Major Components
+---------------------
+(no changes)
+
+Upgrading from Solr 5.2
+-----------------------
+(no changes)
+
+Detailed Change List
+----------------------
+
+New Features
+----------------------
+
+* SOLR-6273: Cross Data Center Replication. Active/passive replication for separate
+ SolrClouds hosted on separate data centers. (Renaud Delbru, Yonik Seeley via Erick Erickson)
+
+Bug Fixes
+----------------------
+(no changes)
+
+Optimizations
+----------------------
+(no changes)
+
+Other Changes
+----------------------
+(no changes)
================== 5.2.0 ==================
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,76 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This manager is responsible in enabling or disabling the buffering of the update logs. Currently, buffer
+ * is always activated for non-leader nodes. For leader nodes, it is enabled only if the user explicitly
+ * enabled it with the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER}.
+ */
+class CdcrBufferManager implements CdcrStateManager.CdcrStateObserver {
+
+ private CdcrLeaderStateManager leaderStateManager;
+ private CdcrBufferStateManager bufferStateManager;
+
+ private final SolrCore core;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrBufferManager.class);
+
+ CdcrBufferManager(SolrCore core) {
+ this.core = core;
+ }
+
+ void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+ this.leaderStateManager = leaderStateManager;
+ this.leaderStateManager.register(this);
+ }
+
+ void setBufferStateManager(final CdcrBufferStateManager bufferStateManager) {
+ this.bufferStateManager = bufferStateManager;
+ this.bufferStateManager.register(this);
+ }
+
+ /**
+ * This method is synchronised as it can both be called by the leaderStateManager and the bufferStateManager.
+ */
+ @Override
+ public synchronized void stateUpdate() {
+ CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+
+ // If I am not the leader, I should always buffer my updates
+ if (!leaderStateManager.amILeader()) {
+ ulog.enableBuffer();
+ return;
+ }
+ // If I am the leader, I should buffer my updates only if buffer is enabled
+ else if (bufferStateManager.getState().equals(CdcrParams.BufferState.ENABLED)) {
+ ulog.enableBuffer();
+ return;
+ }
+
+ // otherwise, disable the buffer
+ ulog.disableBuffer();
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,171 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * Manage the state of the update log buffer. It is responsible of synchronising the state
+ * through Zookeeper. The state of the buffer is stored in the zk node defined by {@link #getZnodePath()}.
+ */
+class CdcrBufferStateManager extends CdcrStateManager {
+
+ private CdcrParams.BufferState state = DEFAULT_STATE;
+
+ private BufferStateWatcher wrappedWatcher;
+ private Watcher watcher;
+
+ private SolrCore core;
+
+ static CdcrParams.BufferState DEFAULT_STATE = CdcrParams.BufferState.ENABLED;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrBufferStateManager.class);
+
+ CdcrBufferStateManager(final SolrCore core, SolrParams bufferConfiguration) {
+ this.core = core;
+
+ // Ensure that the state znode exists
+ this.createStateNode();
+
+ // set default state
+ if (bufferConfiguration != null) {
+ byte[] defaultState = bufferConfiguration.get(
+ CdcrParams.DEFAULT_STATE_PARAM, DEFAULT_STATE.toLower()).getBytes(Charset.forName("UTF-8"));
+ state = CdcrParams.BufferState.get(defaultState);
+ }
+ this.setState(state); // notify observers
+
+ // Startup and register the watcher at startup
+ try {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ watcher = this.initWatcher(zkClient);
+ this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed fetching initial state", e);
+ }
+ }
+
+ /**
+ * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+ * if we does not wrap the watcher - see SOLR-6621.
+ */
+ private Watcher initWatcher(SolrZkClient zkClient) {
+ wrappedWatcher = new BufferStateWatcher();
+ return zkClient.wrapWatcher(wrappedWatcher);
+ }
+
+ private String getZnodeBase() {
+ return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
+ }
+
+ private String getZnodePath() {
+ return getZnodeBase() + "/buffer";
+ }
+
+ void setState(CdcrParams.BufferState state) {
+ if (this.state != state) {
+ this.state = state;
+ this.callback(); // notify the observers of a state change
+ }
+ }
+
+ CdcrParams.BufferState getState() {
+ return state;
+ }
+
+ /**
+ * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
+ * action.
+ */
+ void synchronize() {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
+ // check if nobody changed it in the meantime, and set a new watcher
+ this.setState(CdcrParams.BufferState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed synchronising new state", e);
+ }
+ }
+
+ private void createStateNode() {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ if (!zkClient.exists(this.getZnodePath(), true)) {
+ if (!zkClient.exists(this.getZnodeBase(), true)) {
+ zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+ }
+ zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
+ log.info("Created znode {}", this.getZnodePath());
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed to create CDCR buffer state node", e);
+ }
+ }
+
+ void shutdown() {
+ if (wrappedWatcher != null) {
+ wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+ }
+ }
+
+ private class BufferStateWatcher implements Watcher {
+
+ private boolean isCancelled = false;
+
+ /**
+ * Cancel the watcher to avoid spurious warn messages during shutdown.
+ */
+ void cancel() {
+ isCancelled = true;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (isCancelled) return; // if the watcher is cancelled, do nothing.
+ String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+ log.info("The CDCR buffer state has changed: {} @ {}:{}", event, collectionName, shard);
+ if (Event.EventType.None.equals(event.getType())) {
+ return;
+ }
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ CdcrParams.BufferState state = CdcrParams.BufferState.get(zkClient.getData(CdcrBufferStateManager.this.getZnodePath(), watcher, null, true));
+ log.info("Received new CDCR buffer state from watcher: {} @ {}:{}", state, collectionName, shard);
+ CdcrBufferStateManager.this.setState(state);
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed synchronising new state @ " + collectionName + ":" + shard, e);
+ }
+ }
+
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,158 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Manage the leader state of the CDCR nodes.
+ * </p>
+ * <p>
+ * It takes care of notifying the {@link CdcrReplicatorManager} in case
+ * of a leader state change.
+ * </p>
+ */
+class CdcrLeaderStateManager extends CdcrStateManager {
+
+ private boolean amILeader = false;
+
+ private LeaderStateWatcher wrappedWatcher;
+ private Watcher watcher;
+
+ private SolrCore core;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrProcessStateManager.class);
+
+ CdcrLeaderStateManager(final SolrCore core) {
+ this.core = core;
+
+ // Fetch leader state and register the watcher at startup
+ try {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ ClusterState clusterState = core.getCoreDescriptor().getCoreContainer().getZkController().getClusterState();
+
+ watcher = this.initWatcher(zkClient);
+ // if the node does not exist, it means that the leader was not yet registered. This can happen
+ // when the cluster is starting up. The core is not yet fully loaded, and the leader election process
+ // is waiting for it.
+ if (this.isLeaderRegistered(zkClient, clusterState)) {
+ this.checkIfIAmLeader();
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed fetching initial leader state and setting watch", e);
+ }
+ }
+
+ /**
+ * Checks if the leader is registered. If it is not registered, we are probably at the
+ * initialisation phase of the cluster. In this case, we must attach a watcher to
+ * be notified when the leader is registered.
+ */
+ private boolean isLeaderRegistered(SolrZkClient zkClient, ClusterState clusterState)
+ throws KeeperException, InterruptedException {
+ // First check if the znode exists, and register the watcher at the same time
+ return zkClient.exists(this.getZnodePath(), watcher, true) != null;
+ }
+
+ /**
+ * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+ * if we does not wrap the watcher - see SOLR-6621.
+ */
+ private Watcher initWatcher(SolrZkClient zkClient) {
+ wrappedWatcher = new LeaderStateWatcher();
+ return zkClient.wrapWatcher(wrappedWatcher);
+ }
+
+ private void checkIfIAmLeader() throws KeeperException, InterruptedException {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ ZkNodeProps props = ZkNodeProps.load(zkClient.getData(CdcrLeaderStateManager.this.getZnodePath(), null, null, true));
+ if (props != null) {
+ CdcrLeaderStateManager.this.setAmILeader(props.get("core").equals(core.getName()));
+ }
+ }
+
+ private String getZnodePath() {
+ String myShardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+ String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ return "/collections/" + myCollection + "/leaders/" + myShardId;
+ }
+
+ void setAmILeader(boolean amILeader) {
+ if (this.amILeader != amILeader) {
+ this.amILeader = amILeader;
+ this.callback(); // notify the observers of a state change
+ }
+ }
+
+ boolean amILeader() {
+ return amILeader;
+ }
+
+ void shutdown() {
+ if (wrappedWatcher != null) {
+ wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+ }
+ }
+
+ private class LeaderStateWatcher implements Watcher {
+
+ private boolean isCancelled = false;
+
+ /**
+ * Cancel the watcher to avoid spurious warn messages during shutdown.
+ */
+ void cancel() {
+ isCancelled = true;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (isCancelled) return; // if the watcher is cancelled, do nothing.
+ String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+ log.debug("The leader state has changed: {} @ {}:{}", event, collectionName, shard);
+ if (Event.EventType.None.equals(event.getType())) {
+ return;
+ }
+
+ try {
+ log.info("Received new leader state @ {}:{}", collectionName, shard);
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ ClusterState clusterState = core.getCoreDescriptor().getCoreContainer().getZkController().getClusterState();
+ if (CdcrLeaderStateManager.this.isLeaderRegistered(zkClient, clusterState)) {
+ CdcrLeaderStateManager.this.checkIfIAmLeader();
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed updating leader state and setting watch @ " + collectionName + ":" + shard, e);
+ }
+ }
+
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrParams.java Fri May 22 18:58:29 2015
@@ -0,0 +1,249 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.charset.Charset;
+import java.util.Locale;
+
public class CdcrParams {

  /** Charset used to (de)serialise the process and buffer states stored in ZooKeeper. */
  private static final Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

  /** The definition of a replica configuration. */
  public static final String REPLICA_PARAM = "replica";

  /** The source collection of a replica. */
  public static final String SOURCE_COLLECTION_PARAM = "source";

  /** The target collection of a replica. */
  public static final String TARGET_COLLECTION_PARAM = "target";

  /** The ZooKeeper host of the target cluster hosting the replica. */
  public static final String ZK_HOST_PARAM = "zkHost";

  /** The definition of the {@link org.apache.solr.handler.CdcrReplicatorScheduler} configuration. */
  public static final String REPLICATOR_PARAM = "replicator";

  /** The thread pool size of the replicator. */
  public static final String THREAD_POOL_SIZE_PARAM = "threadPoolSize";

  /** The time schedule (in ms) of the replicator. */
  public static final String SCHEDULE_PARAM = "schedule";

  /** The batch size of the replicator. */
  public static final String BATCH_SIZE_PARAM = "batchSize";

  /** The definition of the {@link org.apache.solr.handler.CdcrUpdateLogSynchronizer} configuration. */
  public static final String UPDATE_LOG_SYNCHRONIZER_PARAM = "updateLogSynchronizer";

  /** The definition of the {@link org.apache.solr.handler.CdcrBufferManager} configuration. */
  public static final String BUFFER_PARAM = "buffer";

  /** The default state at startup of the buffer. */
  public static final String DEFAULT_STATE_PARAM = "defaultState";

  /** The latest update checkpoint on a target cluster. */
  public static final String CHECKPOINT = "checkpoint";

  /** The last processed version on a source cluster. */
  public static final String LAST_PROCESSED_VERSION = "lastProcessedVersion";

  /** A list of replica queues on a source cluster. */
  public static final String QUEUES = "queues";

  /** The size of a replica queue on a source cluster. */
  public static final String QUEUE_SIZE = "queueSize";

  /** The timestamp of the last processed operation in a replica queue. */
  public static final String LAST_TIMESTAMP = "lastTimestamp";

  /** A list of qps statistics per collection. */
  public static final String OPERATIONS_PER_SECOND = "operationsPerSecond";

  /** Overall counter. */
  public static final String COUNTER_ALL = "all";

  /** Counter for Adds. */
  public static final String COUNTER_ADDS = "adds";

  /** Counter for Deletes. */
  public static final String COUNTER_DELETES = "deletes";

  /** A list of errors per target collection. */
  public static final String ERRORS = "errors";

  /** Counter for consecutive errors encountered by a replicator thread. */
  public static final String CONSECUTIVE_ERRORS = "consecutiveErrors";

  /** A list of the last errors encountered by a replicator thread. */
  public static final String LAST = "last";

  /** Total size of transaction logs. */
  public static final String TLOG_TOTAL_SIZE = "tlogTotalSize";

  /** Total count of transaction logs. */
  public static final String TLOG_TOTAL_COUNT = "tlogTotalCount";

  /** The state of the update log synchronizer. */
  public static final String UPDATE_LOG_SYNCHRONIZER = "updateLogSynchronizer";

  /**
   * The actions supported by the CDCR API
   */
  public enum CdcrAction {
    START,
    STOP,
    STATUS,
    COLLECTIONCHECKPOINT,
    SHARDCHECKPOINT,
    ENABLEBUFFER,
    DISABLEBUFFER,
    LASTPROCESSEDVERSION,
    QUEUES,
    OPS,
    ERRORS;

    /**
     * Parses an action name case-insensitively.
     *
     * @param p the action name, may be null
     * @return the matching action, or null if {@code p} is null or not a known action
     */
    public static CdcrAction get(String p) {
      if (p != null) {
        try {
          return CdcrAction.valueOf(p.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
          // unknown action name: fall through and return null
        }
      }
      return null;
    }

    public String toLower() {
      return toString().toLowerCase(Locale.ROOT);
    }

  }

  /**
   * The possible states of the CDCR process
   */
  public enum ProcessState {
    STARTED,
    STOPPED;

    /**
     * Parses a UTF-8 encoded state name case-insensitively.
     *
     * @param state the encoded state, may be null
     * @return the matching state, or null if {@code state} is null or not a known state
     */
    public static ProcessState get(byte[] state) {
      if (state != null) {
        try {
          return ProcessState.valueOf(new String(state, UTF_8).toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
          // unknown state name: fall through and return null
        }
      }
      return null;
    }

    public String toLower() {
      return toString().toLowerCase(Locale.ROOT);
    }

    /** Returns the lower-case state name encoded in UTF-8. */
    public byte[] getBytes() {
      return toLower().getBytes(UTF_8);
    }

    public static String getParam() {
      return "process";
    }

  }

  /**
   * The possible states of the CDCR buffer
   */
  public enum BufferState {
    ENABLED,
    DISABLED;

    /**
     * Parses a UTF-8 encoded state name case-insensitively.
     *
     * @param state the encoded state, may be null
     * @return the matching state, or null if {@code state} is null or not a known state
     */
    public static BufferState get(byte[] state) {
      if (state != null) {
        try {
          return BufferState.valueOf(new String(state, UTF_8).toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
          // unknown state name: fall through and return null
        }
      }
      return null;
    }

    public String toLower() {
      return toString().toLowerCase(Locale.ROOT);
    }

    /** Returns the lower-case state name encoded in UTF-8. */
    public byte[] getBytes() {
      return toLower().getBytes(UTF_8);
    }

    public static String getParam() {
      return "buffer";
    }

  }
}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,170 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.core.SolrCore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Manage the life-cycle state of the CDCR process. It is responsible of synchronising the state
+ * through Zookeeper. The state of the CDCR process is stored in the zk node defined by {@link #getZnodePath()}.
+ * </p>
+ * <p>
+ * It takes care of notifying the {@link CdcrReplicatorManager} in case
+ * of a process state change.
+ * </p>
+ */
+class CdcrProcessStateManager extends CdcrStateManager {
+
+ private CdcrParams.ProcessState state = DEFAULT_STATE;
+
+ private ProcessStateWatcher wrappedWatcher;
+ private Watcher watcher;
+
+ private SolrCore core;
+
+ /**
+ * The default state must be STOPPED. See comments in
+ * {@link #setState(org.apache.solr.handler.CdcrParams.ProcessState)}.
+ */
+ static CdcrParams.ProcessState DEFAULT_STATE = CdcrParams.ProcessState.STOPPED;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrProcessStateManager.class);
+
+ CdcrProcessStateManager(final SolrCore core) {
+ this.core = core;
+
+ // Ensure that the status znode exists
+ this.createStateNode();
+
+ // Register the watcher at startup
+ try {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ watcher = this.initWatcher(zkClient);
+ this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed fetching initial state", e);
+ }
+ }
+
+ /**
+ * SolrZkClient does not guarantee that a watch object will only be triggered once for a given notification
+ * if we does not wrap the watcher - see SOLR-6621.
+ */
+ private Watcher initWatcher(SolrZkClient zkClient) {
+ wrappedWatcher = new ProcessStateWatcher();
+ return zkClient.wrapWatcher(wrappedWatcher);
+ }
+
+ private String getZnodeBase() {
+ return "/collections/" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + "/cdcr/state";
+ }
+
+ private String getZnodePath() {
+ return getZnodeBase() + "/process";
+ }
+
+ void setState(CdcrParams.ProcessState state) {
+ if (this.state != state) {
+ this.state = state;
+ this.callback(); // notify the observers of a state change
+ }
+ }
+
+ CdcrParams.ProcessState getState() {
+ return state;
+ }
+
+ /**
+ * Synchronise the state to Zookeeper. This method must be called only by the handler receiving the
+ * action.
+ */
+ void synchronize() {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ zkClient.setData(this.getZnodePath(), this.getState().getBytes(), true);
+ // check if nobody changed it in the meantime, and set a new watcher
+ this.setState(CdcrParams.ProcessState.get(zkClient.getData(this.getZnodePath(), watcher, null, true)));
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed synchronising new state", e);
+ }
+ }
+
+ private void createStateNode() {
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ if (!zkClient.exists(this.getZnodePath(), true)) {
+ if (!zkClient.exists(this.getZnodeBase(), true)) {
+ zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+ }
+ zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
+ log.info("Created znode {}", this.getZnodePath());
+ }
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed to create CDCR process state node", e);
+ }
+ }
+
+ void shutdown() {
+ if (wrappedWatcher != null) {
+ wrappedWatcher.cancel(); // cancel the watcher to avoid spurious warn messages during shutdown
+ }
+ }
+
+ private class ProcessStateWatcher implements Watcher {
+
+ private boolean isCancelled = false;
+
+ /**
+ * Cancel the watcher to avoid spurious warn messages during shutdown.
+ */
+ void cancel() {
+ isCancelled = true;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (isCancelled) return; // if the watcher is cancelled, do nothing.
+ String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+
+ log.info("The CDCR process state has changed: {} @ {}:{}", event, collectionName, shard);
+ if (Event.EventType.None.equals(event.getType())) {
+ return;
+ }
+ SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
+ try {
+ CdcrParams.ProcessState state = CdcrParams.ProcessState.get(zkClient.getData(CdcrProcessStateManager.this.getZnodePath(), watcher, null, true));
+ log.info("Received new CDCR process state from watcher: {} @ {}:{}", state, collectionName, shard);
+ CdcrProcessStateManager.this.setState(state);
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Failed synchronising new state @ " + collectionName + ":" + shard, e);
+ }
+ }
+
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicator.java Fri May 22 18:58:29 2015
@@ -0,0 +1,222 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.processor.CdcrUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * The replication logic. Given a {@link org.apache.solr.handler.CdcrReplicatorState}, it reads all the new entries
+ * in the update log and forward them to the target cluster. If an error occurs, the replication is stopped and
+ * will be tried again later.
+ */
+public class CdcrReplicator implements Runnable {
+
+ private final CdcrReplicatorState state;
+ private final int batchSize;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrReplicator.class);
+
+ public CdcrReplicator(CdcrReplicatorState state, int batchSize) {
+ this.state = state;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void run() {
+ CdcrUpdateLog.CdcrLogReader logReader = state.getLogReader();
+ CdcrUpdateLog.CdcrLogReader subReader = null;
+ if (logReader == null) {
+ log.warn("Log reader for target {} is not initialised, it will be ignored.", state.getTargetCollection());
+ return;
+ }
+
+ try {
+ // create update request
+ UpdateRequest req = new UpdateRequest();
+ // Add the param to indicate the {@link CdcrUpdateProcessor} to keep the provided version number
+ req.setParam(CdcrUpdateProcessor.CDCR_UPDATE, "");
+
+ // Start the benchmakr timer
+ state.getBenchmarkTimer().start();
+
+ long counter = 0;
+ subReader = logReader.getSubReader();
+
+ for (int i = 0; i < batchSize; i++) {
+ Object o = subReader.next();
+ if (o == null) break; // we have reached the end of the update logs, we should close the batch
+
+ if (isDelete(o)) {
+
+ /*
+ * Deletes are sent one at a time.
+ */
+
+ // First send out current batch of SolrInputDocument, the non-deletes.
+ List<SolrInputDocument> docs = req.getDocuments();
+
+ if (docs != null && docs.size() > 0) {
+ subReader.resetToLastPosition(); // Push back the delete for now.
+ this.sendRequest(req); // Send the batch update request
+ logReader.forwardSeek(subReader); // Advance the main reader to just before the delete.
+ o = subReader.next(); // Read the delete again
+ counter += docs.size();
+ req.clear();
+ }
+
+ // Process Delete
+ this.processUpdate(o, req);
+ this.sendRequest(req);
+ logReader.forwardSeek(subReader);
+ counter++;
+ req.clear();
+
+ } else {
+
+ this.processUpdate(o, req);
+
+ }
+ }
+
+ //Send the final batch out.
+ List<SolrInputDocument> docs = req.getDocuments();
+
+ if ((docs != null && docs.size() > 0)) {
+ this.sendRequest(req);
+ counter += docs.size();
+ }
+
+ // we might have read a single commit operation and reached the end of the update logs
+ logReader.forwardSeek(subReader);
+
+ log.debug("Forwarded {} updates to target {}", counter, state.getTargetCollection());
+ } catch (Exception e) {
+ // report error and update error stats
+ this.handleException(e);
+ } finally {
+ // stop the benchmark timer
+ state.getBenchmarkTimer().stop();
+ // ensure that the subreader is closed and the associated pointer is removed
+ if (subReader != null) subReader.close();
+ }
+ }
+
+ private void sendRequest(UpdateRequest req) throws IOException, SolrServerException, CdcrReplicatorException {
+ UpdateResponse rsp = req.process(state.getClient());
+ if (rsp.getStatus() != 0) {
+ throw new CdcrReplicatorException(req, rsp);
+ }
+ state.resetConsecutiveErrors();
+ }
+
+ private boolean isDelete(Object o) {
+ List entry = (List) o;
+ int operationAndFlags = (Integer) entry.get(0);
+ int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
+ return oper == UpdateLog.DELETE_BY_QUERY || oper == UpdateLog.DELETE;
+ }
+
+ private void handleException(Exception e) {
+ if (e instanceof CdcrReplicatorException) {
+ UpdateRequest req = ((CdcrReplicatorException) e).req;
+ UpdateResponse rsp = ((CdcrReplicatorException) e).rsp;
+ log.warn("Failed to forward update request {}. Got response {}", req, rsp);
+ state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+ } else if (e instanceof CloudSolrClient.RouteException) {
+ log.warn("Failed to forward update request", e);
+ state.reportError(CdcrReplicatorState.ErrorType.BAD_REQUEST);
+ } else {
+ log.warn("Failed to forward update request", e);
+ state.reportError(CdcrReplicatorState.ErrorType.INTERNAL);
+ }
+ }
+
+ private UpdateRequest processUpdate(Object o, UpdateRequest req) {
+
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List) o;
+
+ int operationAndFlags = (Integer) entry.get(0);
+ int oper = operationAndFlags & UpdateLog.OPERATION_MASK;
+ long version = (Long) entry.get(1);
+
+ // record the operation in the benchmark timer
+ state.getBenchmarkTimer().incrementCounter(oper);
+
+ switch (oper) {
+ case UpdateLog.ADD: {
+ // the version is already attached to the document
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+ req.add(sdoc);
+ return req;
+ }
+ case UpdateLog.DELETE: {
+ byte[] idBytes = (byte[]) entry.get(2);
+ req.deleteById(new String(idBytes, Charset.forName("UTF-8")));
+ req.setParam(DistributedUpdateProcessor.VERSION_FIELD, Long.toString(version));
+ return req;
+ }
+
+ case UpdateLog.DELETE_BY_QUERY: {
+ String query = (String) entry.get(2);
+ req.deleteByQuery(query);
+ req.setParam(DistributedUpdateProcessor.VERSION_FIELD, Long.toString(version));
+ return req;
+ }
+
+ case UpdateLog.COMMIT: {
+ return null;
+ }
+
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+ }
+
+ /**
+ * Exception to catch update request issues with the target cluster.
+ */
+ public class CdcrReplicatorException extends Exception {
+
+ private final UpdateRequest req;
+ private final UpdateResponse rsp;
+
+ public CdcrReplicatorException(UpdateRequest req, UpdateResponse rsp) {
+ this.req = req;
+ this.rsp = rsp;
+ }
+
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java Fri May 22 18:58:29 2015
@@ -0,0 +1,170 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
+
+ private List<CdcrReplicatorState> replicatorStates;
+
+ private final CdcrReplicatorScheduler scheduler;
+ private CdcrProcessStateManager processStateManager;
+ private CdcrLeaderStateManager leaderStateManager;
+
+ private SolrCore core;
+ private String path;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrReplicatorManager.class);
+
+ CdcrReplicatorManager(final SolrCore core, String path,
+ SolrParams replicatorConfiguration,
+ Map<String, List<SolrParams>> replicasConfiguration) {
+ this.core = core;
+ this.path = path;
+
+ // create states
+ replicatorStates = new ArrayList<>();
+ String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ List<SolrParams> targets = replicasConfiguration.get(myCollection);
+ if (targets != null) {
+ for (SolrParams params : targets) {
+ String zkHost = params.get(CdcrParams.ZK_HOST_PARAM);
+ String targetCollection = params.get(CdcrParams.TARGET_COLLECTION_PARAM);
+
+ CloudSolrClient client = new CloudSolrClient(zkHost, true);
+ client.setDefaultCollection(targetCollection);
+ replicatorStates.add(new CdcrReplicatorState(targetCollection, zkHost, client));
+ }
+ }
+
+ this.scheduler = new CdcrReplicatorScheduler(this, replicatorConfiguration);
+ }
+
+ void setProcessStateManager(final CdcrProcessStateManager processStateManager) {
+ this.processStateManager = processStateManager;
+ this.processStateManager.register(this);
+ }
+
+ void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
+ this.leaderStateManager = leaderStateManager;
+ this.leaderStateManager.register(this);
+ }
+
+ /**
+ * <p>
+ * Inform the replicator manager of a change of state, and tell him to update its own state.
+ * </p>
+ * <p>
+ * If we are the leader and the process state is STARTED, we need to initialise the log readers and start the
+ * scheduled thread poll.
+ * Otherwise, if the process state is STOPPED or if we are not the leader, we need to close the log readers and stop
+ * the thread pool.
+ * </p>
+ * <p>
+ * This method is synchronised as it can both be called by the leaderStateManager and the processStateManager.
+ * </p>
+ */
+ @Override
+ public synchronized void stateUpdate() {
+ if (leaderStateManager.amILeader() && processStateManager.getState().equals(CdcrParams.ProcessState.STARTED)) {
+ this.initLogReaders();
+ this.scheduler.start();
+ return;
+ }
+
+ this.scheduler.shutdown();
+ this.closeLogReaders();
+ }
+
+ List<CdcrReplicatorState> getReplicatorStates() {
+ return replicatorStates;
+ }
+
+ void initLogReaders() {
+ String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
+ CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
+
+ for (CdcrReplicatorState state : replicatorStates) {
+ state.closeLogReader();
+ try {
+ long checkpoint = this.getCheckpoint(state);
+ log.info("Create new update log reader for target {} with checkpoint {} @ {}:{}", state.getTargetCollection(),
+ checkpoint, collectionName, shard);
+ CdcrUpdateLog.CdcrLogReader reader = ulog.newLogReader();
+ reader.seek(checkpoint);
+ state.init(reader);
+ } catch (IOException | SolrServerException | SolrException e) {
+ log.warn("Unable to instantiate the log reader for target collection " + state.getTargetCollection(), e);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while instantiate the log reader for target collection " + state.getTargetCollection(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private long getCheckpoint(CdcrReplicatorState state) throws IOException, SolrServerException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CommonParams.ACTION, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT.toString());
+
+ SolrRequest request = new QueryRequest(params);
+ request.setPath(path);
+
+ NamedList response = state.getClient().request(request);
+ return (Long) response.get(CdcrParams.CHECKPOINT);
+ }
+
+ void closeLogReaders() {
+ for (CdcrReplicatorState state : replicatorStates) {
+ state.closeLogReader();
+ }
+ }
+
+ /**
+ * Shutdown all the {@link org.apache.solr.handler.CdcrReplicatorState} by closing their
+ * {@link org.apache.solr.client.solrj.impl.CloudSolrClient} and
+ * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
+ */
+ void shutdown() {
+ this.scheduler.shutdown();
+ for (CdcrReplicatorState state : replicatorStates) {
+ state.shutdown();
+ }
+ replicatorStates.clear();
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java Fri May 22 18:58:29 2015
@@ -0,0 +1,118 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * Schedule the execution of the {@link org.apache.solr.handler.CdcrReplicator} threads at
+ * regular time interval. It relies on a queue of {@link org.apache.solr.handler.CdcrReplicatorState} in
+ * order to avoid that one {@link org.apache.solr.handler.CdcrReplicatorState} is used by two threads at the same
+ * time.
+ */
+class CdcrReplicatorScheduler {
+
+ private boolean isStarted = false;
+
+ private ScheduledExecutorService scheduler;
+ private ExecutorService replicatorsPool;
+
+ private final CdcrReplicatorManager replicatorManager;
+ private final ConcurrentLinkedQueue<CdcrReplicatorState> statesQueue;
+
+ private int poolSize = DEFAULT_POOL_SIZE;
+ private int timeSchedule = DEFAULT_TIME_SCHEDULE;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+
+ private static final int DEFAULT_POOL_SIZE = 2;
+ private static final int DEFAULT_TIME_SCHEDULE = 10;
+ private static final int DEFAULT_BATCH_SIZE = 128;
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrReplicatorScheduler.class);
+
+ CdcrReplicatorScheduler(final CdcrReplicatorManager replicatorStatesManager, final SolrParams replicatorConfiguration) {
+ this.replicatorManager = replicatorStatesManager;
+ this.statesQueue = new ConcurrentLinkedQueue<>(replicatorManager.getReplicatorStates());
+ if (replicatorConfiguration != null) {
+ poolSize = replicatorConfiguration.getInt(CdcrParams.THREAD_POOL_SIZE_PARAM, DEFAULT_POOL_SIZE);
+ timeSchedule = replicatorConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
+ batchSize = replicatorConfiguration.getInt(CdcrParams.BATCH_SIZE_PARAM, DEFAULT_BATCH_SIZE);
+ }
+ }
+
+ void start() {
+ if (!isStarted) {
+ scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-scheduler"));
+ //replicatorsPool = Executors.newFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
+ replicatorsPool = ExecutorUtil.newMDCAwareFixedThreadPool(poolSize, new DefaultSolrThreadFactory("cdcr-replicator"));
+
+ // the scheduler thread is executed every second and submits one replication task
+ // per available state in the queue
+ scheduler.scheduleWithFixedDelay(new Runnable() {
+
+ @Override
+ public void run() {
+ int nCandidates = statesQueue.size();
+ for (int i = 0; i < nCandidates; i++) {
+ // a thread that pool one state from the queue, execute the replication task, and push back
+ // the state in the queue when the task is completed
+ replicatorsPool.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ CdcrReplicatorState state = statesQueue.poll();
+ try {
+ new CdcrReplicator(state, batchSize).run();
+ } finally {
+ statesQueue.offer(state);
+ }
+ }
+
+ });
+
+ }
+ }
+
+ }, 0, timeSchedule, TimeUnit.MILLISECONDS);
+ isStarted = true;
+ }
+ }
+
+ void shutdown() {
+ if (isStarted) {
+ replicatorsPool.shutdown();
+ try {
+ replicatorsPool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while waiting for CDCR replicator threadpool close.");
+ Thread.currentThread().interrupt();
+ } finally {
+ scheduler.shutdownNow();
+ isStarted = false;
+ }
+ }
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorState.java Fri May 22 18:58:29 2015
@@ -0,0 +1,270 @@
+package org.apache.solr.handler;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.schema.TrieDateField;
+import org.apache.solr.update.CdcrUpdateLog;
+import org.apache.solr.update.UpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The state of the replication with a target cluster.
+ */
+class CdcrReplicatorState {
+
+ private final String targetCollection;
+ private final String zkHost;
+ private final CloudSolrClient targetClient;
+
+ private CdcrUpdateLog.CdcrLogReader logReader;
+
+ private long consecutiveErrors = 0;
+ private final Map<ErrorType, Long> errorCounters = new HashMap<>();
+ private final FixedQueue<ErrorQueueEntry> errorsQueue = new FixedQueue<>(100); // keep the last 100 errors
+
+ private BenchmarkTimer benchmarkTimer;
+
+ private static Logger log = LoggerFactory.getLogger(CdcrReplicatorState.class);
+
+ CdcrReplicatorState(final String targetCollection, final String zkHost, final CloudSolrClient targetClient) {
+ this.targetCollection = targetCollection;
+ this.targetClient = targetClient;
+ this.zkHost = zkHost;
+ this.benchmarkTimer = new BenchmarkTimer();
+ }
+
+ /**
+ * Initialise the replicator state with a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
+ * that is positioned at the last target cluster checkpoint.
+ */
+ void init(final CdcrUpdateLog.CdcrLogReader logReader) {
+ this.logReader = logReader;
+ }
+
+ void closeLogReader() {
+ if (logReader != null) {
+ logReader.close();
+ logReader = null;
+ }
+ }
+
+ CdcrUpdateLog.CdcrLogReader getLogReader() {
+ return logReader;
+ }
+
+ String getTargetCollection() {
+ return targetCollection;
+ }
+
+ String getZkHost() {
+ return zkHost;
+ }
+
+ CloudSolrClient getClient() {
+ return targetClient;
+ }
+
+ void shutdown() {
+ try {
+ targetClient.close();
+ } catch (IOException ioe) {
+ log.warn("Caught exception trying to close server: ", ioe.getMessage());
+ }
+ this.closeLogReader();
+ }
+
+ void reportError(ErrorType error) {
+ if (!errorCounters.containsKey(error)) {
+ errorCounters.put(error, 0l);
+ }
+ errorCounters.put(error, errorCounters.get(error) + 1);
+ errorsQueue.add(new ErrorQueueEntry(error, System.currentTimeMillis()));
+ consecutiveErrors++;
+ }
+
+ void resetConsecutiveErrors() {
+ consecutiveErrors = 0;
+ }
+
+ /**
+ * Returns the number of consecutive errors encountered while trying to forward updates to the target.
+ */
+ long getConsecutiveErrors() {
+ return consecutiveErrors;
+ }
+
+ /**
+ * Gets the number of errors of a particular type.
+ */
+ long getErrorCount(ErrorType type) {
+ if (errorCounters.containsKey(type)) {
+ return errorCounters.get(type);
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Gets the last errors ordered by timestamp (most recent first)
+ */
+ List<String[]> getLastErrors() {
+ List<String[]> lastErrors = new ArrayList<>();
+ synchronized (errorsQueue) {
+ Iterator<ErrorQueueEntry> it = errorsQueue.iterator();
+ while (it.hasNext()) {
+ ErrorQueueEntry entry = it.next();
+ lastErrors.add(new String[]{TrieDateField.formatExternal(new Date(entry.timestamp)), entry.type.toLower()});
+ }
+ }
+ return lastErrors;
+ }
+
+ /**
+ * Return the timestamp of the last processed operations
+ */
+ String getTimestampOfLastProcessedOperation() {
+ if (logReader != null && logReader.getLastVersion() != -1) {
+ // Shift back to the right by 20 bits the version number - See VersionInfo#getNewClock
+ return TrieDateField.formatExternal(new Date(logReader.getLastVersion() >> 20));
+ }
+ return new String();
+ }
+
+ /**
+ * Gets the benchmark timer.
+ */
+ BenchmarkTimer getBenchmarkTimer() {
+ return this.benchmarkTimer;
+ }
+
+ enum ErrorType {
+ INTERNAL,
+ BAD_REQUEST;
+
+ public String toLower() {
+ return toString().toLowerCase(Locale.ROOT);
+ }
+
+ }
+
+ class BenchmarkTimer {
+
+ private long startTime;
+ private long runTime = 0;
+ private Map<Integer, Long> opCounters = new HashMap<>();
+
+ /**
+ * Start recording time.
+ */
+ void start() {
+ startTime = System.nanoTime();
+ }
+
+ /**
+ * Stop recording time.
+ */
+ void stop() {
+ runTime += System.nanoTime() - startTime;
+ startTime = -1;
+ }
+
+ void incrementCounter(final int operationType) {
+ switch (operationType) {
+ case UpdateLog.ADD:
+ case UpdateLog.DELETE:
+ case UpdateLog.DELETE_BY_QUERY: {
+ if (!opCounters.containsKey(operationType)) {
+ opCounters.put(operationType, 0l);
+ }
+ opCounters.put(operationType, opCounters.get(operationType) + 1);
+ return;
+ }
+
+ default:
+ return;
+ }
+ }
+
+ long getRunTime() {
+ long totalRunTime = runTime;
+ if (startTime != -1) { // we are currently recording the time
+ totalRunTime += System.nanoTime() - startTime;
+ }
+ return totalRunTime;
+ }
+
+ double getOperationsPerSecond() {
+ long total = 0;
+ for (long counter : opCounters.values()) {
+ total += counter;
+ }
+ double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+ return total / elapsedTimeInSeconds;
+ }
+
+ double getAddsPerSecond() {
+ long total = opCounters.get(UpdateLog.ADD) != null ? opCounters.get(UpdateLog.ADD) : 0;
+ double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+ return total / elapsedTimeInSeconds;
+ }
+
+ double getDeletesPerSecond() {
+ long total = opCounters.get(UpdateLog.DELETE) != null ? opCounters.get(UpdateLog.DELETE) : 0;
+ total += opCounters.get(UpdateLog.DELETE_BY_QUERY) != null ? opCounters.get(UpdateLog.DELETE_BY_QUERY) : 0;
+ double elapsedTimeInSeconds = ((double) this.getRunTime() / 1E9);
+ return total / elapsedTimeInSeconds;
+ }
+
+ }
+
+ private class ErrorQueueEntry {
+
+ private ErrorType type;
+ private long timestamp;
+
+ private ErrorQueueEntry(ErrorType type, long timestamp) {
+ this.type = type;
+ this.timestamp = timestamp;
+ }
+ }
+
+ private class FixedQueue<E> extends LinkedList<E> {
+
+ private int maxSize;
+
+ public FixedQueue(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public synchronized boolean add(E e) {
+ super.addFirst(e);
+ if (size() > maxSize) {
+ removeLast();
+ }
+ return true;
+ }
+ }
+
+}
+