You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC

svn commit: r1681186 [4/5] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,812 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.CdcrParams;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.zookeeper.CreateMode;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+
+/**
+ * <p>
+ * Abstract class for CDCR unit testing. This class emulates two clusters, a source and target, by using different
+ * collections in the same SolrCloud cluster. Therefore, the two clusters will share the same Zookeeper cluster. In
+ * a real scenario, the two collections/clusters would likely each have their own Zookeeper cluster.
+ * </p>
+ * <p>
+ * This class will automatically create two collections, the source and the target. Each collection will have
+ * {@link #shardCount} shards, and {@link #replicationFactor} replicas per shard. One jetty instance will
+ * be created per core.
+ * </p>
+ * <p>
+ * The source and target collection can be reinitialised at will by calling {@link #clearSourceCollection()} and
+ * {@link #clearTargetCollection()}. After reinitialisation, a collection will have a new fresh index and update log.
+ * </p>
+ * <p>
+ * Servers can be restarted at will by calling
+ * {@link #restartServer(BaseCdcrDistributedZkTest.CloudJettyRunner)} or
+ * {@link #restartServers(java.util.List)}.
+ * </p>
+ * <p>
+ * The creation of the target collection can be disabled with the flag {@link #createTargetCollection};
+ * </p>
+ * <p>
+ * NB: We cannot use multiple cores per jetty instance, as jetty will load only one core when restarting. It seems
+ * that this is a limitation of the {@link org.apache.solr.client.solrj.embedded.JettySolrRunner}. This class
+ * tries to ensure that there always is one single core per jetty instance.
+ * </p>
+ */
+public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
+
+  protected int shardCount = 2;
+  protected int replicationFactor = 2;
+  protected boolean createTargetCollection = true;
+
+  private static final String CDCR_PATH = "/cdcr";
+
+  protected static final String SOURCE_COLLECTION = "source_collection";
+  protected static final String TARGET_COLLECTION = "target_collection";
+
+  public static final String SHARD1 = "shard1";
+  public static final String SHARD2 = "shard2";
+
+  @Override
+  protected String getCloudSolrConfig() {
+    return "solrconfig-cdcr.xml";
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    super.distribSetUp();
+
+    if (shardCount > 0) {
+      System.setProperty("numShards", Integer.toString(shardCount));
+    } else {
+      System.clearProperty("numShards");
+    }
+
+    if (isSSLMode()) {
+      System.clearProperty("urlScheme");
+      ZkStateReader zkStateReader = new ZkStateReader(zkServer.getZkAddress(),
+          AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
+      try {
+        zkStateReader.getZkClient().create(ZkStateReader.CLUSTER_PROPS,
+            ZkStateReader.toJSON(Collections.singletonMap("urlScheme", "https")),
+            CreateMode.PERSISTENT, true);
+      } finally {
+        zkStateReader.close();
+      }
+    }
+  }
+
+  @Override
+  protected void createServers(int numServers) throws Exception {
+  }
+
+  @BeforeClass
+  public static void beforeClass() {
+    System.setProperty("solrcloud.update.delay", "0");
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    System.clearProperty("solrcloud.update.delay");
+  }
+
+  @Before
+  public void baseBefore() throws Exception {
+    this.createSourceCollection();
+    if (this.createTargetCollection) this.createTargetCollection();
+    RandVal.uniqueValues = new HashSet(); //reset random values
+  }
+
+  @After
+  public void baseAfter() throws Exception {
+    destroyServers();
+  }
+
+  protected CloudSolrClient createCloudClient(String defaultCollection) {
+    CloudSolrClient server = new CloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
+    server.setParallelUpdates(random().nextBoolean());
+    if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
+    server.getLbClient().getHttpClient().getParams()
+        .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
+    return server;
+  }
+
+  protected void printLayout() throws Exception { // prints the Zookeeper layout to stdout
+    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
+      zkClient.printLayoutToStdOut();
+    } // the client is closed even when printing the layout fails
+  }
+
+  protected SolrInputDocument getDoc(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    return doc;
+  }
+
+  /**
+   * Adds a document to the given collection and commits it, using a cloud client closed before returning.
+   */
+  protected void index(String collection, SolrInputDocument doc) throws IOException, SolrServerException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      client.add(doc);
+      client.commit(true, true);
+    }
+  }
+
+  /**
+   * Adds a list of documents to the given collection and commits them, using a cloud client closed before returning.
+   */
+  protected void index(String collection, List<SolrInputDocument> docs) throws IOException, SolrServerException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      client.add(docs);
+      client.commit(true, true);
+    }
+  }
+
+  /**
+   * Deletes the documents with the given ids from the given collection, then commits.
+   */
+  protected void deleteById(String collection, List<String> ids) throws IOException, SolrServerException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      client.deleteById(ids);
+      client.commit(true, true);
+    }
+  }
+
+  /**
+   * Deletes the documents matching the query {@code q} from the given collection, then commits.
+   */
+  protected void deleteByQuery(String collection, String q) throws IOException, SolrServerException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      client.deleteByQuery(q);
+      client.commit(true, true);
+    }
+  }
+
+  /**
+   * Invokes a commit on the given collection. The cloud client created for the request is closed
+   * before returning, and a failure while closing it never hides a failure of the commit itself.
+   *
+   * @param collection the name of the collection to commit
+   */
+  protected void commit(String collection) throws IOException, SolrServerException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      client.commit(true, true);
+    }
+  }
+
+  /**
+   * Returns the number of documents in a given collection, as reported by a match-all query.
+   *
+   * @param collection the name of the collection to query
+   * @return the number of documents found by the {@code *:*} query
+   */
+  protected long getNumDocs(String collection) throws SolrServerException, IOException {
+    try (CloudSolrClient client = createCloudClient(collection)) {
+      return client.query(new SolrQuery("*:*")).getResults().getNumFound();
+    }
+  }
+
+  /**
+   * Invokes a CDCR action on a given node.
+   */
+  protected NamedList invokeCdcrAction(CloudJettyRunner jetty, CdcrParams.CdcrAction action) throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CommonParams.ACTION, action.toString());
+
+    SolrRequest request = new QueryRequest(params);
+    request.setPath(CDCR_PATH);
+
+    return jetty.client.request(request);
+  }
+
+  /**
+   * Asserts that every node of the given collection reports the expected CDCR process and buffer states.
+   */
+  protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
+      throws Exception {
+    for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
+      NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
+      NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+      assertEquals(processState.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+      assertEquals(bufferState.toLower(), status.get(CdcrParams.BufferState.getParam()));
+    }
+  }
+
+  /**
+   * A mapping between collection and node names. This is used when creating the collection in
+   * {@link #createCollection(String)}.
+   */
+  private Map<String, List<String>> collectionToNodeNames = new HashMap<>();
+
+  /**
+   * Starts the servers, saves and associates the node names to the source collection,
+   * and finally creates the source collection.
+   */
+  private void createSourceCollection() throws Exception {
+    List<String> nodeNames = this.startServers(shardCount * replicationFactor);
+    this.collectionToNodeNames.put(SOURCE_COLLECTION, nodeNames);
+    this.createCollection(SOURCE_COLLECTION);
+    this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
+    this.updateMappingsFromZk(SOURCE_COLLECTION);
+  }
+
+  /**
+   * Clear the source collection. It will delete then create the collection through the collection API.
+   * The collection will have a new fresh index, i.e., including a new update log.
+   */
+  protected void clearSourceCollection() throws Exception {
+    this.deleteCollection(SOURCE_COLLECTION);
+    this.createCollection(SOURCE_COLLECTION);
+    this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
+    this.updateMappingsFromZk(SOURCE_COLLECTION);
+  }
+
+  /**
+   * Starts the servers, saves and associates the node names to the target collection,
+   * and finally creates the target collection.
+   */
+  private void createTargetCollection() throws Exception {
+    List<String> nodeNames = this.startServers(shardCount * replicationFactor);
+    this.collectionToNodeNames.put(TARGET_COLLECTION, nodeNames);
+    this.createCollection(TARGET_COLLECTION);
+    this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
+    this.updateMappingsFromZk(TARGET_COLLECTION);
+  }
+
+  /**
+   * Clear the target collection. It will delete then create the collection through the collection API.
+   * The collection will have a new fresh index, i.e., including a new update log.
+   */
+  protected void clearTargetCollection() throws Exception {
+    this.deleteCollection(TARGET_COLLECTION);
+    this.createCollection(TARGET_COLLECTION);
+    this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
+    this.updateMappingsFromZk(TARGET_COLLECTION);
+  }
+
+  /**
+   * Create a new collection through the Collection API. It enforces the use of one max shard per node.
+   * It will define the nodes to spread the new collection across by using the mapping {@link #collectionToNodeNames},
+   * to ensure that a node will not host more than one core (which will create problems when trying to restart servers).
+   *
+   * @param name the collection to create; node names must already have been registered for it in the mapping
+   */
+  private void createCollection(String name) throws Exception {
+    List<String> nodeNames = collectionToNodeNames.get(name);
+    assertTrue("No node has been registered for collection " + name, nodeNames != null && !nodeNames.isEmpty());
+    try (CloudSolrClient client = createCloudClient(null)) {
+      Map<String, List<Integer>> collectionInfos = new HashMap<>();
+      int maxShardsPerNode = 1;
+
+      // comma-separated list of the nodes registered for this collection
+      StringBuilder sb = new StringBuilder();
+      for (String nodeName : nodeNames) {
+        if (sb.length() > 0) sb.append(',');
+        sb.append(nodeName);
+      }
+
+      createCollection(collectionInfos, name, shardCount, replicationFactor, maxShardsPerNode, client, sb.toString());
+    }
+  }
+
+  private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos,
+                                                   String collectionName, int numShards, int replicationFactor,
+                                                   int maxShardsPerNode, SolrClient client, String createNodeSetStr)
+      throws SolrServerException, IOException {
+    return createCollection(collectionInfos, collectionName,
+        ZkNodeProps.makeMap(
+            NUM_SLICES, numShards,
+            REPLICATION_FACTOR, replicationFactor,
+            CREATE_NODE_SET, createNodeSetStr,
+            MAX_SHARDS_PER_NODE, maxShardsPerNode),
+        client, null);
+  }
+
+  private CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName,
+                                                   Map<String, Object> collectionProps, SolrClient client,
+                                                   String confSetName)
+      throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.CREATE.toString());
+    for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
+      if (entry.getValue() != null) params.set(entry.getKey(), String.valueOf(entry.getValue()));
+    }
+    Integer numShards = (Integer) collectionProps.get(NUM_SLICES);
+    if (numShards == null) {
+      String shardNames = (String) collectionProps.get(SHARDS_PROP);
+      numShards = StrUtils.splitSmart(shardNames, ',').size();
+    }
+    Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
+    if (replicationFactor == null) {
+      replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
+    }
+
+    if (confSetName != null) {
+      params.set("collection.configName", confSetName);
+    }
+
+    List<Integer> list = new ArrayList<>();
+    list.add(numShards);
+    list.add(replicationFactor);
+    if (collectionInfos != null) {
+      collectionInfos.put(collectionName, list);
+    }
+    params.set("name", collectionName);
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+
+    CollectionAdminResponse res = new CollectionAdminResponse();
+    res.setResponse(client.request(request));
+    return res;
+  }
+
+  /**
+   * Delete a collection through the Collection API.
+   */
+  protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
+    SolrClient client = createCloudClient(null);
+    CollectionAdminResponse res;
+
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set("action", CollectionParams.CollectionAction.DELETE.toString());
+      params.set("name", collectionName);
+      QueryRequest request = new QueryRequest(params);
+      request.setPath("/admin/collections");
+
+      res = new CollectionAdminResponse();
+      res.setResponse(client.request(request));
+    } catch (Exception e) {
+      log.warn("Error while deleting the collection " + collectionName, e);
+      return new CollectionAdminResponse();
+    } finally {
+      client.close();
+    }
+
+    return res;
+  }
+
+  private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
+    CloudSolrClient client = this.createCloudClient(null);
+    try {
+      client.connect();
+      ZkStateReader zkStateReader = client.getZkStateReader();
+      super.waitForRecoveriesToFinish(collection, zkStateReader, verbose);
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Asserts that the collection has the correct number of shards and replicas, i.e.,
+   * {@link #shardCount} shards and {@link #shardCount} * {@link #replicationFactor} replicas in total.
+   *
+   * @param collectionName the name of the collection to check in the cluster state
+   */
+  protected void assertCollectionExpectations(String collectionName) throws Exception {
+    try (CloudSolrClient client = this.createCloudClient(null)) {
+      client.connect();
+      ClusterState clusterState = client.getZkStateReader().getClusterState();
+
+      assertTrue("Could not find new collection " + collectionName, clusterState.hasCollection(collectionName));
+      Map<String, Slice> shards = clusterState.getCollection(collectionName).getSlicesMap();
+      // did we find the expected number of shards?
+      assertEquals("Found new collection " + collectionName + ", but mismatch on number of shards.", shardCount, shards.size());
+      int totalReplicas = 0;
+      for (Slice shard : shards.values()) {
+        totalReplicas += shard.getReplicas().size();
+      }
+      int expectedTotalReplicas = shardCount * replicationFactor;
+      assertEquals("Found new collection " + collectionName + " with correct number of shards, but mismatch on number " +
+          "of replicas.", expectedTotalReplicas, totalReplicas);
+    }
+  }
+
+  /** Restarts a server, waits for the recoveries of its collection to finish, then refreshes the jetty mappings. */
+  protected void restartServer(CloudJettyRunner server) throws Exception {
+    // it seems we need to set the collection property to have the jetty properly restarted
+    System.setProperty("collection", server.collection);
+    try {
+      ChaosMonkey.stop(server.jetty);
+      ChaosMonkey.start(server.jetty);
+    } finally {
+      System.clearProperty("collection"); // never leave the property set, even when the restart fails
+    }
+    waitForRecoveriesToFinish(server.collection, true);
+    updateMappingsFromZk(server.collection); // must update the mapping as the core node name might have changed
+  }
+
+  /**
+   * Restarts a list of servers.
+   */
+  protected void restartServers(List<CloudJettyRunner> servers) throws Exception {
+    for (CloudJettyRunner server : servers) {
+      this.restartServer(server);
+    }
+  }
+
+  private List<JettySolrRunner> jettys = new ArrayList<>();
+
+  /**
+   * Creates and starts a given number of servers. Returns the node names of the replicas of the temporary collection.
+   */
+  protected List<String> startServers(int nServer) throws Exception {
+    String temporaryCollection = "tmp_collection";
+    System.setProperty("collection", temporaryCollection);
+    for (int i = 1; i <= nServer; i++) {
+      // give every jetty its own solrhome
+      File jettyDir = createTempDir("jetty").toFile();
+      jettyDir.mkdirs();
+      setupJettySolrHome(jettyDir);
+      JettySolrRunner jetty = createJetty(jettyDir, null, "shard" + i);
+      jettys.add(jetty);
+    }
+
+    ZkStateReader zkStateReader = ((SolrDispatchFilter) jettys.get(0)
+        .getDispatchFilter().getFilter()).getCores().getZkController()
+        .getZkStateReader();
+
+    // now wait till we see the leader for each shard
+    for (int i = 1; i <= shardCount; i++) {
+      this.printLayout();
+      zkStateReader.getLeaderRetry(temporaryCollection, "shard" + i, 15000);
+    }
+
+    // store the node names
+    List<String> nodeNames = new ArrayList<>();
+    for (Slice shard : zkStateReader.getClusterState().getCollection(temporaryCollection).getSlices()) {
+      for (Replica replica : shard.getReplicas()) {
+        nodeNames.add(replica.getNodeName());
+      }
+    }
+
+    // delete the temporary collection - we will create our own collections later
+    this.deleteCollection(temporaryCollection);
+    System.clearProperty("collection");
+
+    return nodeNames;
+  }
+
+  @Override
+  protected void destroyServers() throws Exception {
+    for (JettySolrRunner runner : jettys) {
+      try {
+        ChaosMonkey.stop(runner);
+      } catch (Exception e) {
+        log.error("", e);
+      }
+    }
+
+    jettys.clear();
+  }
+
+  /**
+   * Mapping from collection to jettys
+   */
+  protected Map<String, List<CloudJettyRunner>> cloudJettys = new HashMap<>();
+
+  /**
+   * Mapping from collection/shard to jettys
+   */
+  protected Map<String, Map<String, List<CloudJettyRunner>>> shardToJetty = new HashMap<>();
+
+  /**
+   * Mapping from collection/shard leader to jettys
+   */
+  protected Map<String, Map<String, CloudJettyRunner>> shardToLeaderJetty = new HashMap<>();
+
+  /**
+   * Updates the mappings between the jetty's instances and the zookeeper cluster state.
+   */
+  protected void updateMappingsFromZk(String collection) throws Exception {
+    List<CloudJettyRunner> cloudJettys = new ArrayList<>();
+    Map<String, List<CloudJettyRunner>> shardToJetty = new HashMap<>();
+    Map<String, CloudJettyRunner> shardToLeaderJetty = new HashMap<>();
+
+    CloudSolrClient cloudClient = this.createCloudClient(null);
+    try {
+      cloudClient.connect();
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      zkStateReader.updateClusterState(true);
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollection(collection);
+
+      for (JettySolrRunner jetty : jettys) {
+        int port = jetty.getLocalPort();
+        if (port == -1) {
+          throw new RuntimeException("Cannot find the port for jetty");
+        }
+
+        nextJetty:
+        for (Slice shard : coll.getSlices()) {
+          Set<Map.Entry<String, Replica>> entries = shard.getReplicasMap().entrySet();
+          for (Map.Entry<String, Replica> entry : entries) {
+            Replica replica = entry.getValue();
+            if (replica.getStr(ZkStateReader.BASE_URL_PROP).contains(":" + port)) {
+              if (!shardToJetty.containsKey(shard.getName())) {
+                shardToJetty.put(shard.getName(), new ArrayList<CloudJettyRunner>());
+              }
+              boolean isLeader = shard.getLeader() == replica;
+              CloudJettyRunner cjr = new CloudJettyRunner(jetty, replica, collection, shard.getName(), entry.getKey());
+              shardToJetty.get(shard.getName()).add(cjr);
+              if (isLeader) {
+                shardToLeaderJetty.put(shard.getName(), cjr);
+              }
+              cloudJettys.add(cjr);
+              break nextJetty;
+            }
+          }
+        }
+      }
+
+      this.cloudJettys.put(collection, cloudJettys);
+      this.shardToJetty.put(collection, shardToJetty);
+      this.shardToLeaderJetty.put(collection, shardToLeaderJetty);
+    } finally {
+      cloudClient.close();
+    }
+  }
+
+  /**
+   * Wrapper around a {@link org.apache.solr.client.solrj.embedded.JettySolrRunner} to map the jetty
+   * instance to various information of the cloud cluster, such as the collection and shard
+   * that is served by the jetty instance, the node name, core node name, url, etc.
+   */
+  public static class CloudJettyRunner {
+
+    public JettySolrRunner jetty;
+    public String nodeName;
+    public String coreNodeName;
+    public String url;
+    public SolrClient client;
+    public Replica info;
+    public String shard;
+    public String collection;
+
+    public CloudJettyRunner(JettySolrRunner jetty, Replica replica,
+                            String collection, String shard, String coreNodeName) {
+      this.jetty = jetty;
+      this.info = replica;
+      this.collection = collection;
+
+      // we need to update the jetty's shard so that it registers itself to the right shard when restarted
+      this.shard = shard;
+      this.jetty.setShards(this.shard);
+
+      // we need to update the jetty's core node name so that it registers itself under the right name when restarted
+      this.coreNodeName = coreNodeName;
+      this.jetty.setCoreNodeName(this.coreNodeName);
+
+      this.nodeName = replica.getNodeName();
+
+      ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(info);
+      this.url = coreNodeProps.getCoreUrl();
+
+      // strip the trailing slash as this can cause issues when executing requests
+      this.client = createNewSolrServer(this.url.substring(0, this.url.length() - 1));
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((url == null) ? 0 : url.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      CloudJettyRunner other = (CloudJettyRunner) obj;
+      if (url == null) {
+        if (other.url != null) return false;
+      } else if (!url.equals(other.url)) return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "CloudJettyRunner [url=" + url + "]";
+    }
+
+  }
+
+  protected static SolrClient createNewSolrServer(String baseUrl) {
+    try {
+      // setup the server...
+      HttpSolrClient s = new HttpSolrClient(baseUrl);
+      s.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+      s.setDefaultMaxConnectionsPerHost(100);
+      s.setMaxTotalConnections(100);
+      return s;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
+    while (true) {
+      log.info("Checking queue size @ {}:{}", collectionName, shardId);
+      long size = this.getQueueSize(collectionName, shardId);
+      if (size <= 0) {
+        return;
+      }
+      log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
+      Thread.sleep(1000); // wait a bit for the replication to complete
+    }
+  }
+
+  protected long getQueueSize(String collectionName, String shardId) throws Exception {
+    NamedList rsp = this.invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.QUEUES);
+    NamedList host = (NamedList) ((NamedList) rsp.get(CdcrParams.QUEUES)).getVal(0);
+    NamedList status = (NamedList) host.get(TARGET_COLLECTION);
+    return (Long) status.get(CdcrParams.QUEUE_SIZE);
+  }
+
+  /**
+   * Asserts that the number of transaction logs, summed across all the shards, is at most {@code maxNumberOfTLogs}
+   * on the leaders and on each replica position, and that the replicas never hold fewer logs than the leaders.
+   */
+  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+    CollectionInfo info = collectInfo(collection);
+
+    int leaderLogs = 0;
+    List<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+
+    for (String shard : info.getShardToCoresMap().keySet()) {
+      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
+      List<CollectionInfo.CoreInfo> replicas = info.getReplicas(shard); // computed once per shard
+      for (int i = 0; i < replicationFactor - 1; i++) {
+        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(replicas.get(i).ulogDir));
+      }
+    }
+
+    // checked outside of the replica loop so that it also runs when there is no replica (replicationFactor == 1)
+    assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
+        leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
+
+    for (Integer replicaLogs : replicasLogs) {
+      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
+      // replica logs must be always equal or superior to leader logs
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is lower than on leader: %d.",
+          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
+          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
+    }
+  }
+
+  private int numberOfFiles(String dir) {
+    File file = new File(dir);
+    if (!file.isDirectory()) {
+      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
+    }
+    log.info("Update log dir {} contains: {}", dir, file.listFiles());
+    return file.listFiles().length;
+  }
+
+  protected CollectionInfo collectInfo(String collection) throws Exception {
+    CollectionInfo info = new CollectionInfo(collection);
+    for (String shard : shardToJetty.get(collection).keySet()) {
+      List<CloudJettyRunner> jettyRunners = shardToJetty.get(collection).get(shard);
+      for (CloudJettyRunner jettyRunner : jettyRunners) {
+        SolrDispatchFilter filter = (SolrDispatchFilter) jettyRunner.jetty.getDispatchFilter().getFilter();
+        for (SolrCore core : filter.getCores().getCores()) {
+          info.addCore(core, shard, shardToLeaderJetty.get(collection).containsValue(jettyRunner));
+        }
+      }
+    }
+
+    return info;
+  }
+
+  protected class CollectionInfo {
+
+    List<CoreInfo> coreInfos = new ArrayList<>();
+
+    String collection;
+
+    CollectionInfo(String collection) {
+      this.collection = collection;
+    }
+
+    /**
+     * @return Returns a map shard -> list of cores
+     */
+    Map<String, List<CoreInfo>> getShardToCoresMap() {
+      Map<String, List<CoreInfo>> map = new HashMap<>();
+      for (CoreInfo info : coreInfos) {
+        List<CoreInfo> list = map.get(info.shard);
+        if (list == null) {
+          list = new ArrayList<>();
+          map.put(info.shard, list);
+        }
+        list.add(info);
+      }
+      return map;
+    }
+
+    CoreInfo getLeader(String shard) {
+      // scan the cores directly: an unknown shard then fails the assertion below instead of throwing an NPE
+      for (CoreInfo info : coreInfos) {
+        if (info.isLeader && shard.equals(info.shard)) {
+          return info;
+        }
+      }
+      fail(String.format(Locale.ENGLISH, "There is no leader for collection %s shard %s", collection, shard));
+      return null; // not reached: fail() always throws
+    }
+
+    List<CoreInfo> getReplicas(String shard) {
+      List<CoreInfo> coreInfos = getShardToCoresMap().get(shard);
+      coreInfos.remove(getLeader(shard));
+      return coreInfos;
+    }
+
+    void addCore(SolrCore core, String shard, boolean isLeader) throws Exception {
+      CoreInfo info = new CoreInfo();
+      info.collectionName = core.getName();
+      info.shard = shard;
+      info.isLeader = isLeader;
+      info.ulogDir = core.getUpdateHandler().getUpdateLog().getLogDir();
+
+      this.coreInfos.add(info);
+    }
+
+    public class CoreInfo {
+      String collectionName;
+      String shard;
+      boolean isLeader;
+      String ulogDir;
+    }
+
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,598 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slow
+public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
+
+  /**
+   * Selects the schema used by this test, then delegates to the parent set-up.
+   */
+  @Override
+  public void distribSetUp() throws Exception {
+    schemaString = "schema15.xml";      // we need a string id
+    super.distribSetUp();
+  }
+
+  /**
+   * Single JUnit entry point: runs every scenario of this class sequentially, in the order listed.
+   * Most scenarios start by clearing the source and target collections, so they do not depend on
+   * the documents left behind by the previous one.
+   */
+  @Test
+  @ShardsFixed(num = 4)
+  public void doTest() throws Exception {
+    this.doTestDeleteCreateSourceCollection();
+    this.doTestTargetCollectionNotAvailable();
+    this.doTestReplicationStartStop();
+    this.doTestReplicationAfterRestart();
+    this.doTestReplicationAfterLeaderChange();
+    this.doTestUpdateLogSynchronisation();
+    this.doTestBufferOnNonLeader();
+    this.doTestOps();
+    this.doTestBatchAddsWithDelete();
+    this.doTestBatchBoundaries();
+    this.doTestResilienceWithDeleteByQueryOnTarget();
+  }
+
+  /**
+   * Sanity check of the test framework itself: collections can be cleared and leader nodes can be
+   * restarted while the document counts of both collections stay as expected.
+   */
+  public void doTestDeleteCreateSourceCollection() throws Exception {
+    log.info("Indexing documents");
+
+    List<SolrInputDocument> batch = new ArrayList<>();
+    for (int docId = 0; docId < 10; docId++) {
+      batch.add(getDoc(id, Integer.toString(docId)));
+    }
+    // the same batch is indexed directly into both collections
+    index(SOURCE_COLLECTION, batch);
+    index(TARGET_COLLECTION, batch);
+
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Restarting leader @ source_collection:shard1");
+
+    restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    // a restart must not change the counts
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Clearing source_collection");
+
+    clearSourceCollection();
+
+    // only the source is emptied
+    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Restarting leader @ target_collection:shard1");
+
+    restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
+
+    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Clearing target_collection");
+
+    clearTargetCollection();
+
+    // both collections are empty now
+    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+    assertCollectionExpectations(SOURCE_COLLECTION);
+    assertCollectionExpectations(TARGET_COLLECTION);
+  }
+
+  /**
+   * Checks that the ERRORS action reports replication errors for the target collection after that
+   * collection has been deleted while CDCR is started.
+   */
+  public void doTestTargetCollectionNotAvailable() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    // send start action to first shard
+    NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.ProcessState.STARTED.toLower(), status.get(CdcrParams.ProcessState.getParam()));
+
+    // check status
+    this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+    // Make the target unavailable by deleting its collection
+    this.deleteCollection(TARGET_COLLECTION);
+
+    // Index a few documents to trigger the replication
+    index(SOURCE_COLLECTION, getDoc(id, "a"));
+    index(SOURCE_COLLECTION, getDoc(id, "b"));
+    index(SOURCE_COLLECTION, getDoc(id, "c"));
+    index(SOURCE_COLLECTION, getDoc(id, "d"));
+    index(SOURCE_COLLECTION, getDoc(id, "e"));
+    index(SOURCE_COLLECTION, getDoc(id, "f"));
+
+    assertEquals(6, getNumDocs(SOURCE_COLLECTION));
+
+    // NOTE(review): fixed delay; the assertions below presumably fail if the replicator has not run within it
+    Thread.sleep(1000); // wait a bit for the replicator thread to be triggered
+
+    // the errors are requested from the leader of the second shard
+    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
+    NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
+    NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
+    // at least one consecutive error and one recorded "last" error are expected
+    assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
+    NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
+    assertNotNull(lastErrors);
+    assertTrue(0 < lastErrors.size());
+  }
+
+  /**
+   * Checks the START / STOP life-cycle of the replication: documents indexed while CDCR is stopped
+   * must not reach the target collection, and must reach it once CDCR is started again.
+   */
+  public void doTestReplicationStartStop() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection(); // this might log a warning indicating that the collection could not be deleted (it was deleted in the previous test)
+
+    int start = 0;
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (; start < 10; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // CDCR is not started yet: nothing must have been forwarded
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // the counts asserted below cover the whole target collection, so wait for both source shards,
+    // as the other scenarios of this class do
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+
+    // index 100 more documents (ids 10 to 109) while CDCR is stopped
+    docs.clear();
+    for (; start < 110; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // the target must still hold the first 10 documents only
+    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    // Start again CDCR, the source cluster should reinitialise its log readers
+    // with the latest checkpoints
+
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Verifies that replication carries on after the nodes of a source shard have been restarted.
+   */
+  public void doTestReplicationAfterRestart() throws Exception {
+    clearSourceCollection();
+    clearTargetCollection();
+
+    log.info("Starting CDCR");
+
+    // the START action is sent to the leader of the first shard
+    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    log.info("Indexing 10 documents");
+
+    int nextId = 0;
+    List<SolrInputDocument> batch = new ArrayList<>();
+    while (nextId < 10) {
+      batch.add(getDoc(id, Integer.toString(nextId)));
+      nextId++;
+    }
+    index(SOURCE_COLLECTION, batch);
+
+    log.info("Querying source collection");
+
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+
+    log.info("Waiting for replication");
+
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    log.info("Querying target collection");
+
+    commit(TARGET_COLLECTION);
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Restarting shard1");
+
+    // every jetty registered for shard1 of the source is restarted
+    restartServers(shardToJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    log.info("Indexing 100 documents");
+
+    // ids continue where the first batch stopped: 10 to 109
+    batch.clear();
+    while (nextId < 110) {
+      batch.add(getDoc(id, Integer.toString(nextId)));
+      nextId++;
+    }
+    index(SOURCE_COLLECTION, batch);
+
+    log.info("Querying source collection");
+
+    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+
+    log.info("Waiting for replication");
+
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    log.info("Querying target collection");
+
+    commit(TARGET_COLLECTION);
+    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Verifies that replication is started again after the leaders have changed, and that the log
+   * readers of the new leaders are initialised from the checkpoint of the target.
+   */
+  public void doTestReplicationAfterLeaderChange() throws Exception {
+    clearSourceCollection();
+    clearTargetCollection();
+
+    log.info("Starting CDCR");
+
+    // the START action is sent to the leader of the first shard
+    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    log.info("Indexing 10 documents");
+
+    int nextId = 0;
+    List<SolrInputDocument> batch = new ArrayList<>();
+    while (nextId < 10) {
+      batch.add(getDoc(id, Integer.toString(nextId)));
+      nextId++;
+    }
+    index(SOURCE_COLLECTION, batch);
+
+    log.info("Querying source collection");
+
+    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+
+    log.info("Waiting for replication");
+
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    log.info("Querying target collection");
+
+    commit(TARGET_COLLECTION);
+    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+
+    log.info("Restarting target leaders");
+
+    // restart the leader of each target shard
+    restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
+    restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD2));
+
+    log.info("Restarting source leaders");
+
+    // restart the leader of each source shard
+    restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+    restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
+
+    log.info("Checking queue size of new source leaders");
+
+    // When the log readers of the new leaders start from the target's checkpoint, their queue
+    // holds fewer entries than the number of documents indexed so far. It is not necessarily
+    // empty, because the checkpoint is probably not the very last document received.
+    assertTrue(getQueueSize(SOURCE_COLLECTION, SHARD1) < 10);
+    assertTrue(getQueueSize(SOURCE_COLLECTION, SHARD2) < 10);
+
+    log.info("Indexing 100 documents");
+
+    // ids continue where the first batch stopped: 10 to 109
+    batch.clear();
+    while (nextId < 110) {
+      batch.add(getDoc(id, Integer.toString(nextId)));
+      nextId++;
+    }
+    index(SOURCE_COLLECTION, batch);
+
+    log.info("Querying source collection");
+
+    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+
+    log.info("Waiting for replication");
+
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    log.info("Querying target collection");
+
+    commit(TARGET_COLLECTION);
+    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Check that the update logs are synchronised between leader and non-leader nodes.
+   * Buffering is disabled and CDCR started, documents are indexed one by one and replicated, then
+   * CDCR is stopped and the number of tlog files is checked against an upper bound of 50, twice.
+   */
+  public void doTestUpdateLogSynchronisation() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    // buffering is enabled by default, so disable it
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    for (int i = 0; i < 50; i++) {
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+    }
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    // Stop CDCR
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+
+    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+    // re-indexes id 0, so the document count stays at 50
+    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
+
+    // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
+    assertUpdateLogs(SOURCE_COLLECTION, 50);
+
+    for (int i = 50; i < 100; i++) {
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
+    }
+
+    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
+
+    // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
+    // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
+    // and 11 on the non-leader)
+    // the non-leader must have synchronised its update log with its leader
+    // NOTE(review): the exact counts quoted above (10 and 11) are not what is asserted; only the bound of 50 is passed - confirm
+    assertUpdateLogs(SOURCE_COLLECTION, 50);
+  }
+
+  /**
+   * Check that the buffer is always activated on non-leader nodes.
+   */
+  public void doTestBufferOnNonLeader() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    // buffering is enabled by default, so disable it
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+
+    // Start CDCR
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // Index documents
+    for (int i = 0; i < 200; i++) {
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+    }
+
+    // And immediately, close all the leaders, then restart them. It is likely that the replication will not be
+    // performed fully, and therefore be continued by the new leader
+    // At this stage, the new leader must have been elected
+    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+    this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2));
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    // If the non-leader node were buffering updates, then the replication must be complete
+    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(200, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Verifies the statistics returned by the OPS action after an add-only replication: the
+   * "all" counter is positive and equal to the "adds" counter, and the "deletes" counter is zero.
+   */
+  public void doTestOps() throws Exception {
+    clearSourceCollection();
+    clearTargetCollection();
+
+    // 200 documents, indexed as one batch
+    List<SolrInputDocument> batch = new ArrayList<>();
+    int docId = 0;
+    while (docId < 200) {
+      batch.add(getDoc(id, Integer.toString(docId)));
+      docId++;
+    }
+    index(SOURCE_COLLECTION, batch);
+
+    // start CDCR
+    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // let the replication of both shards finish
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    // statistics are requested from the leader of the first shard
+    NamedList response = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.OPS);
+    NamedList opsPerSecond = (NamedList) response.get(CdcrParams.OPERATIONS_PER_SECOND);
+    NamedList perCollection = (NamedList) opsPerSecond.getVal(0);
+    NamedList targetOps = (NamedList) perCollection.get(TARGET_COLLECTION);
+    double allOps = (Double) targetOps.get(CdcrParams.COUNTER_ALL);
+    double addOps = (Double) targetOps.get(CdcrParams.COUNTER_ADDS);
+    assertTrue(allOps > 0);
+    assertEquals(allOps, addOps, 0);
+
+    double deleteOps = (Double) targetOps.get(CdcrParams.COUNTER_DELETES);
+    assertEquals(0, deleteOps, 0);
+  }
+
+  /**
+   * Checks that updates mixing batches of adds with deletes by id are replicated correctly:
+   * the target must end up with the same number of documents as the source.
+   */
+  public void doTestBatchAddsWithDelete() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    // Index 50 documents
+    int start = 0;
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (; start < 50; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // Delete 10 documents: 10-19
+    // (the loop variable is deliberately not called "id", which is the field name passed to getDoc)
+    List<String> ids = new ArrayList<>();
+    for (int deletedId = 10; deletedId < 20; deletedId++) {
+      ids.add(Integer.toString(deletedId));
+    }
+    deleteById(SOURCE_COLLECTION, ids);
+
+    // Index 10 documents
+    docs = new ArrayList<>();
+    for (; start < 60; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // Delete 1 document: 50
+    ids = new ArrayList<>();
+    ids.add(Integer.toString(50));
+    deleteById(SOURCE_COLLECTION, ids);
+
+    // Index 10 documents
+    docs = new ArrayList<>();
+    for (; start < 70; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // Start CDCR
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    // 70 documents added, 11 of them deleted (10-19 and 50): 59 must remain on both sides
+    assertEquals(59, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(59, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Checks that batches are correctly constructed when batch boundaries are reached.
+   */
+  public void doTestBatchBoundaries() throws Exception {
+    // start from empty collections, like the other scenarios of this class, instead of relying on
+    // whatever the previous scenario left behind
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    log.info("Indexing documents");
+
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (int i = 0; i < 128; i++) { // should create two full batches (default batch = 64)
+      docs.add(getDoc(id, Integer.toString(i)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+
+    // the counts asserted below cover the whole target collection, so wait for both source shards
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(128, getNumDocs(TARGET_COLLECTION));
+  }
+
+  /**
+   * Check resilience of replication with delete by query executed on targets.
+   * Documents are replicated, both collections are emptied with a delete by query, new documents
+   * are replicated, the target alone is emptied, CDCR is stopped and started again, and a last
+   * batch is replicated: the target must then hold that last batch only.
+   */
+  public void doTestResilienceWithDeleteByQueryOnTarget() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
+
+    // Index 50 documents
+    int start = 0;
+    List<SolrInputDocument> docs = new ArrayList<>();
+    for (; start < 50; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // Start CDCR
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    // the first 50 documents must be present on both sides
+    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+    // the delete by query is issued on each collection directly
+    deleteByQuery(SOURCE_COLLECTION, "*:*");
+    deleteByQuery(TARGET_COLLECTION, "*:*");
+
+    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+    // index the next 50 documents (ids 50 to 99)
+    docs.clear();
+    for (; start < 100; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+
+    // this time, only the target is emptied
+    deleteByQuery(TARGET_COLLECTION, "*:*");
+
+    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+
+    // Restart CDCR
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    Thread.sleep(500); // wait a bit for the state to synch
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+
+    // index the last 50 documents (ids 100 to 149)
+    docs.clear();
+    for (; start < 150; start++) {
+      docs.add(getDoc(id, Integer.toString(start)));
+    }
+    index(SOURCE_COLLECTION, docs);
+
+    // wait a bit for the replication to complete
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+
+    commit(TARGET_COLLECTION);
+
+    // the source holds ids 50 to 149; the target is expected to hold 50 documents only
+    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,245 @@
+/**
+ * Copyright (c) 2015 Renaud Delbru. All Rights Reserved.
+ */
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slow
+public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
+
+  /**
+   * Configures the test (schema, no target collection, a single shard, a persistent directory
+   * factory) before delegating to the parent set-up.
+   */
+  @Override
+  public void distribSetUp() throws Exception {
+    schemaString = "schema15.xml";      // we need a string id
+    createTargetCollection = false;     // we do not need the target cluster
+    shardCount = 1; // we need only one shard
+    // we need a persistent directory, otherwise the UpdateHandler will erase existing tlog files after restarting a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    super.distribSetUp();
+  }
+
+  /**
+   * Single JUnit entry point: runs the replication scenarios of this class sequentially, in the
+   * order listed.
+   */
+  @Test
+  @ShardsFixed(num = 4)
+  public void doTest() throws Exception {
+    this.doTestFullReplication();
+    this.doTestPartialReplication();
+    this.doTestPartialReplicationWithTruncatedTlog();
+    this.doTestPartialReplicationAfterPeerSync();
+  }
+
+  /**
+   * Test the scenario where the slave is killed from the start. The replication
+   * strategy should fetch all the missing tlog files from the leader.
+   */
+  public void doTestFullReplication() throws Exception {
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 0; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave is killed before receiving all the documents. The replication
+   * strategy should fetch all the missing tlog files from the leader.
+   */
+  public void doTestPartialReplication() throws Exception {
+    this.clearSourceCollection();
+
+    for (int i = 0; i < 5; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    for (int i = 5; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+
+    // Restart the slave node to trigger Replication strategy
+    this.restartServer(slaves.get(0));
+
+    // at this stage, the slave should have replicated the 5 missing tlog files
+    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave is killed before receiving a commit. This creates a truncated tlog
+   * file on the slave node. The replication strategy should detect this truncated file, and fetch the
+   * non-truncated file from the leader.
+   */
+  public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
+    this.clearSourceCollection();
+
+    // resolve the slave before opening the client, so that nothing can fail between the creation
+    // of the client and the try block that closes it
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
+
+    try {
+      // 10 batches of 20 documents, each document sent on its own and each batch followed by a commit
+      for (int i = 0; i < 10; i++) {
+        for (int j = i * 20; j < (i * 20) + 20; j++) {
+          client.add(getDoc(id, Integer.toString(j)));
+
+          // Stop the slave in the middle of a batch to create a truncated tlog on the slave
+          if (j == 45) {
+            ChaosMonkey.stop(slaves.get(0).jetty);
+          }
+
+        }
+        commit(SOURCE_COLLECTION);
+      }
+    } finally {
+      client.close();
+    }
+
+    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+
+    // Restart the slave node to trigger Replication recovery
+    this.restartServer(slaves.get(0));
+
+    // at this stage, leader and slave must hold the same 10 tlog files
+    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+  }
+
+  /**
+   * Test the scenario where the slave first recovered with a PeerSync strategy, then with a Replication strategy.
+   * The PeerSync strategy will generate a single tlog file for all the missing updates on the slave node.
+   * If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
+   * and fetch the corresponding tlog files from the leader.
+   */
+  public void doTestPartialReplicationAfterPeerSync() throws Exception {
+    this.clearSourceCollection();
+
+    // 5 batches of 10 documents (ids 0 to 49) while the slave is up
+    for (int i = 0; i < 5; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    // 5 batches of 10 documents (ids 50 to 99) while the slave is down
+    for (int i = 5; i < 10; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 10; j < (i * 10) + 10; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+
+    // Restart the slave node to trigger PeerSync recovery
+    // (the update window between leader and slave is small enough)
+    this.restartServer(slaves.get(0));
+
+    ChaosMonkey.stop(slaves.get(0).jetty);
+
+    // 5 batches of 20 documents (ids 200 to 299) while the slave is down again
+    for (int i = 10; i < 15; i++) {
+      List<SolrInputDocument> docs = new ArrayList<>();
+      for (int j = i * 20; j < (i * 20) + 20; j++) {
+        docs.add(getDoc(id, Integer.toString(j)));
+      }
+      index(SOURCE_COLLECTION, docs);
+    }
+
+    // restart the slave node to trigger Replication recovery
+    this.restartServer(slaves.get(0));
+
+    // at this stage, the slave should have replicated the 5 missing tlog files
+    // (15 batches were indexed in total, hence the 15 tlog files expected below)
+    this.assertUpdateLogs(SOURCE_COLLECTION, 15);
+  }
+
+  /**
+   * Returns a copy of the jetties registered for the given shard, with the jetty of the shard
+   * leader removed.
+   */
+  private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
+    List<CloudJettyRunner> slaves = new ArrayList<>();
+    slaves.addAll(shardToJetty.get(collection).get(shard));
+    slaves.remove(shardToLeaderJetty.get(collection).get(shard));
+    return slaves;
+  }
+
+  /**
+   * Compares, shard by shard, the transaction logs of the leader with those of the first replica:
+   * both must hold exactly {@code maxNumberOfTLogs} tlog files, and every tlog version found on
+   * the leader must exist on the slave with the same file size.
+   */
+  @Override
+  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+    CollectionInfo collectionInfo = collectInfo(collection);
+
+    for (String shard : collectionInfo.getShardToCoresMap().keySet()) {
+      Map<Long, Long> leaderTlogs = getFilesMeta(collectionInfo.getLeader(shard).ulogDir);
+      Map<Long, Long> slaveTlogs = getFilesMeta(collectionInfo.getReplicas(shard).get(0).ulogDir);
+
+      assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderTlogs.size());
+      assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveTlogs.size());
+
+      // every tlog of the leader needs a counterpart of identical size on the slave
+      for (Map.Entry<Long, Long> leaderTlog : leaderTlogs.entrySet()) {
+        Long version = leaderTlog.getKey();
+        assertTrue("Slave is missing a tlog for version " + version, slaveTlogs.containsKey(version));
+        assertEquals("Slave's tlog file size differs for version " + version, leaderTlog.getValue(), slaveTlogs.get(version));
+      }
+    }
+  }
+
+  /**
+   * Collects the metadata of the files found in a tlog directory: the absolute value of the
+   * numeric suffix of each file name (the text after its last '.') mapped to the file length.
+   * The test fails when the path is not a directory or when its content cannot be listed.
+   *
+   * @param dir the path of the tlog directory
+   * @return the file lengths keyed by the numeric suffix of the file names
+   * @throws NumberFormatException if a file name does not end with a numeric suffix
+   */
+  private Map<Long, Long> getFilesMeta(String dir) {
+    File tlogDir = new File(dir);
+    assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", tlogDir.isDirectory());
+
+    // listFiles() returns null when an I/O error occurs, even for an existing directory
+    File[] tlogFiles = tlogDir.listFiles();
+    assertTrue("Unable to list the content of the tlog directory " + dir, tlogFiles != null);
+
+    Map<Long, Long> filesMeta = new HashMap<>();
+    for (File tlogFile : tlogFiles) {
+      String name = tlogFile.getName();
+      long version = Math.abs(Long.parseLong(name.substring(name.lastIndexOf('.') + 1)));
+      filesMeta.put(version, tlogFile.length());
+    }
+    return filesMeta;
+  }
+
+}

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,157 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.CdcrParams;
+import org.junit.Test;
+
+/**
+ * Exercises the actions of the CDCR request handler against the source collection: process
+ * life-cycle, checkpoints and buffer state.
+ */
+@Slow
+public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
+
+  @Override
+  public void distribSetUp() throws Exception {
+    // a string id is required
+    schemaString = "schema15.xml";
+    // the target cluster is not needed
+    createTargetCollection = false;
+    super.distribSetUp();
+  }
+
+  @Test
+  @ShardsFixed(num = 4)
+  public void doTest() throws Exception {
+    doTestLifeCycleActions();
+    doTestCheckpointActions();
+    doTestBufferActions();
+  }
+
+  /**
+   * Sends a CDCR action to the jetty currently registered as leader of the given shard of the source
+   * collection. The leader is looked up on every call.
+   */
+  private NamedList invokeOnLeader(String shard, CdcrParams.CdcrAction action) throws Exception {
+    return invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(shard), action);
+  }
+
+  /**
+   * Verifies that the life-cycle (process) state is properly synchronised across nodes.
+   */
+  public void doTestLifeCycleActions() throws Exception {
+    // initial state: process stopped, buffer enabled
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+    // START is sent to the first shard; its response must already report the started state
+    NamedList response = invokeOnLeader(SHARD1, CdcrParams.CdcrAction.START);
+    NamedList statusSection = (NamedList) response.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.ProcessState.STARTED.toLower(), statusSection.get(CdcrParams.ProcessState.getParam()));
+
+    // the new state must be visible for the whole collection
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+    // restart the leader of shard 1
+    restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    // the restarted node should have picked up the original state
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STARTED, CdcrParams.BufferState.ENABLED);
+
+    // STOP is sent to the second shard
+    response = invokeOnLeader(SHARD2, CdcrParams.CdcrAction.STOP);
+    statusSection = (NamedList) response.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.ProcessState.STOPPED.toLower(), statusSection.get(CdcrParams.ProcessState.getParam()));
+
+    // and the stopped state must be visible for the whole collection
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+  }
+
+  /**
+   * Verifies the checkpoint API (collection checkpoint, shard checkpoint, last processed version).
+   */
+  public void doTestCheckpointActions() throws Exception {
+    // on an empty index the collection checkpoint must be -1
+    NamedList response = invokeOnLeader(SHARD1, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(-1L, response.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "a")); // shard 2
+
+    // a single document, indexed in shard 2: the checkpoint must still be -1
+    response = invokeOnLeader(SHARD1, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(-1L, response.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "b")); // shard 1
+
+    // second document, indexed in shard 1: the checkpoint must come from shard 2
+    response = invokeOnLeader(SHARD2, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long firstCheckpoint = (Long) response.get(CdcrParams.CHECKPOINT);
+    long shardCheckpoint = (Long) invokeOnLeader(SHARD2, CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertEquals(shardCheckpoint, firstCheckpoint);
+
+    index(SOURCE_COLLECTION, getDoc(id, "c")); // shard 1
+
+    // third document, indexed in shard 1: the checkpoint must still come from shard 2
+    response = invokeOnLeader(SHARD1, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    assertEquals(firstCheckpoint, response.get(CdcrParams.CHECKPOINT));
+
+    index(SOURCE_COLLECTION, getDoc(id, "d")); // shard 2
+
+    // fourth document, indexed in shard 2: the checkpoint must come from shard 1
+    response = invokeOnLeader(SHARD2, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long secondCheckpoint = (Long) response.get(CdcrParams.CHECKPOINT);
+    shardCheckpoint = (Long) invokeOnLeader(SHARD1, CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertEquals(shardCheckpoint, secondCheckpoint);
+
+    // send a delete by query
+    deleteByQuery(SOURCE_COLLECTION, "*:*");
+
+    // every checkpoint must now come from the delete by query, and checkpoints coming from
+    // deletes must be reported in absolute (positive) form
+    response = invokeOnLeader(SHARD2, CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
+    long afterDelete = (Long) response.get(CdcrParams.CHECKPOINT);
+    assertTrue(afterDelete > 0);
+    afterDelete = (Long) invokeOnLeader(SHARD1, CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertTrue(afterDelete > 0);
+    afterDelete = (Long) invokeOnLeader(SHARD2, CdcrParams.CdcrAction.SHARDCHECKPOINT).get(CdcrParams.CHECKPOINT);
+    assertTrue(afterDelete > 0);
+
+    // replication never started: the last processed version must be -1 for both shards
+    for (String shard : new String[]{SHARD1, SHARD2}) {
+      response = invokeOnLeader(shard, CdcrParams.CdcrAction.LASTPROCESSEDVERSION);
+      long lastProcessed = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
+      assertEquals(-1L, lastProcessed);
+    }
+  }
+
+  /**
+   * Verifies that the buffer state is properly synchronised across nodes.
+   */
+  public void doTestBufferActions() throws Exception {
+    // initial state: process stopped, buffer enabled
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+
+    // DISABLEBUFFER is sent to the first shard; its response must already report the disabled buffer
+    NamedList response = invokeOnLeader(SHARD1, CdcrParams.CdcrAction.DISABLEBUFFER);
+    NamedList statusSection = (NamedList) response.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.BufferState.DISABLED.toLower(), statusSection.get(CdcrParams.BufferState.getParam()));
+
+    // the new state must be visible for the whole collection
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+    // restart the leader of shard 1
+    restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
+
+    // the state must be unchanged after the restart
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.DISABLED);
+
+    // ENABLEBUFFER is sent to the second shard
+    response = invokeOnLeader(SHARD2, CdcrParams.CdcrAction.ENABLEBUFFER);
+    statusSection = (NamedList) response.get(CdcrParams.CdcrAction.STATUS.toLower());
+    assertEquals(CdcrParams.BufferState.ENABLED.toLower(), statusSection.get(CdcrParams.BufferState.getParam()));
+
+    // and the enabled state must be visible for the whole collection
+    assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
+  }
+
+}
+

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java Fri May 22 18:58:29 2015
@@ -0,0 +1,304 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.update.processor.CdcrUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Checks how document versions are handled when updates and deletes carry the CDCR update parameter,
+ * compared with regular (optimistic locking) requests.
+ */
+public class CdcrVersionReplicationTest extends BaseCdcrDistributedZkTest {
+
+  private static final String vfield = DistributedUpdateProcessor.VERSION_FIELD;
+
+  /** Client used by the auxiliary methods; set at the beginning of {@link #doTestCdcrDocVersions}. */
+  SolrClient solrServer;
+
+  public CdcrVersionReplicationTest() {
+    // a string id is required
+    schemaString = "schema15.xml";
+    super.createTargetCollection = false;
+  }
+
+  /**
+   * Picks the client used by the test: the cloud client in 80% of the draws, otherwise a client
+   * pointing at a randomly chosen jetty of shard 1 (10%) or shard 2 (10%).
+   */
+  SolrClient createClientRandomly() throws Exception {
+    int draw = random().nextInt(100);
+
+    // testing the smart cloud client (requests to leaders) is more important than testing the forwarding logic
+    if (draw < 80) {
+      return createCloudClient(SOURCE_COLLECTION);
+    }
+
+    String shard = draw < 90 ? SHARD1 : SHARD2;
+    return createNewSolrServer(shardToJetty.get(SOURCE_COLLECTION).get(shard).get(random().nextInt(2)).url);
+  }
+
+  @Test
+  @ShardsFixed(num = 4)
+  public void doTest() throws Exception {
+    SolrClient client = createClientRandomly();
+    try {
+      handle.clear();
+      handle.put("timestamp", SKIPVAL);
+
+      doTestCdcrDocVersions(client);
+
+      commit(SOURCE_COLLECTION); // work around SOLR-5628
+    } finally {
+      client.close();
+    }
+  }
+
+  /** Adds a document through a CDCR update request carrying the given version. */
+  private void cdcrAdd(String id, long version) throws Exception {
+    vadd(id, version, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(version));
+  }
+
+  /** Deletes a document by id through a CDCR update request carrying the given version. */
+  private void cdcrDelete(String id, long version) throws Exception {
+    vdelete(id, version, CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(version));
+  }
+
+  private void doTestCdcrDocVersions(SolrClient solrClient) throws Exception {
+    solrServer = solrClient;
+
+    log.info("### STARTING doCdcrTestDocVersions - Add commands, client: " + solrClient);
+
+    cdcrAdd("doc1", 10);
+    cdcrAdd("doc2", 11);
+    cdcrAdd("doc3", 10);
+    cdcrAdd("doc4", 11);
+    commit(SOURCE_COLLECTION);
+
+    // the versions are preserved; verify both with a query and with real-time get
+    doQuery(solrClient, "doc1,10,doc2,11,doc3,10,doc4,11", "q", "*:*");
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    cdcrAdd("doc1", 5);
+    cdcrAdd("doc2", 10);
+    cdcrAdd("doc3", 9);
+    cdcrAdd("doc4", 8);
+
+    // updates with a lower version are ignored
+    doRealTimeGet("doc1,doc2,doc3,doc4", "10,11,10,11");
+
+    cdcrAdd("doc1", 12);
+    cdcrAdd("doc2", 12);
+    cdcrAdd("doc3", 12);
+    cdcrAdd("doc4", 12);
+
+    // updates with a higher version are accepted
+    doRealTimeGet("doc1,doc2,doc3,doc4", "12,12,12,12");
+
+    // without the cdcr parameter, a non-equal version triggers a version conflict (optimistic locking feature)
+    vaddFail("doc1", 13, 409);
+    vaddFail("doc2", 13, 409);
+    vaddFail("doc3", 13, 409);
+
+    commit(SOURCE_COLLECTION);
+
+    // the versions did not change
+    doQuery(solrClient, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // query every replica of each shard individually
+    doQueryShardReplica(SHARD1, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc1,12,doc2,12,doc3,12,doc4,12", "q", "*:*");
+
+    // optimistic locking update
+    vadd("doc4", 12);
+    commit(SOURCE_COLLECTION);
+
+    QueryResponse rsp = solrClient.query(params("qt", "/get", "ids", "doc4"));
+    long generatedVersion = (long) rsp.getResults().get(0).get(vfield);
+
+    // the update was accepted and a new version number was generated
+    assertTrue(generatedVersion > 1_000_000_000_000L);
+
+    log.info("### STARTING doCdcrTestDocVersions - Delete commands");
+
+    // a delete with an older version number must be ignored
+    cdcrDelete("doc1", 5);
+    doRealTimeGet("doc1", "12");
+
+    // a delete with a higher version number must remove the document
+    cdcrDelete("doc1", 13);
+    doRealTimeGet("doc1", "");
+
+    // same for the document whose version was generated above
+    cdcrDelete("doc4", generatedVersion + 1);
+    doRealTimeGet("doc4", "");
+
+    commit(SOURCE_COLLECTION);
+
+    // query every replica of each shard individually
+    doQueryShardReplica(SHARD1, "doc2,12,doc3,12", "q", "*:*");
+    doQueryShardReplica(SHARD2, "doc2,12,doc3,12", "q", "*:*");
+
+    // version conflict thanks to optimistic locking
+    // TODO: it seems that optimistic locking doesn't work with forwarding, test with shard2 client
+    if (solrClient instanceof CloudSolrClient) {
+      vdeleteFail("doc2", 50, 409);
+    }
+
+    // cleanup after ourselves for the next run
+    // deleteByQuery should work as usual with the CDCR_UPDATE param
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(1));
+    commit(SOURCE_COLLECTION);
+
+    // a deleteByQuery with a version lower than anything else should have no effect
+    doQuery(solrClient, "doc2,12,doc3,12", "q", "*:*");
+
+    doDeleteByQuery("id:doc*", CdcrUpdateProcessor.CDCR_UPDATE, "", vfield, Long.toString(51));
+    commit(SOURCE_COLLECTION);
+
+    // a deleteByQuery with a version higher than everything else should delete all remaining docs
+    doQuery(solrClient, "", "q", "*:*");
+
+    // the replicas must be empty as well
+    doQueryShardReplica(SHARD1, "", "q", "*:*");
+    doQueryShardReplica(SHARD2, "", "q", "*:*");
+  }
+
+
+  // ------------------ auxiliary methods ------------------
+
+
+  /** Runs {@link #doQuery} against the client of every jetty of the given shard of the source collection. */
+  void doQueryShardReplica(String shard, String expectedDocs, String... queryParams) throws Exception {
+    for (CloudJettyRunner runner : shardToJetty.get(SOURCE_COLLECTION).get(shard)) {
+      doQuery(runner.client, expectedDocs, queryParams);
+    }
+  }
+
+  /**
+   * Sends a delete-by-id request with the version set as a request parameter.
+   *
+   * @param params additional request parameters, given as alternating names and values
+   */
+  void vdelete(String id, long version, String... params) throws Exception {
+    UpdateRequest request = new UpdateRequest();
+    request.deleteById(id);
+    request.setParam(vfield, Long.toString(version));
+
+    for (int k = 0; k < params.length; k += 2) {
+      request.setParam(params[k], params[k + 1]);
+    }
+    solrServer.request(request);
+  }
+
+  /** Expects {@link #vdelete} to fail with a Solr exception carrying the given error code. */
+  void vdeleteFail(String id, long version, int errCode, String... params) throws Exception {
+    boolean failed = false;
+    try {
+      vdelete(id, version, params);
+    } catch (SolrException e) {
+      assertEquals(errCode, errorCodeOf(e));
+      failed = true;
+    } catch (SolrServerException ex) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof SolrException) {
+        assertEquals(errCode, ((SolrException) cause).code());
+        failed = true;
+      }
+    } catch (Exception e) {
+      log.error("ERROR", e);
+    }
+    assertTrue(failed);
+  }
+
+  /**
+   * Sends an add request for a document made of the id and the version field.
+   *
+   * @param params additional request parameters, given as alternating names and values
+   */
+  void vadd(String id, long version, String... params) throws Exception {
+    UpdateRequest request = new UpdateRequest();
+    request.add(sdoc("id", id, vfield, version));
+    for (int k = 0; k < params.length; k += 2) {
+      request.setParam(params[k], params[k + 1]);
+    }
+    solrServer.request(request);
+  }
+
+  /** Expects {@link #vadd} to fail with a Solr exception carrying the given error code. */
+  void vaddFail(String id, long version, int errCode, String... params) throws Exception {
+    boolean failed = false;
+    try {
+      vadd(id, version, params);
+    } catch (SolrException e) {
+      assertEquals(errCode, errorCodeOf(e));
+      failed = true;
+    } catch (SolrServerException ex) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof SolrException) {
+        assertEquals(errCode, ((SolrException) cause).code());
+        failed = true;
+      }
+    } catch (Exception e) {
+      log.error("ERROR", e);
+    }
+    assertTrue(failed);
+  }
+
+  /**
+   * Returns the code of the direct cause when that cause is another Solr exception, otherwise the
+   * code of the exception itself.
+   */
+  private static int errorCodeOf(SolrException e) {
+    Throwable cause = e.getCause();
+    if (cause instanceof SolrException && cause != e) {
+      return ((SolrException) cause).code();
+    }
+    return e.code();
+  }
+
+  /**
+   * Runs a query and compares the id to version mapping of the results with the expected one.
+   *
+   * @param expectedDocs comma separated list of alternating ids and versions, e.g. "doc1,10,doc2,11"
+   */
+  void doQuery(SolrClient ss, String expectedDocs, String... queryParams) throws Exception {
+    List<String> tokens = StrUtils.splitSmart(expectedDocs, ",", true);
+    Map<String, Object> expected = new HashMap<>();
+    for (int k = 0; k < tokens.size(); k += 2) {
+      expected.put(tokens.get(k), Long.valueOf(tokens.get(k + 1)));
+    }
+
+    Map<String, Object> obtained = new HashMap<>();
+    QueryResponse rsp = ss.query(params(queryParams));
+    for (SolrDocument doc : rsp.getResults()) {
+      obtained.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expected, obtained);
+  }
+
+
+  /**
+   * Fetches the given ids through the "/get" handler and compares the id to version mapping of the
+   * results with the expected one.
+   *
+   * @param ids      comma separated list of ids to fetch
+   * @param versions comma separated list of expected versions, one per id; when empty, no document is
+   *                 expected to be returned
+   */
+  void doRealTimeGet(String ids, String versions) throws Exception {
+    List<String> idList = StrUtils.splitSmart(ids, ",", true);
+    List<String> versionList = StrUtils.splitSmart(versions, ",", true);
+
+    Map<String, Object> expected = new HashMap<>();
+    if (!versionList.isEmpty()) {
+      for (int k = 0; k < idList.size(); k++) {
+        expected.put(idList.get(k), Long.valueOf(versionList.get(k)));
+      }
+    }
+
+    Map<String, Object> obtained = new HashMap<>();
+    QueryResponse rsp = solrServer.query(params("qt", "/get", "ids", ids));
+    for (SolrDocument doc : rsp.getResults()) {
+      obtained.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expected, obtained);
+  }
+
+  /**
+   * Sends a delete-by-query request.
+   *
+   * @param reqParams request parameters, given as alternating names and values
+   */
+  void doDeleteByQuery(String q, String... reqParams) throws Exception {
+    UpdateRequest request = new UpdateRequest();
+    request.deleteByQuery(q);
+    request.setParams(params(reqParams));
+    request.process(solrServer);
+  }
+
+}
+