You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/07/17 18:38:13 UTC

svn commit: r1691606 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/handler/ test/org/apache/solr/cloud/

Author: erick
Date: Fri Jul 17 16:38:13 2015
New Revision: 1691606

URL: http://svn.apache.org/r1691606
Log:
SOLR-6273: Cross Data Center Replication. All tests are now passing on my machine, let's see if Jenkins flushes anything out

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java Fri Jul 17 16:38:13 2015
@@ -82,6 +82,7 @@ class CdcrReplicatorScheduler {
               @Override
               public void run() {
                 CdcrReplicatorState state = statesQueue.poll();
+                assert state != null; // Should never happen
                 try {
                   new CdcrReplicator(state, batchSize).run();
                 } finally {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri Jul 17 16:38:13 2015
@@ -245,11 +245,43 @@ public class BaseCdcrDistributedZkTest e
     return jetty.client.request(request);
   }
 
+  /**
+   * Polls the CDCR STATUS action on every replica of the collection until all replicas report the same status.
+   * The first replica polled in each round serves as the reference the others are compared against.
+   * @throws RuntimeException if the statuses still differ after 30 polls, 500 ms apart
+   */
+  protected void waitForCdcrStateReplication(String collection) throws Exception {
+    log.info("Wait for CDCR state to replicate - collection: " + collection);
+
+    for (int cnt = 30; cnt > 0; cnt--) {
+      NamedList status = null; // status of the first replica, used as the reference
+      boolean allEquals = true;
+      for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
+        NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
+        if (status == null) {
+          status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+        } else {
+          allEquals &= status.equals(rsp.get(CdcrParams.CdcrAction.STATUS.toLower()));
+        }
+      }
+
+      if (allEquals) {
+        log.info("CDCR state is identical across nodes - collection: " + collection);
+        return;
+      }
+      Thread.sleep(500);
+    }
+
+    // Reached only when every poll saw diverging statuses: fail instead of falling through to the success log.
+    throw new RuntimeException("Timeout waiting for CDCR state to replicate: collection="+collection);
+  }
+
   /**
    * Assert the state of CDCR on each nodes of the given collection.
    */
   protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
-      throws Exception {
+  throws Exception {
+    this.waitForCdcrStateReplication(collection); // ensure that cdcr state is replicated and stable
     for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
       NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
       NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
@@ -282,6 +314,7 @@ public class BaseCdcrDistributedZkTest e
    */
   protected void clearSourceCollection() throws Exception {
     this.deleteCollection(SOURCE_COLLECTION);
+    this.waitForCollectionToDisappear(SOURCE_COLLECTION);
     this.createCollection(SOURCE_COLLECTION);
     this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
     this.updateMappingsFromZk(SOURCE_COLLECTION);
@@ -305,6 +338,7 @@ public class BaseCdcrDistributedZkTest e
    */
   protected void clearTargetCollection() throws Exception {
     this.deleteCollection(TARGET_COLLECTION);
+    this.waitForCollectionToDisappear(TARGET_COLLECTION);
     this.createCollection(TARGET_COLLECTION);
     this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
     this.updateMappingsFromZk(TARGET_COLLECTION);
@@ -389,7 +423,7 @@ public class BaseCdcrDistributedZkTest e
   /**
    * Delete a collection through the Collection API.
    */
-  protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
+  protected CollectionAdminResponse deleteCollection(String collectionName) throws Exception {
     SolrClient client = createCloudClient(null);
     CollectionAdminResponse res;
 
@@ -412,6 +446,17 @@ public class BaseCdcrDistributedZkTest e
     return res;
   }
 
+  private void waitForCollectionToDisappear(String collection) throws Exception {
+    CloudSolrClient cloudClient = createCloudClient(null);
+    try {
+      cloudClient.connect();
+      // hand the client's ZooKeeper state reader to the shared test-base helper
+      AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, cloudClient.getZkStateReader(), false, true, 15);
+    } finally {
+      cloudClient.close(); // released even when the wait throws
+    }
+  }
+
   private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
     CloudSolrClient client = this.createCloudClient(null);
     try {
@@ -673,15 +718,18 @@ public class BaseCdcrDistributedZkTest e
   }
 
   protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
-    while (true) {
+    int cnt = 15;
+    while (cnt > 0) {
       log.info("Checking queue size @ {}:{}", collectionName, shardId);
       long size = this.getQueueSize(collectionName, shardId);
-      if (size <= 0) {
+      if (size == 0) { // if we received -1, it means that the log reader is not yet initialised, we should wait
         return;
       }
       log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
+      cnt--;
       Thread.sleep(1000); // wait a bit for the replication to complete
     }
+    throw new RuntimeException("Timeout waiting for CDCR replication to complete @" + collectionName + ":"  + shardId);
   }
 
   protected long getQueueSize(String collectionName, String shardId) throws Exception {
@@ -691,47 +739,6 @@ public class BaseCdcrDistributedZkTest e
     return (Long) status.get(CdcrParams.QUEUE_SIZE);
   }
 
-  /**
-   * Asserts that the number of transaction logs across all the shards
-   */
-  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
-    CollectionInfo info = collectInfo(collection);
-    Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
-
-    int leaderLogs = 0;
-    ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
-
-    for (String shard : shardToCoresMap.keySet()) {
-      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
-      for (int i = 0; i < replicationFactor - 1; i++) {
-        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
-      }
-    }
-
-    for (Integer replicaLogs : replicasLogs) {
-      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
-
-      // replica logs must be always equal or superior to leader logs
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
-          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
-          leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
-          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
-    }
-  }
-
-  private int numberOfFiles(String dir) {
-    File file = new File(dir);
-    if (!file.isDirectory()) {
-      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
-    }
-    log.info("Update log dir {} contains: {}", dir, file.listFiles());
-    return file.listFiles().length;
-  }
-
   protected CollectionInfo collectInfo(String collection) throws Exception {
     CollectionInfo info = new CollectionInfo(collection);
     for (String shard : shardToJetty.get(collection).keySet()) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri Jul 17 16:38:13 2015
@@ -21,13 +21,15 @@ import org.apache.lucene.util.LuceneTest
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
-@Ignore
 @Slow
 public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
 
@@ -39,7 +41,7 @@ public class CdcrReplicationDistributedZ
 
   @Test
   @ShardsFixed(num = 4)
-  public void doTest() throws Exception {
+  public void doTests() throws Exception {
     this.doTestDeleteCreateSourceCollection();
     this.doTestTargetCollectionNotAvailable();
     this.doTestReplicationStartStop();
@@ -52,12 +54,13 @@ public class CdcrReplicationDistributedZ
     this.doTestBatchBoundaries();
     this.doTestResilienceWithDeleteByQueryOnTarget();
   }
-
   /**
    * Checks that the test framework handles properly the creation and deletion of collections and the
    * restart of servers.
    */
   public void doTestDeleteCreateSourceCollection() throws Exception {
+    this.clearSourceCollection();
+    this.clearTargetCollection();
     log.info("Indexing documents");
 
     List<SolrInputDocument> docs = new ArrayList<>();
@@ -153,8 +156,10 @@ public class CdcrReplicationDistributedZ
     assertEquals(0, getNumDocs(TARGET_COLLECTION));
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
 
@@ -162,6 +167,7 @@ public class CdcrReplicationDistributedZ
     assertEquals(10, getNumDocs(TARGET_COLLECTION));
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     docs.clear();
     for (; start < 110; start++) {
@@ -176,8 +182,10 @@ public class CdcrReplicationDistributedZ
     // with the latest checkpoints
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
 
@@ -196,6 +204,7 @@ public class CdcrReplicationDistributedZ
 
     // send start action to first shard
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing 10 documents");
 
@@ -260,6 +269,7 @@ public class CdcrReplicationDistributedZ
 
     // send start action to first shard
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing 10 documents");
 
@@ -337,11 +347,14 @@ public class CdcrReplicationDistributedZ
 
     // buffering is enabled by default, so disable it
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     for (int i = 0; i < 50; i++) {
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+      // will perform a commit for every document and will create one tlog file per commit
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
     }
 
     // wait a bit for the replication to complete
@@ -352,26 +365,23 @@ public class CdcrReplicationDistributedZ
 
     // Stop CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     assertEquals(50, getNumDocs(SOURCE_COLLECTION));
     assertEquals(50, getNumDocs(TARGET_COLLECTION));
 
-    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
     // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
-    assertUpdateLogs(SOURCE_COLLECTION, 50);
+    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
 
     for (int i = 50; i < 100; i++) {
       index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
     }
 
-    index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
     // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
     // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
     // and 11 on the non-leader)
     // the non-leader must have synchronised its update log with its leader
-    assertUpdateLogs(SOURCE_COLLECTION, 50);
+    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
   }
 
   /**
@@ -383,9 +393,11 @@ public class CdcrReplicationDistributedZ
 
     // buffering is enabled by default, so disable it
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // Index documents
     for (int i = 0; i < 200; i++) {
@@ -425,6 +437,7 @@ public class CdcrReplicationDistributedZ
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -485,6 +498,7 @@ public class CdcrReplicationDistributedZ
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -501,7 +515,8 @@ public class CdcrReplicationDistributedZ
    * Checks that batches are correctly constructed when batch boundaries are reached.
    */
   public void doTestBatchBoundaries() throws Exception {
-    invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     log.info("Indexing documents");
 
@@ -514,6 +529,7 @@ public class CdcrReplicationDistributedZ
     assertEquals(128, getNumDocs(SOURCE_COLLECTION));
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+    this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
 
@@ -538,6 +554,7 @@ public class CdcrReplicationDistributedZ
 
     // Start CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     // wait a bit for the replication to complete
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -577,8 +594,9 @@ public class CdcrReplicationDistributedZ
 
     // Restart CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
-    Thread.sleep(500); // wait a bit for the state to synch
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
     docs.clear();
     for (; start < 150; start++) {
@@ -596,5 +614,76 @@ public class CdcrReplicationDistributedZ
     assertEquals(50, getNumDocs(TARGET_COLLECTION));
   }
 
+  /**
+   * Asserts the number of transaction logs across all the shards. Since the cleaning of the update logs
+   * is not immediate on the slave nodes (it relies on the update log synchronizer that is executed every second),
+   * it will retry until the assert is successful or until the timeout.
+   */
+  protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+    AssertionError mostRecentFailure = null;
+
+    // at most 15 attempts,
+    // with a one second pause after each failed one
+    for (int attemptsLeft = 15; attemptsLeft > 0; attemptsLeft--) {
+      // Deleting an id that matches nothing is meant to trigger update log cleaning on the non-leader nodes
+      List<String> missingIds = new ArrayList<>();
+      missingIds.add("_NON_EXISTING_ID_");
+
+      try {
+        deleteById(collection, missingIds);
+        _assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
+        return; // the tlog counts satisfied every assertion
+      } catch (AssertionError failure) {
+        mostRecentFailure = failure; // kept as the cause of the final timeout error
+      }
+
+      Thread.sleep(1000);
+    }
+
+    // every attempt failed: report the timeout together with the last assertion failure
+    throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, mostRecentFailure);
+  }
+
+  /**
+   * Asserts the number of transaction logs across all the shards
+   */
+  private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+    CollectionInfo collectionInfo = collectInfo(collection);
+    Map<String, List<CollectionInfo.CoreInfo>> coresByShard = collectionInfo.getShardToCoresMap();
+    final int replicasPerShard = replicationFactor - 1;
+
+    // totals summed over every shard: one for the leaders, one per replica position
+    int leaderTotal = 0;
+    ArrayList<Integer> replicaTotals = new ArrayList<>(Collections.nCopies(replicasPerShard, 0));
+    for (String shardName : coresByShard.keySet()) {
+      leaderTotal += numberOfFiles(collectionInfo.getLeader(shardName).ulogDir);
+      for (int pos = 0; pos < replicasPerShard; pos++) {
+        int filesOnReplica = numberOfFiles(collectionInfo.getReplicas(shardName).get(pos).ulogDir);
+        replicaTotals.set(pos, replicaTotals.get(pos) + filesOnReplica);
+      }
+    }
+
+    for (int replicaTotal : replicaTotals) {
+      log.info("Number of logs in update log on leader {} and on replica {}", leaderTotal, replicaTotal);
+      // a replica may hold more tlogs than the leader, never fewer
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
+          replicaTotal, leaderTotal), replicaTotal >= leaderTotal);
+      // neither side may exceed the allowed maximum
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
+          leaderTotal, maxNumberOfTLogs), leaderTotal <= maxNumberOfTLogs);
+      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
+          replicaTotal, maxNumberOfTLogs), replicaTotal <= maxNumberOfTLogs);
+    }
+  }
+
+  private int numberOfFiles(String dir) {
+    File file = new File(dir);
+    // List once: listFiles() returns null when the path is not a directory or an I/O error occurs.
+    File[] files = file.listFiles();
+    assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", files != null);
+    log.debug("Update log dir {} contains: {}", dir, files);
+    return files.length; // number of entries directly inside the update log directory
+  }
+
 }
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri Jul 17 16:38:13 2015
@@ -23,7 +23,6 @@ package org.apache.solr.cloud;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -32,7 +31,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@Ignore
 @Slow
 public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
 
