You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC
svn commit: r1681186 [4/5] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,812 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.CdcrParams;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.zookeeper.CreateMode;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+
+/**
+ * <p>
+ * Base class for CDCR unit testing. This class emulates two clusters, a source and a target, by using different
+ * collections in the same SolrCloud cluster. Therefore, the two clusters will share the same ZooKeeper cluster. In a
+ * real scenario, the two collections/clusters will likely have their own ZooKeeper cluster.
+ * </p>
+ * <p>
+ * This class will automatically create two collections, the source and the target. Each collection will have
+ * {@link #shardCount} shards, and {@link #replicationFactor} replicas per shard. One jetty instance will
+ * be created per core.
+ * </p>
+ * <p>
+ * The source and target collections can be reinitialised at will by calling {@link #clearSourceCollection()} and
+ * {@link #clearTargetCollection()}. After reinitialisation, a collection will have a fresh index and update log.
+ * </p>
+ * <p>
+ * Servers can be restarted at will by calling
+ * {@link #restartServer(BaseCdcrDistributedZkTest.CloudJettyRunner)} or
+ * {@link #restartServers(java.util.List)}.
+ * </p>
+ * <p>
+ * The creation of the target collection can be disabled with the flag {@link #createTargetCollection}.
+ * </p>
+ * <p>
+ * NB: We cannot use multiple cores per jetty instance, as jetty will load only one core when restarting. It seems
+ * that this is a limitation of the {@link org.apache.solr.client.solrj.embedded.JettySolrRunner}. This class
+ * tries to ensure that there is always a single core per jetty instance.
+ * </p>
+ */
+public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
+
+  /** Number of shards of each collection; also exported as the {@code numShards} system property. */
+  protected int shardCount = 2;
+
+  /** Number of replicas per shard of each collection. */
+  protected int replicationFactor = 2;
+
+  /** When false, {@link #baseBefore()} only creates the source collection. */
+  protected boolean createTargetCollection = true;
+
+  /** Request path of the CDCR handler, used by {@link #invokeCdcrAction}. */
+  private static final String CDCR_PATH = "/cdcr";
+
+  protected static final String SOURCE_COLLECTION = "source_collection";
+  protected static final String TARGET_COLLECTION = "target_collection";
+
+  public static final String SHARD1 = "shard1";
+  public static final String SHARD2 = "shard2";
+
+  @Override
+  protected String getCloudSolrConfig() {
+    return "solrconfig-cdcr.xml";
+  }
+
+  /**
+   * Exports {@link #shardCount} as the {@code numShards} system property (or clears it when not positive) and,
+   * in SSL mode, stores {@code urlScheme=https} in the cluster properties in ZooKeeper.
+   */
+  @Override
+  public void distribSetUp() throws Exception {
+    super.distribSetUp();
+
+    if (shardCount > 0) {
+      System.setProperty("numShards", Integer.toString(shardCount));
+    } else {
+      System.clearProperty("numShards");
+    }
+
+    if (isSSLMode()) {
+      // the url scheme is provided through the cluster properties rather than through the system property
+      System.clearProperty("urlScheme");
+      ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
+          AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
+      try {
+        zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
+            ZkStateReader.toJSON(Collections.singletonMap("urlScheme", "https")),
+            CreateMode.PERSISTENT, true);
+      } finally {
+        zkStateReader.close();
+      }
+    }
+  }
+
+  /**
+   * Intentionally empty: servers are started per collection by {@link #startServers(int)}.
+   */
+  @Override
+  protected void createServers(int numServers) throws Exception {
+  }
+
+  @BeforeClass
+  public static void beforeClass() {
+    System.setProperty("solrcloud.update.delay", "0");
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    System.clearProperty("solrcloud.update.delay");
+  }
+
+  /**
+   * Creates the source collection (and the target collection unless {@link #createTargetCollection} is false),
+   * then resets the set of already generated random values.
+   */
+  @Before
+  public void baseBefore() throws Exception {
+    this.createSourceCollection();
+    if (this.createTargetCollection) {
+      this.createTargetCollection();
+    }
+    RandVal.uniqueValues = new HashSet<>(); // reset random values
+  }
+
+  @After
+  public void baseAfter() throws Exception {
+    destroyServers();
+  }
+
+  /**
+   * Creates a new cloud client connected to the test ZooKeeper, with randomized update settings and a
+   * 30 seconds connection timeout. The caller is responsible for closing it.
+   *
+   * @param defaultCollection the default collection of the client, or null for none
+   */
+  protected CloudSolrClient createCloudClient(String defaultCollection) {
+    CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
+    client.setParallelUpdates(random().nextBoolean());
+    if (defaultCollection != null) {
+      client.setDefaultCollection(defaultCollection);
+    }
+    client.getLbClient().getHttpClient().getParams()
+        .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
+    return client;
+  }
+
+  /**
+   * Prints the ZooKeeper layout to stdout (debugging aid).
+   */
+  protected void printLayout() throws Exception {
+    SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT);
+    try {
+      zkClient.printLayoutToStdOut();
+    } finally {
+      // close even if printing fails, to avoid leaking the ZooKeeper session
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Builds a document from alternating field names and values.
+   */
+  protected SolrInputDocument getDoc(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    return doc;
+  }
+
+  /**
+   * Adds a document to the given collection and commits (waitFlush and waitSearcher both true).
+   */
+  protected void index(String collection, SolrInputDocument doc) throws IOException, SolrServerException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      client.add(doc);
+      client.commit(true, true);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Adds a list of documents to the given collection and commits (waitFlush and waitSearcher both true).
+   */
+  protected void index(String collection, List<SolrInputDocument> docs) throws IOException, SolrServerException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      client.add(docs);
+      client.commit(true, true);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Deletes documents by id from the given collection and commits.
+   */
+  protected void deleteById(String collection, List<String> ids) throws IOException, SolrServerException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      client.deleteById(ids);
+      client.commit(true, true);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Deletes the documents matching the query from the given collection and commits.
+   */
+  protected void deleteByQuery(String collection, String q) throws IOException, SolrServerException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      client.deleteByQuery(q);
+      client.commit(true, true);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Invokes a commit on the given collection.
+   */
+  protected void commit(String collection) throws IOException, SolrServerException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      client.commit(true, true);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Returns the number of documents in a given collection.
+   */
+  protected long getNumDocs(String collection) throws SolrServerException, IOException {
+    CloudSolrClient client = createCloudClient(collection);
+    try {
+      return client.query(new SolrQuery("*:*")).getResults().getNumFound();
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Invokes a CDCR action on a given node.
+   */
+  protected NamedList invokeCdcrAction(CloudJettyRunner jetty, CdcrParams.CdcrAction action) throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.ACTION, action.toString());
+
+    SolrRequest request = new QueryRequest(params);
+    request.setPath(CDCR_PATH);
+
+    return jetty.client.request(request);
+  }
+
+  /**
+   * Asserts the process state and buffer state of CDCR on each node of the given collection.
+   */
+  protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
+      throws Exception {
+    for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
+      NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
+      NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+      assertEquals(processState.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+      assertEquals(bufferState.toLower(), status.get(CdcrParams.BufferState.getParam()));
+    }
+  }
+
+  /**
+   * A mapping between collection and node names. This is used when creating the collection in
+   * {@link #createCollection(String)}.
+   */
+  private Map<String, List<String>> collectionToNodeNames = new HashMap<>();
+
+  /**
+   * Starts the servers, saves and associates the node names to the source collection,
+   * and finally creates the source collection.
+   */
+  private void createSourceCollection() throws Exception {
+    List<String> nodeNames = this.startServers(shardCount * replicationFactor);
+    this.collectionToNodeNames.put(SOURCE_COLLECTION, nodeNames);
+    this.createCollection(SOURCE_COLLECTION);
+    this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
+    this.updateMappingsFromZk(SOURCE_COLLECTION);
+  }
+
+  /**
+   * Clears the source collection. It deletes then re-creates the collection through the Collections API.
+   * The collection will have a fresh index, i.e., including a new update log.
+   */
+  protected void clearSourceCollection() throws Exception {
+    this.deleteCollection(SOURCE_COLLECTION);
+    this.createCollection(SOURCE_COLLECTION);
+    this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
+    this.updateMappingsFromZk(SOURCE_COLLECTION);
+  }
+
+  /**
+   * Starts the servers, saves and associates the node names to the target collection,
+   * and finally creates the target collection.
+   */
+  private void createTargetCollection() throws Exception {
+    List<String> nodeNames = this.startServers(shardCount * replicationFactor);
+    this.collectionToNodeNames.put(TARGET_COLLECTION, nodeNames);
+    this.createCollection(TARGET_COLLECTION);
+    this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
+    this.updateMappingsFromZk(TARGET_COLLECTION);
+  }
+
+  /**
+   * Clears the target collection. It deletes then re-creates the collection through the Collections API.
+   * The collection will have a fresh index, i.e., including a new update log.
+   */
+  protected void clearTargetCollection() throws Exception {
+    this.deleteCollection(TARGET_COLLECTION);
+    this.createCollection(TARGET_COLLECTION);
+    this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
+    this.updateMappingsFromZk(TARGET_COLLECTION);
+  }
+
+  /**
+   * Creates a new collection through the Collections API. It enforces at most one shard per node.
+   * It restricts the new collection to the nodes registered for it in {@link #collectionToNodeNames},
+   * to ensure that a node will not host more than one core (which would cause problems when restarting servers).
+   *
+   * @throws IllegalStateException if no node names were registered for the collection
+   */
+  private void createCollection(String name) throws Exception {
+    List<String> nodeNames = collectionToNodeNames.get(name);
+    if (nodeNames == null || nodeNames.isEmpty()) {
+      throw new IllegalStateException("No node names registered for collection " + name);
+    }
+
+    // comma-separated list of the only nodes the new collection may be created on
+    StringBuilder createNodeSet = new StringBuilder();
+    String separator = "";
+    for (String nodeName : nodeNames) {
+      createNodeSet.append(separator).append(nodeName);
+      separator = ",";
+    }
+
+    CloudSolrClient client = createCloudClient(null);
+    try {
+      int maxShardsPerNode = 1; // one core per jetty instance, see the class javadoc
+      createCollection(null, name, shardCount, replicationFactor, maxShardsPerNode, client, createNodeSet.toString());
+    } finally {
+      client.close();
+    }
+  }
+
+  private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos,
+                                                   String collectionName, int numShards, int replicationFactor,
+                                                   int maxShardsPerNode, SolrClient client, String createNodeSetStr)
+      throws SolrServerException, IOException {
+    return createCollection(collectionInfos, collectionName,
+        ZkNodeProps.makeMap(
+            NUM_SLICES, numShards,
+            REPLICATION_FACTOR, replicationFactor,
+            CREATE_NODE_SET, createNodeSetStr,
+            MAX_SHARDS_PER_NODE, maxShardsPerNode),
+        client, null);
+  }
+
+  /**
+   * Sends a Collections API CREATE request built from {@code collectionProps} (null values are skipped).
+   * When {@code collectionInfos} is not null, the number of shards and the replication factor of the new
+   * collection are recorded in it under the collection name.
+   */
+  private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName,
+                                                   Map<String, Object> collectionProps, SolrClient client,
+                                                   String confSetName)
+      throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.CREATE.toString());
+    for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
+      if (entry.getValue() != null) {
+        params.set(entry.getKey(), String.valueOf(entry.getValue()));
+      }
+    }
+    Integer numShards = (Integer) collectionProps.get(NUM_SLICES);
+    if (numShards == null) {
+      String shardNames = (String) collectionProps.get(SHARDS_PROP);
+      numShards = StrUtils.splitSmart(shardNames, ',').size();
+    }
+    Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
+    if (replicationFactor == null) {
+      replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
+    }
+
+    if (confSetName != null) {
+      params.set("collection.configName", confSetName);
+    }
+
+    List<Integer> list = new ArrayList<>();
+    list.add(numShards);
+    list.add(replicationFactor);
+    if (collectionInfos != null) {
+      collectionInfos.put(collectionName, list);
+    }
+    params.set("name", collectionName);
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+
+    CollectionAdminResponse res = new CollectionAdminResponse();
+    res.setResponse(client.request(request));
+    return res;
+  }
+
+  /**
+   * Deletes a collection through the Collections API. Failures are logged and an empty response is returned.
+   */
+  protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
+    SolrClient client = createCloudClient(null);
+    CollectionAdminResponse res;
+
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("action", CollectionParams.CollectionAction.DELETE.toString());
+      params.set("name", collectionName);
+      QueryRequest request = new QueryRequest(params);
+      request.setPath("/admin/collections");
+
+      res = new CollectionAdminResponse();
+      res.setResponse(client.request(request));
+    } catch (Exception e) {
+      log.warn("Error while deleting the collection " + collectionName, e);
+      return new CollectionAdminResponse();
+    } finally {
+      client.close();
+    }
+
+    return res;
+  }
+
+  /**
+   * Waits for the recoveries of the given collection to finish, using a fresh cloud client's state reader.
+   */
+  private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
+    CloudSolrClient client = this.createCloudClient(null);
+    try {
+      client.connect();
+      ZkStateReader zkStateReader = client.getZkStateReader();
+      super.waitForRecoveriesToFinish(collection, zkStateReader, verbose);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Asserts that the collection has the correct number of shards and replicas.
+   */
+  protected void assertCollectionExpectations(String collectionName) throws Exception {
+    CloudSolrClient client = this.createCloudClient(null);
+    try {
+      client.connect();
+      ClusterState clusterState = client.getZkStateReader().getClusterState();
+
+      assertTrue("Could not find new collection " + collectionName, clusterState.hasCollection(collectionName));
+      Map<String, Slice> shards = clusterState.getCollection(collectionName).getSlicesMap();
+      // did we find the expected number of shards?
+      assertEquals("Found new collection " + collectionName + ", but mismatch on number of shards.", shardCount, shards.size());
+      int totalReplicas = 0;
+      for (Slice shard : shards.values()) {
+        totalReplicas += shard.getReplicas().size();
+      }
+      int expectedTotalReplicas = shardCount * replicationFactor;
+      assertEquals("Found new collection " + collectionName + " with correct number of shards, but mismatch on number " +
+          "of replicas.", expectedTotalReplicas, totalReplicas);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Restarts a server, waits for the recoveries of its collection to finish and refreshes the mappings.
+   */
+  protected void restartServer(CloudJettyRunner server) throws Exception {
+    // it seems we need to set the collection property to have the jetty properly restarted
+    System.setProperty("collection", server.collection);
+    try {
+      JettySolrRunner jetty = server.jetty;
+      ChaosMonkey.stop(jetty);
+      ChaosMonkey.start(jetty);
+    } finally {
+      // never leak the property into subsequent server starts, even if the restart failed
+      System.clearProperty("collection");
+    }
+    waitForRecoveriesToFinish(server.collection, true);
+    updateMappingsFromZk(server.collection); // must update the mapping as the core node name might have changed
+  }
+
+  /**
+   * Restarts a list of servers, one after the other.
+   */
+  protected void restartServers(List<CloudJettyRunner> servers) throws Exception {
+    for (CloudJettyRunner server : servers) {
+      this.restartServer(server);
+    }
+  }
+
+  /** All jetty instances started by {@link #startServers(int)}, across all collections. */
+  private List<JettySolrRunner> jettys = new ArrayList<>();
+
+  /**
+   * Creates and starts a given number of servers. The servers are first registered under a temporary
+   * collection, which is deleted once the node names have been collected.
+   *
+   * @return the node names of the replicas of the temporary collection
+   */
+  protected List<String> startServers(int nServer) throws Exception {
+    String temporaryCollection = "tmp_collection";
+    System.setProperty("collection", temporaryCollection);
+    try {
+      for (int i = 1; i <= nServer; i++) {
+        // give everyone their own solrhome
+        File jettyDir = createTempDir("jetty").toFile();
+        jettyDir.mkdirs();
+        setupJettySolrHome(jettyDir);
+        JettySolrRunner jetty = createJetty(jettyDir, null, "shard" + i);
+        jettys.add(jetty);
+      }
+
+      ZkStateReader zkStateReader = ((SolrDispatchFilter) jettys.get(0)
+          .getDispatchFilter().getFilter()).getCores().getZkController()
+          .getZkStateReader();
+
+      // now wait till we see the leader for each shard
+      for (int i = 1; i <= shardCount; i++) {
+        this.printLayout();
+        zkStateReader.getLeaderRetry(temporaryCollection, "shard" + i, 15000);
+      }
+
+      // store the node names
+      List<String> nodeNames = new ArrayList<>();
+      for (Slice shard : zkStateReader.getClusterState().getCollection(temporaryCollection).getSlices()) {
+        for (Replica replica : shard.getReplicas()) {
+          nodeNames.add(replica.getNodeName());
+        }
+      }
+
+      // delete the temporary collection - we will create our own collections later
+      this.deleteCollection(temporaryCollection);
+      return nodeNames;
+    } finally {
+      // never leak the temporary collection property, even if a server failed to start
+      System.clearProperty("collection");
+    }
+  }
+
+  /**
+   * Stops every started jetty instance (logging, not propagating, individual failures) and forgets them.
+   */
+  @Override
+  protected void destroyServers() throws Exception {
+    for (JettySolrRunner runner : jettys) {
+      try {
+        ChaosMonkey.stop(runner);
+      } catch (Exception e) {
+        log.error("", e);
+      }
+    }
+
+    jettys.clear();
+  }
+
+  /**
+   * Mapping from collection to jettys
+   */
+  protected Map<String, List<CloudJettyRunner>> cloudJettys = new HashMap<>();
+
+  /**
+   * Mapping from collection/shard to jettys
+   */
+  protected Map<String, Map<String, List<CloudJettyRunner>>> shardToJetty = new HashMap<>();
+
+  /**
+   * Mapping from collection/shard leader to jettys
+   */
+  protected Map<String, Map<String, CloudJettyRunner>> shardToLeaderJetty = new HashMap<>();
+
+  /**
+   * Updates the mappings between the jetty instances and the ZooKeeper cluster state of the given collection.
+   * A jetty is associated with the first replica whose base url uses the jetty's port.
+   */
+  protected void updateMappingsFromZk(String collection) throws Exception {
+    List<CloudJettyRunner> collectionJettys = new ArrayList<>();
+    Map<String, List<CloudJettyRunner>> collectionShardToJetty = new HashMap<>();
+    Map<String, CloudJettyRunner> collectionShardToLeaderJetty = new HashMap<>();
+
+    CloudSolrClient cloudClient = this.createCloudClient(null);
+    try {
+      cloudClient.connect();
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      zkStateReader.updateClusterState(true);
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollection(collection);
+
+      for (JettySolrRunner jetty : jettys) {
+        int port = jetty.getLocalPort();
+        if (port == -1) {
+          throw new RuntimeException("Cannot find the port for jetty");
+        }
+
+        nextJetty:
+        for (Slice shard : coll.getSlices()) {
+          for (Map.Entry<String, Replica> entry : shard.getReplicasMap().entrySet()) {
+            Replica replica = entry.getValue();
+            if (hasPort(replica.getStr(ZkStateReader.BASE_URL_PROP), port)) {
+              List<CloudJettyRunner> shardJettys = collectionShardToJetty.get(shard.getName());
+              if (shardJettys == null) {
+                shardJettys = new ArrayList<>();
+                collectionShardToJetty.put(shard.getName(), shardJettys);
+              }
+              boolean isLeader = shard.getLeader() == replica;
+              CloudJettyRunner cjr = new CloudJettyRunner(jetty, replica, collection, shard.getName(), entry.getKey());
+              shardJettys.add(cjr);
+              if (isLeader) {
+                collectionShardToLeaderJetty.put(shard.getName(), cjr);
+              }
+              collectionJettys.add(cjr);
+              break nextJetty; // a jetty hosts a single core, move on to the next jetty
+            }
+          }
+        }
+      }
+
+      this.cloudJettys.put(collection, collectionJettys);
+      this.shardToJetty.put(collection, collectionShardToJetty);
+      this.shardToLeaderJetty.put(collection, collectionShardToLeaderJetty);
+    } finally {
+      cloudClient.close();
+    }
+  }
+
+  /**
+   * Returns true if the base url uses exactly the given port, i.e. the port is followed by a path
+   * or ends the url. A plain substring check would wrongly match port 1234 against ":12345".
+   */
+  private static boolean hasPort(String baseUrl, int port) {
+    String portToken = ":" + port;
+    return baseUrl != null && (baseUrl.endsWith(portToken) || baseUrl.contains(portToken + "/"));
+  }
+
+  /**
+   * Wrapper around a {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} to map the jetty
+   * instance to various information of the cloud cluster, such as the collection and shard
+   * that is served by the jetty instance, the node name, core node name, url, etc.
+   * Equality and hash code are based on the core url only.
+   */
+  public static class CloudJettyRunner {
+
+    public JettySolrRunner jetty;
+    public String nodeName;
+    public String coreNodeName;
+    public String url;
+    public SolrClient client;
+    public Replica info;
+    public String shard;
+    public String collection;
+
+    public CloudJettyRunner(JettySolrRunner jetty, Replica replica,
+                            String collection, String shard, String coreNodeName) {
+      this.jetty = jetty;
+      this.info = replica;
+      this.collection = collection;
+
+      // we need to update the jetty's shard so that it registers itself to the right shard when restarted
+      this.shard = shard;
+      this.jetty.setShards(this.shard);
+
+      // we need to update the jetty's core node name so that it registers itself under the right core node name
+      // when restarted
+      this.coreNodeName = coreNodeName;
+      this.jetty.setCoreNodeName(this.coreNodeName);
+
+      this.nodeName = replica.getNodeName();
+
+      ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(info);
+      this.url = coreNodeProps.getCoreUrl();
+
+      // strip the trailing slash (only if there is one) as this can cause issues when executing requests
+      String clientUrl = this.url.endsWith("/") ? this.url.substring(0, this.url.length() - 1) : this.url;
+      this.client = createNewSolrServer(clientUrl);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(url);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || getClass() != obj.getClass()) return false;
+      CloudJettyRunner other = (CloudJettyRunner) obj;
+      return Objects.equals(url, other.url);
+    }
+
+    @Override
+    public String toString() {
+      return "CloudJettyRunner [url=" + url + "]";
+    }
+
+  }
+
+  /**
+   * Creates an {@link HttpSolrClient} for the given base url, with the default connection timeout and at most
+   * 100 connections. Any construction failure is rethrown as a {@link RuntimeException}.
+   */
+  protected static SolrClient createNewSolrServer(String baseUrl) {
+    try {
+      // setup the server...
+      HttpSolrClient s = new HttpSolrClient(baseUrl);
+      s.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+      s.setDefaultMaxConnectionsPerHost(100);
+      s.setMaxTotalConnections(100);
+      return s;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Polls the CDCR queue size of the shard leader every second until it is no longer positive.
+   * NOTE: there is no timeout here; a stuck replication relies on the test framework's own timeout.
+   */
+  protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
+    while (true) {
+      log.info("Checking queue size @ {}:{}", collectionName, shardId);
+      long size = this.getQueueSize(collectionName, shardId);
+      if (size <= 0) {
+        return;
+      }
+      log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
+      Thread.sleep(1000); // wait a bit for the replication to complete
+    }
+  }
+
+  /**
+   * Returns the CDCR queue size reported by the leader of the given shard for the target collection,
+   * read from the first entry of the QUEUES response.
+   */
+  protected long getQueueSize(String collectionName, String shardId) throws Exception {
+    NamedList rsp = this.invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.QUEUES);
+    NamedList host = (NamedList) ((NamedList) rsp.get(CdcrParams.QUEUES)).getVal(0);
+    NamedList status = (NamedList) host.get(TARGET_COLLECTION);
+    return (Long) status.get(CdcrParams.QUEUE_SIZE);
+  }
+
+  /**
+   * Asserts that the number of transaction logs, summed across all the shards, does not exceed
+   * {@code maxNumberOfTLogs} on the leaders nor on the replicas, and that each replica group has at least
+   * as many transaction logs as the leaders.
+   */
+  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+    CollectionInfo info = collectInfo(collection);
+    Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
+
+    int leaderLogs = 0;
+    ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+
+    for (String shard : shardToCoresMap.keySet()) {
+      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
+      List<CollectionInfo.CoreInfo> replicas = info.getReplicas(shard);
+      for (int i = 0; i < replicationFactor - 1; i++) {
+        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(replicas.get(i).ulogDir));
+      }
+    }
+
+    // checked once, outside the replica loop, so that it also runs when there are no replicas
+    assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
+        leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
+
+    for (Integer replicaLogs : replicasLogs) {
+      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
+
+      // replica logs must be always equal or superior to leader logs
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is inferior to leader: %d.",
+          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
+
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
+          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
+    }
+  }
+
+  /**
+   * Returns the number of entries in the given update log directory.
+   *
+   * @throws AssertionError if the path is not a readable directory
+   */
+  private int numberOfFiles(String dir) {
+    File[] files = new File(dir).listFiles(); // null if not a directory or on I/O error
+    if (files == null) {
+      throw new AssertionError("Path to tlog " + dir + " does not exist or it's not a directory.");
+    }
+    log.info("Update log dir {} contains: {}", dir, Arrays.toString(files));
+    return files.length;
+  }
+
+  /**
+   * Collects, for every core hosted by the jettys of the given collection, its shard, its leadership
+   * and its update log directory.
+   */
+  protected CollectionInfo collectInfo(String collection) throws Exception {
+    CollectionInfo info = new CollectionInfo(collection);
+    for (String shard : shardToJetty.get(collection).keySet()) {
+      List<CloudJettyRunner> jettyRunners = shardToJetty.get(collection).get(shard);
+      for (CloudJettyRunner jettyRunner : jettyRunners) {
+        SolrDispatchFilter filter = (SolrDispatchFilter) jettyRunner.jetty.getDispatchFilter().getFilter();
+        for (SolrCore core : filter.getCores().getCores()) {
+          info.addCore(core, shard, shardToLeaderJetty.get(collection).containsValue(jettyRunner));
+        }
+      }
+    }
+
+    return info;
+  }
+
+  protected class CollectionInfo {
+
+    List<CoreInfo> coreInfos = new ArrayList<>();
+
+    String collection;
+
+    CollectionInfo(String collection) {
+      this.collection = collection;
+    }
+
+    /**
+     * @return Returns a new map shard -> list of cores
+     */
+    Map<String, List<CoreInfo>> getShardToCoresMap() {
+      Map<String, List<CoreInfo>> map = new HashMap<>();
+      for (CoreInfo info : coreInfos) {
+        List<CoreInfo> list = map.get(info.shard);
+        if (list == null) {
+          list = new ArrayList<>();
+          map.put(info.shard, list);
+        }
+        list.add(info);
+      }
+      return map;
+    }
+
+    /**
+     * Returns the leader core of the given shard.
+     *
+     * @throws AssertionError if the shard has no leader (or no core at all)
+     */
+    CoreInfo getLeader(String shard) {
+      List<CoreInfo> shardCores = getShardToCoresMap().get(shard);
+      if (shardCores != null) {
+        for (CoreInfo info : shardCores) {
+          if (info.isLeader) {
+            return info;
+          }
+        }
+      }
+      throw new AssertionError(String.format(Locale.ENGLISH, "There is no leader for collection %s shard %s",
+          collection, shard));
+    }
+
+    /**
+     * Returns the non-leader cores of the given shard, as a new list.
+     */
+    List<CoreInfo> getReplicas(String shard) {
+      List<CoreInfo> shardCores = getShardToCoresMap().get(shard);
+      shardCores.remove(getLeader(shard));
+      return shardCores;
+    }
+
+    void addCore(SolrCore core, String shard, boolean isLeader) throws Exception {
+      CoreInfo info = new CoreInfo();
+      info.collectionName = core.getName(); // NOTE: this holds the core name
+      info.shard = shard;
+      info.isLeader = isLeader;
+      info.ulogDir = core.getUpdateHandler().getUpdateLog().getLogDir();
+
+      this.coreInfos.add(info);
+    }
+
+    public class CoreInfo {
+      String collectionName;
+      String shard;
+      boolean isLeader;
+      String ulogDir;
+    }
+
+  }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,598 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slow
+public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
+
+ @Override
+ public void distribSetUp() throws Exception {
+ schemaString = "schema15.xml"; // we need a string id
+ super.distribSetUp();
+ }
+
+ @Test
+ @ShardsFixed(num = 4)
+ public void doTest() throws Exception {
+ this.doTestDeleteCreateSourceCollection();
+ this.doTestTargetCollectionNotAvailable();
+ this.doTestReplicationStartStop();
+ this.doTestReplicationAfterRestart();
+ this.doTestReplicationAfterLeaderChange();
+ this.doTestUpdateLogSynchronisation();
+ this.doTestBufferOnNonLeader();
+ this.doTestOps();
+ this.doTestBatchAddsWithDelete();
+ this.doTestBatchBoundaries();
+ this.doTestResilienceWithDeleteByQueryOnTarget();
+ }
+
+ /**
+ * Checks that the test framework handles properly the creation and deletion of collections and the
+ * restart of servers.
+ */
+ public void doTestDeleteCreateSourceCollection() throws Exception {
+ log.info("Indexing documents");
+
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ docs.add(getDoc(id, Integer.toString(i)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ index(TARGET_COLLECTION, docs);
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Restarting leader @ source_collection:shard1");
+
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Clearing source_collection");
+
+ this.clearSourceCollection();
+
+ assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Restarting leader @ target_collection:shard1");
+
+ this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
+
+ assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Clearing target_collection");
+
+ this.clearTargetCollection();
+
+ assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+ assertCollectionExpectations(SOURCE_COLLECTION);
+ assertCollectionExpectations(TARGET_COLLECTION);
+ }
+
+ public void doTestTargetCollectionNotAvailable() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // send start action to first shard
+ NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+ // Kill all the servers of the target
+ this.deleteCollection(TARGET_COLLECTION);
+
+ // Index a few documents to trigger the replication
+ index(SOURCE_COLLECTION, getDoc(id, "a"));
+ index(SOURCE_COLLECTION, getDoc(id, "b"));
+ index(SOURCE_COLLECTION, getDoc(id, "c"));
+ index(SOURCE_COLLECTION, getDoc(id, "d"));
+ index(SOURCE_COLLECTION, getDoc(id, "e"));
+ index(SOURCE_COLLECTION, getDoc(id, "f"));
+
+ assertEquals(6, getNumDocs(SOURCE_COLLECTION));
+
+ Thread.sleep(1000); // wait a bit for the replicator thread to be triggered
+
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
+ NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
+ NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
+ assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
+ NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
+ assertNotNull(lastErrors);
+ assertTrue(0 < lastErrors.size());
+ }
+
+ public void doTestReplicationStartStop() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection(); // this might log a warning to indicate he was not able to delete the collection (collection was deleted in the previous test)
+
+ int start = 0;
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (; start < 10; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+
+ commit(TARGET_COLLECTION);
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+
+ docs.clear();
+ for (; start < 110; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ // Start again CDCR, the source cluster should reinitialise its log readers
+ // with the latest checkpoints
+
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+
+ commit(TARGET_COLLECTION);
+
+ assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Check that the replication manager is properly restarted after a node failure.
+ */
+ public void doTestReplicationAfterRestart() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ log.info("Starting CDCR");
+
+ // send start action to first shard
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ log.info("Indexing 10 documents");
+
+ int start = 0;
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (; start < 10; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ log.info("Querying source collection");
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+
+ log.info("Waiting for replication");
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ log.info("Querying target collection");
+
+ commit(TARGET_COLLECTION);
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Restarting shard1");
+
+ this.restartServers(shardToJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+ log.info("Indexing 100 documents");
+
+ docs.clear();
+ for (; start < 110; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ log.info("Querying source collection");
+
+ assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+
+ log.info("Waiting for replication");
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ log.info("Querying target collection");
+
+ commit(TARGET_COLLECTION);
+ assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Check that the replication manager is properly started after a change of leader.
+ * This test also checks that the log readers on the new leaders are initialised with
+ * the target's checkpoint.
+ */
+ public void doTestReplicationAfterLeaderChange() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ log.info("Starting CDCR");
+
+ // send start action to first shard
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ log.info("Indexing 10 documents");
+
+ int start = 0;
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (; start < 10; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ log.info("Querying source collection");
+
+ assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+
+ log.info("Waiting for replication");
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ log.info("Querying target collection");
+
+ commit(TARGET_COLLECTION);
+ assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+ log.info("Restarting target leaders");
+
+ // Close all the leaders, then restart them
+ this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
+ this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD2));
+
+ log.info("Restarting source leaders");
+
+ // Close all the leaders, then restart them
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
+
+ log.info("Checking queue size of new source leaders");
+
+ // If the log readers of the new leaders are initialised with the target's checkpoint, the
+ // queue size must be inferior to the current number of documents indexed.
+ // The queue might be not completely empty since the new target checkpoint is probably not the
+ // last document received
+ assertTrue(this.getQueueSize(SOURCE_COLLECTION, SHARD1) < 10);
+ assertTrue(this.getQueueSize(SOURCE_COLLECTION, SHARD2) < 10);
+
+ log.info("Indexing 100 documents");
+
+ docs.clear();
+ for (; start < 110; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ log.info("Querying source collection");
+
+ assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+
+ log.info("Waiting for replication");
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ log.info("Querying target collection");
+
+ commit(TARGET_COLLECTION);
+ assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Check that the update logs are synchronised between leader and non-leader nodes
+ */
+ public void doTestUpdateLogSynchronisation() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // buffering is enabled by default, so disable it
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ for (int i = 0; i < 50; i++) {
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+ }
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ // Stop CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+
+ assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
+
+ // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
+ assertUpdateLogs(SOURCE_COLLECTION, 50);
+
+ for (int i = 50; i < 100; i++) {
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
+ }
+
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
+
+ // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
+ // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
+ // and 11 on the non-leader)
+ // the non-leader must have synchronised its update log with its leader
+ assertUpdateLogs(SOURCE_COLLECTION, 50);
+ }
+
+ /**
+ * Check that the buffer is always activated on non-leader nodes.
+ */
+ public void doTestBufferOnNonLeader() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // buffering is enabled by default, so disable it
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+
+ // Start CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ // Index documents
+ for (int i = 0; i < 200; i++) {
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+ }
+
+ // And immediately, close all the leaders, then restart them. It is likely that the replication will not be
+ // performed fully, and therefore be continued by the new leader
+ // At this stage, the new leader must have been elected
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ // If the non-leader node were buffering updates, then the replication must be complete
+ assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(200, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Check the ops statistics.
+ */
+ public void doTestOps() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // Index documents
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int i = 0; i < 200; i++) {
+ docs.add(getDoc(id, Integer.toString(i)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // Start CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ NamedList rsp = this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.OPS);
+ NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.OPERATIONS_PER_SECOND)).getVal(0);
+ NamedList ops = (NamedList) collections.get(TARGET_COLLECTION);
+ double opsAll = (Double) ops.get(CdcrParams.COUNTER_ALL);
+ double opsAdds = (Double) ops.get(CdcrParams.COUNTER_ADDS);
+ assertTrue(opsAll > 0);
+ assertEquals(opsAll, opsAdds, 0);
+
+ double opsDeletes = (Double) ops.get(CdcrParams.COUNTER_DELETES);
+ assertEquals(0, opsDeletes, 0);
+ }
+
+ /**
+ * Check that batch updates with deletes
+ */
+ public void doTestBatchAddsWithDelete() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // Index 50 documents
+ int start = 0;
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (; start < 50; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // Delete 10 documents: 10-19
+ List<String> ids = new ArrayList<>();
+ for (int id = 10; id < 20; id++) {
+ ids.add(Integer.toString(id));
+ }
+ deleteById(SOURCE_COLLECTION, ids);
+
+ // Index 10 documents
+ docs = new ArrayList<>();
+ for (; start < 60; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // Delete 1 document: 50
+ ids = new ArrayList<>();
+ ids.add(Integer.toString(50));
+ deleteById(SOURCE_COLLECTION, ids);
+
+ // Index 10 documents
+ docs = new ArrayList<>();
+ for (; start < 70; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // Start CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ // If the non-leader node were buffering updates, then the replication must be complete
+ assertEquals(59, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(59, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Checks that batches are correctly constructed when batch boundaries are reached.
+ */
+ public void doTestBatchBoundaries() throws Exception {
+ invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ log.info("Indexing documents");
+
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int i = 0; i < 128; i++) { // should create two full batches (default batch = 64)
+ docs.add(getDoc(id, Integer.toString(i)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+
+ commit(TARGET_COLLECTION);
+
+ assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(128, getNumDocs(TARGET_COLLECTION));
+ }
+
+ /**
+ * Check resilience of replication with delete by query executed on targets
+ */
+ public void doTestResilienceWithDeleteByQueryOnTarget() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
+
+ // Index 50 documents
+ int start = 0;
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (; start < 50; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // Start CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ // If the non-leader node were buffering updates, then the replication must be complete
+ assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+ deleteByQuery(SOURCE_COLLECTION, "*:*");
+ deleteByQuery(TARGET_COLLECTION, "*:*");
+
+ assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+ docs.clear();
+ for (; start < 100; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+ deleteByQuery(TARGET_COLLECTION, "*:*");
+
+ assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+ // Restart CDCR
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+ Thread.sleep(500); // wait a bit for the state to synch
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+ docs.clear();
+ for (; start < 150; start++) {
+ docs.add(getDoc(id, Integer.toString(start)));
+ }
+ index(SOURCE_COLLECTION, docs);
+
+ // wait a bit for the replication to complete
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+ commit(TARGET_COLLECTION);
+
+ assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+ assertEquals(50, getNumDocs(TARGET_COLLECTION));
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,245 @@
+/**
+ * Copyright (c) 2015 Renaud Delbru. All Rights Reserved.
+ */
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slow
+public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
+
+ @Override
+ public void distribSetUp() throws Exception {
+ schemaString = "schema15.xml"; // we need a string id
+ createTargetCollection = false; // we do not need the target cluster
+ shardCount = 1; // we need only one shard
+ // we need a persistent directory, otherwise the UpdateHandler will erase existing tlog files after restarting a node
+ System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+ super.distribSetUp();
+ }
+
+ @Test
+ @ShardsFixed(num = 4)
+ public void doTest() throws Exception {
+ this.doTestFullReplication();
+ this.doTestPartialReplication();
+ this.doTestPartialReplicationWithTruncatedTlog();
+ this.doTestPartialReplicationAfterPeerSync();
+ }
+
+ /**
+ * Test the scenario where the slave is killed from the start. The replication
+ * strategy should fetch all the missing tlog files from the leader.
+ */
+ public void doTestFullReplication() throws Exception {
+ List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+ ChaosMonkey.stop(slaves.get(0).jetty);
+
+ for (int i = 0; i < 10; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 10; j < (i * 10) + 10; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+
+ // Restart the slave node to trigger Replication strategy
+ this.restartServer(slaves.get(0));
+
+ this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ }
+
+ /**
+ * Test the scenario where the slave is killed before receiving all the documents. The replication
+ * strategy should fetch all the missing tlog files from the leader.
+ */
+ public void doTestPartialReplication() throws Exception {
+ this.clearSourceCollection();
+
+ for (int i = 0; i < 5; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 20; j < (i * 20) + 20; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+ ChaosMonkey.stop(slaves.get(0).jetty);
+
+ for (int i = 5; i < 10; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 20; j < (i * 20) + 20; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+
+ // Restart the slave node to trigger Replication strategy
+ this.restartServer(slaves.get(0));
+
+ // at this stage, the slave should have replicated the 5 missing tlog files
+ this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ }
+
+ /**
+ * Test the scenario where the slave is killed before receiving a commit. This creates a truncated tlog
+ * file on the slave node. The replication strategy should detect this truncated file, and fetch the
+ * non-truncated file from the leader.
+ */
+ public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
+ this.clearSourceCollection();
+
+ CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
+ List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+
+ try {
+ for (int i = 0; i < 10; i++) {
+ for (int j = i * 20; j < (i * 20) + 20; j++) {
+ client.add(getDoc(id, Integer.toString(j)));
+
+ // Stop the slave in the middle of a batch to create a truncated tlog on the slave
+ if (j == 45) {
+ ChaosMonkey.stop(slaves.get(0).jetty);
+ }
+
+ }
+ commit(SOURCE_COLLECTION);
+ }
+ } finally {
+ client.close();
+ }
+
+ assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+
+ // Restart the slave node to trigger Replication recovery
+ this.restartServer(slaves.get(0));
+
+ // at this stage, the slave should have replicated the 5 missing tlog files
+ this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ }
+
+ /**
+ * Test the scenario where the slave first recovered with a PeerSync strategy, then with a Replication strategy.
+ * The PeerSync strategy will generate a single tlog file for all the missing updates on the slave node.
+ * If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
+ * and fetch the corresponding tlog files from the leader.
+ */
+ public void doTestPartialReplicationAfterPeerSync() throws Exception {
+ this.clearSourceCollection();
+
+ for (int i = 0; i < 5; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 10; j < (i * 10) + 10; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+ ChaosMonkey.stop(slaves.get(0).jetty);
+
+ for (int i = 5; i < 10; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 10; j < (i * 10) + 10; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+
+ // Restart the slave node to trigger PeerSync recovery
+ // (the update windows between leader and slave is small enough)
+ this.restartServer(slaves.get(0));
+
+ ChaosMonkey.stop(slaves.get(0).jetty);
+
+ for (int i = 10; i < 15; i++) {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int j = i * 20; j < (i * 20) + 20; j++) {
+ docs.add(getDoc(id, Integer.toString(j)));
+ }
+ index(SOURCE_COLLECTION, docs);
+ }
+
+ // restart the slave node to trigger Replication recovery
+ this.restartServer(slaves.get(0));
+
+ // at this stage, the slave should have replicated the 5 missing tlog files
+ this.assertUpdateLogs(SOURCE_COLLECTION, 15);
+ }
+
+ private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
+ List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
+ CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);
+ jetties.remove(leader);
+ return jetties;
+ }
+
+ /**
+ * Asserts that the transaction logs between the leader and slave
+ */
+ @Override
+ protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+ CollectionInfo info = collectInfo(collection);
+ Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
+
+ for (String shard : shardToCoresMap.keySet()) {
+ Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
+ Map<Long, Long> slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
+
+ assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderFilesMeta.size());
+ assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveFilesMeta.size());
+
+ for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
+ assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));
+ assertEquals("Slave's tlog file size differs for version " + leaderFileVersion, leaderFilesMeta.get(leaderFileVersion), slaveFilesMeta.get(leaderFileVersion));
+ }
+ }
+ }
+
+ private Map<Long, Long> getFilesMeta(String dir) {
+ File file = new File(dir);
+ if (!file.isDirectory()) {
+ assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
+ }
+
+ Map<Long, Long> filesMeta = new HashMap<>();
+ for (File tlogFile : file.listFiles()) {
+ filesMeta.put(Math.abs(Long.parseLong(tlogFile.getName().substring(tlogFile.getName().lastIndexOf('.') + 1))), tlogFile.length());
+ }
+ return filesMeta;
+ }
+
+}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,157 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+
+@Slow
+public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
+
+ @Override
+ public void distribSetUp() throws Exception {
+ schemaString = "schema15.xml"; // we need a string id
+ createTargetCollection = false; // we do not need the target cluster
+ super.distribSetUp();
+ }
+
+ @Test
+ @ShardsFixed(num = 4)
+
+ public void doTest() throws Exception {
+ this.doTestLifeCycleActions();
+ this.doTestCheckpointActions();
+ this.doTestBufferActions();
+ }
+
+ // check that the life-cycle state is properly synchronised across nodes
+ public void doTestLifeCycleActions() throws Exception {
+ // check initial status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+ // send start action to first shard
+ NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+ // Restart the leader of shard 1
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+ // check status - the node that died should have picked up the original state
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+ // send stop action to second shard
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.STOP);
+ status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ assertEquals(CdcrParams.ProcessState.STOPPED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+ }
+
+ // check the checkpoint API
+ public void doTestCheckpointActions() throws Exception {
+ // initial request on an empty index, must return -1
+ NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
+
+ index(SOURCE_COLLECTION, getDoc(id, "a")); // shard 2
+
+ // only one document indexed in shard 2, the checkpoint must be still -1
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
+
+ index(SOURCE_COLLECTION, getDoc(id, "b")); // shard 1
+
+ // a second document indexed in shard 1, the checkpoint must come from shard 2
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ long checkpoint1 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+ long expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+ assertEquals(expected, checkpoint1);
+
+ index(SOURCE_COLLECTION, getDoc(id, "c")); // shard 1
+
+ // a third document indexed in shard 1, the checkpoint must still come from shard 2
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ assertEquals(checkpoint1, rsp.get(CdcrParams.CHECKPOINT));
+
+ index(SOURCE_COLLECTION, getDoc(id, "d")); // shard 2
+
+ // a fourth document indexed in shard 2, the checkpoint must come from shard 1
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ long checkpoint2 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+ expected = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+ assertEquals(expected, checkpoint2);
+
+ // send a delete by query
+ deleteByQuery(SOURCE_COLLECTION, "*:*");
+
+ // all the checkpoints must come from the DBQ
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+ long checkpoint3 = (Long) rsp.get(CdcrParams.CHECKPOINT);
+ assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+ checkpoint3 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+ assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+ checkpoint3 = (Long) invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+ assertTrue(checkpoint3 > 0); // ensure that checkpoints from deletes are in absolute form
+
+ // replication never started, lastProcessedVersion should be -1 for both shards
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
+ long lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
+ assertEquals(-1l, lastVersion);
+
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
+ lastVersion = (Long) rsp.get(CdcrParams.LAST_PROCESSED_VERSION);
+ assertEquals(-1l, lastVersion);
+ }
+
+ // check that the buffer state is properly synchronised across nodes
+ public void doTestBufferActions() throws Exception {
+ // check initial status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+ // send disable buffer action to first shard
+ NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+ NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ assertEquals(CdcrParams.BufferState.DISABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+ // Restart the leader of shard 1
+ this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+ // send enable buffer action to second shard
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ENABLEBUFFER);
+ status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ assertEquals(CdcrParams.BufferState.ENABLED.toLower(), status.get(CdcrParams.BufferState.getParam()));
+
+ // check status
+ this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+ }
+
+}
+
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,304 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.update.processor.CdcrUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
+
+  private static final String vfield = DistributedUpdateProcessor.VERSION_FIELD;
+
+  /** Client used by the add/delete/real-time-get helpers; assigned at the start of doTestCdcrDocVersions. */
+  SolrClient solrServer;
+
+  public CdcrVersionReplicationTest() {
+    schemaString = "schema15.xml"; // we need a string id
+    super.createTargetCollection = false;
+  }
+
+  /**
+   * Creates a client for the source collection: with 80% probability a cloud client, otherwise a
+   * plain client pointed at one of the first two replicas of shard1 or shard2 (10% each).
+   */
+  SolrClient createClientRandomly() throws Exception {
+    int r = random().nextInt(100);
+
+    // testing the smart cloud client (requests to leaders) is more important than testing the forwarding logic
+    if (r < 80) {
+      return createCloudClient(SOURCE_COLLECTION);
+    }
+
+    if (r < 90) {
+      return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(SHARD1).get(random().nextInt(2)).url);
+    }
+
+    return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(SHARD2).get(random().nextInt(2)).url);
+  }
+
+  @Test
+  @ShardsFixed(num = 4)
+  public void doTest() throws Exception {
+    SolrClient client = createClientRandomly();
+    try {
+      handle.clear();
+      handle.put("timestamp", SKIPVAL);
+
+      doTestCdcrDocVersions(client);
+
+      commit(SOURCE_COLLECTION); // work around SOLR-5628
+    } finally {
+      client.close();
+    }
+  }
+
+  private void doTestCdcrDocVersions(SolrClient solrClient) throws Exception {
+    this.solrServer = solrClient;
+
+    log.info("### STARTING doCdcrTestDocVersions - Add commands, client: " + solrClient);
+
+    vadd("doc1", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc2", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
+    vadd("doc3", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc4", 11, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "11");
+    commit(SOURCE_COLLECTION);
+
+    // versions are preserved and verifiable both by query and by real-time get
+    doQuery(solrClient, "doc1,10,doc2,11,doc3,10,doc4,11", "q", "*:*");
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    vadd("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
+    vadd("doc2", 10, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "10");
+    vadd("doc3", 9, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "9");
+    vadd("doc4", 8, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "8");
+
+    // lower versions are ignored
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    vadd("doc1", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc2", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc3", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+    vadd("doc4", 12, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "12");
+
+    // higher versions are accepted
+    doRealTimeGet("doc1,doc2,doc3,doc4", "12,12,12,12");
+
+    // non-cdcr update requests throw a version conflict exception for non-equal versions (optimistic locking feature)
+    vaddFail("doc1", 13, 409);
+    vaddFail("doc2", 13, 409);
+    vaddFail("doc3", 13, 409);
+
+    commit(SOURCE_COLLECTION);
+
+    // versions are still as they were
+    doQuery(solrClient, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // query all shard replicas individually
+    doQueryShardReplica(SHARD1, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // optimistic locking update
+    vadd("doc4", 12);
+    commit(SOURCE_COLLECTION);
+
+    QueryResponse rsp = solrClient.query(params("qt", "/get", "ids", "doc4"));
+    long version = (Long) rsp.getResults().get(0).get(vfield);
+
+    // update accepted and a new version number was generated
+    assertTrue("Expected a newly generated version for doc4 but got " + version, version > 1_000_000_000_000L);
+
+    log.info("### STARTING doCdcrTestDocVersions - Delete commands");
+
+    // send a delete update with an older version number
+    vdelete("doc1", 5, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "5");
+    // must ignore the delete
+    doRealTimeGet("doc1", "12");
+
+    // send a delete update with a higher version number
+    vdelete("doc1", 13, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "13");
+    // must be deleted
+    doRealTimeGet("doc1", "");
+
+    // send a delete update with a higher version number
+    vdelete("doc4", version + 1, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, "" + (version + 1));
+    // must be deleted
+    doRealTimeGet("doc4", "");
+
+    commit(SOURCE_COLLECTION);
+
+    // query each shard replica individually
+    doQueryShardReplica(SHARD1, "doc2,12,doc3,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc2,12,doc3,12", "q", "*:*");
+
+    // version conflict thanks to optimistic locking
+    // TODO: it seems that optimistic locking doesn't work with forwarding, test with shard2 client
+    if (solrClient instanceof CloudSolrClient) {
+      vdeleteFail("doc2", 50, 409);
+    }
+
+    // cleanup after ourselves for the next run
+    // deleteByQuery should work as usual with the CDCR_UPDATE param
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(1));
+    commit(SOURCE_COLLECTION);
+
+    // deleteByQuery with a version lower than anything else should have no effect
+    doQuery(solrClient, "doc2,12,doc3,12", "q", "*:*");
+
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(51));
+    commit(SOURCE_COLLECTION);
+
+    // deleteByQuery with a version higher than everything else should delete all remaining docs
+    doQuery(solrClient, "", "q", "*:*");
+
+    // check that replicas are as expected too
+    doQueryShardReplica(SHARD1, "", "q", "*:*");
+    doQueryShardReplica(SHARD2, "", "q", "*:*");
+  }
+
+
+  // ------------------ auxiliary methods ------------------
+
+
+  /** Runs {@link #doQuery} against every replica of {@code shard} in the source collection. */
+  void doQueryShardReplica(String shard, String expectedDocs, String... queryParams) throws Exception {
+    for (CloudJettyRunner jetty : shardToJetty.get(SOURCE_COLLECTION).get(shard)) {
+      doQuery(jetty.client, expectedDocs, queryParams);
+    }
+  }
+
+  /**
+   * Sends a delete-by-id for {@code id} with the given version; {@code params} are additional
+   * request parameters given as alternating name/value pairs.
+   */
+  void vdelete(String id, long version, String... params) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteById(id);
+    req.setParam(vfield, Long.toString(version));
+
+    for (int i = 0; i < params.length; i += 2) {
+      req.setParam(params[i], params[i + 1]);
+    }
+    solrServer.request(req);
+  }
+
+  /** Asserts that {@link #vdelete} is rejected with a SolrException carrying {@code errCode}. */
+  void vdeleteFail(String id, long version, int errCode, String... params) throws Exception {
+    try {
+      vdelete(id, version, params);
+    } catch (Exception e) {
+      assertErrorCode(errCode, e);
+      return;
+    }
+    fail("Expected delete of " + id + " with version " + version + " to fail with code " + errCode);
+  }
+
+  /**
+   * Adds a document with the given id and version; {@code params} are additional request
+   * parameters given as alternating name/value pairs.
+   */
+  void vadd(String id, long version, String... params) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.add(sdoc("id", id, vfield, version));
+    for (int i = 0; i < params.length; i += 2) {
+      req.setParam(params[i], params[i + 1]);
+    }
+    solrServer.request(req);
+  }
+
+  /** Asserts that {@link #vadd} is rejected with a SolrException carrying {@code errCode}. */
+  void vaddFail(String id, long version, int errCode, String... params) throws Exception {
+    try {
+      vadd(id, version, params);
+    } catch (Exception e) {
+      assertErrorCode(errCode, e);
+      return;
+    }
+    fail("Expected add of " + id + " with version " + version + " to fail with code " + errCode);
+  }
+
+  /**
+   * Asserts that {@code e} carries a {@link SolrException} with the given code, either directly,
+   * as the cause of a SolrException, or as the cause of a {@link SolrServerException}. Any other
+   * exception fails the test with {@code e} attached as the cause instead of being swallowed.
+   */
+  private void assertErrorCode(int errCode, Exception e) {
+    SolrException solrException = null;
+    if (e instanceof SolrException) {
+      solrException = (SolrException) e;
+      if (e.getCause() instanceof SolrException && e.getCause() != e) {
+        solrException = (SolrException) e.getCause();
+      }
+    } else if (e instanceof SolrServerException && e.getCause() instanceof SolrException) {
+      solrException = (SolrException) e.getCause();
+    }
+
+    if (solrException == null) {
+      log.error("ERROR", e);
+      throw new AssertionError("Expected a SolrException with code " + errCode + " but got: " + e, e);
+    }
+    assertEquals(errCode, solrException.code());
+  }
+
+  /**
+   * Runs a query and asserts that the returned documents are exactly those in
+   * {@code expectedDocs}, a comma-separated list of alternating id/version pairs.
+   */
+  void doQuery(SolrClient ss, String expectedDocs, String... queryParams) throws Exception {
+    List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);
+    Map<String, Object> expectedIds = new HashMap<>();
+    for (int i = 0; i < strs.size(); i += 2) {
+      String id = strs.get(i);
+      String vS = strs.get(i + 1);
+      Long v = Long.valueOf(vS);
+      expectedIds.put(id, v);
+    }
+
+    QueryResponse rsp = ss.query(params(queryParams));
+    Map<String, Object> obtainedIds = new HashMap<>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+  /**
+   * Performs a real-time get of the comma-separated {@code ids} and asserts that each is returned
+   * with the version at the same position in {@code versions}. An empty {@code versions} string
+   * means no document is expected back.
+   */
+  void doRealTimeGet(String ids, String versions) throws Exception {
+    Map<String, Object> expectedIds = new HashMap<>();
+    List<String> strs = StrUtils.splitSmart(ids, ",", true);
+    List<String> verS = StrUtils.splitSmart(versions, ",", true);
+    if (!verS.isEmpty()) {
+      for (int i = 0; i < strs.size(); i++) {
+        expectedIds.put(strs.get(i), Long.valueOf(verS.get(i)));
+      }
+    }
+
+    QueryResponse rsp = solrServer.query(params("qt", "/get", "ids", ids));
+    Map<String, Object> obtainedIds = new HashMap<>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+  /** Sends a delete-by-query with the given request parameters (alternating name/value pairs). */
+  void doDeleteByQuery(String q, String... reqParams) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteByQuery(q);
+    req.setParams(params(reqParams));
+    req.process(solrServer);
+  }
+
+}
+