@@ -47,7 +45,7 @@ public class CdcrReplicationHandlerTest
   }
 
   @Test
-  @ShardsFixed(num = 4)
+  @ShardsFixed(num = 2)
   public void doTest() throws Exception {
     this.doTestFullReplication();
     this.doTestPartialReplication();
@@ -60,6 +58,7 @@ public class CdcrReplicationHandlerTest
    * strategy should fetch all the missing tlog files from the leader.
    */
   public void doTestFullReplication() throws Exception {
+    this.clearSourceCollection();
     List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
     ChaosMonkey.stop(slaves.get(0).jetty);
 
@@ -76,7 +75,7 @@ public class CdcrReplicationHandlerTest
     // Restart the slave node to trigger Replication strategy
     this.restartServer(slaves.get(0));
 
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -85,7 +84,6 @@ public class CdcrReplicationHandlerTest
    */
   public void doTestPartialReplication() throws Exception {
     this.clearSourceCollection();
-
     for (int i = 0; i < 5; i++) {
       List<SolrInputDocument> docs = new ArrayList<>();
       for (int j = i * 20; j < (i * 20) + 20; j++) {
@@ -111,7 +109,7 @@ public class CdcrReplicationHandlerTest
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -121,7 +119,6 @@ public class CdcrReplicationHandlerTest
    */
   public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
     this.clearSourceCollection();
-
     CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
     List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
 
@@ -148,7 +145,7 @@ public class CdcrReplicationHandlerTest
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
   }
 
   /**
@@ -159,7 +156,6 @@ public class CdcrReplicationHandlerTest
    */
   public void doTestPartialReplicationAfterPeerSync() throws Exception {
     this.clearSourceCollection();
-
     for (int i = 0; i < 5; i++) {
       List<SolrInputDocument> docs = new ArrayList<>();
       for (int j = i * 10; j < (i * 10) + 10; j++) {
@@ -199,7 +195,7 @@ public class CdcrReplicationHandlerTest
     this.restartServer(slaves.get(0));
 
     // at this stage, the slave should have replicated the 5 missing tlog files
-    this.assertUpdateLogs(SOURCE_COLLECTION, 15);
+    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
   }
 
   private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
@@ -210,10 +206,10 @@ public class CdcrReplicationHandlerTest
   }
 
   /**
-   * Asserts that the transaction logs between the leader and slave
+   * Asserts that the update logs are in sync between the leader and slave. The leader and the slaves
+   * must have identical tlog files.
    */
-  @Override
-  protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+  protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
     CollectionInfo info = collectInfo(collection);
     Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
 
@@ -221,8 +217,8 @@ public class CdcrReplicationHandlerTest
       Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
       Map<Long, Long> slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
 
-      assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderFilesMeta.size());
-      assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveFilesMeta.size());
+      assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
+      assertEquals("Incorrect number of tlog files on the slave", numberOfTLogs, slaveFilesMeta.size());
 
       for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
         assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri Jul 17 16:38:13 2015
@@ -20,10 +20,8 @@ package org.apache.solr.cloud;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
 @Slow
 public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
 
@@ -35,7 +33,7 @@ public class CdcrRequestHandlerTest exte
   }
 
   @Test
-  @ShardsFixed(num = 4)
+  @ShardsFixed(num = 2)
   public void doTest() throws Exception {
     this.doTestLifeCycleActions();
     this.doTestCheckpointActions();

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java Fri Jul 17 16:38:13 2015
@@ -61,8 +61,7 @@ public class CdcrVersionReplicationTest
 
   @Test
   @ShardsFixed(num = 4)
-
-  public void doTest() throws Exception {
+  public void testCdcrDocVersions() throws Exception {
     SolrClient client = createClientRandomly();
     try {
       handle.clear();