You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this copy.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/11/06 21:18:19 UTC

svn commit: r1713022 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/handler/ java/org/apache/solr/update/ test/org/apache/solr/cloud/ test/org/apache/solr/update/

Author: erick
Date: Fri Nov  6 20:18:19 2015
New Revision: 1713022

URL: http://svn.apache.org/viewvc?rev=1713022&view=rev
Log:
SOLR-6273: testfix7, improves test pass ratio significantly

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java Fri Nov  6 20:18:19 2015
@@ -118,12 +118,14 @@ class CdcrBufferStateManager extends Cdc
     try {
       if (!zkClient.exists(this.getZnodePath(), true)) {
         if (!zkClient.exists(this.getZnodeBase(), true)) {
-          zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+          zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true); // Should be a no-op if node exists
         }
         zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
         log.info("Created znode {}", this.getZnodePath());
       }
-    } catch (KeeperException | InterruptedException e) {
+    } catch (KeeperException.NodeExistsException ne) {
+      // Someone got in first and created the node.
+    }  catch (KeeperException | InterruptedException e) {
       log.warn("Failed to create CDCR buffer state node", e);
     }
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java Fri Nov  6 20:18:19 2015
@@ -116,12 +116,14 @@ class CdcrProcessStateManager extends Cd
     SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
     try {
       if (!zkClient.exists(this.getZnodePath(), true)) {
-        if (!zkClient.exists(this.getZnodeBase(), true)) {
-          zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+        if (!zkClient.exists(this.getZnodeBase(), true)) { // Should be a no-op if the node exists
+          zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true);
         }
         zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
         log.info("Created znode {}", this.getZnodePath());
       }
+    } catch (KeeperException.NodeExistsException ne) {
+      // Someone got in first and created the node.
     } catch (KeeperException | InterruptedException e) {
       log.warn("Failed to create CDCR process state node", e);
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java Fri Nov  6 20:18:19 2015
@@ -491,7 +491,7 @@ public class CdcrRequestHandler extends
       }
     }
 
-    log.info("Returning the lowest last processed version {}  @ {}:{}", lastProcessedVersion, collectionName, shard);
+    log.debug("Returning the lowest last processed version {}  @ {}:{}", lastProcessedVersion, collectionName, shard);
     rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
   }
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Fri Nov  6 20:18:19 2015
@@ -27,8 +27,11 @@ import java.io.Writer;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -284,6 +287,7 @@ public class IndexFetcher {
     Directory indexDir = null;
     String indexDirPath;
     boolean deleteTmpIdxDir = true;
+    File tmpTlogDir = null;
 
     if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
       // if the last replication was not a success, we force a full replication
@@ -379,6 +383,11 @@ public class IndexFetcher {
 
       tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
 
+      // tmp dir for tlog files
+      if (tlogFilesToDownload != null) {
+        tmpTlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir(), "tlog." + timestamp);
+      }
+
       // cindex dir...
       indexDirPath = solrCore.getIndexDir();
       indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
@@ -435,7 +444,8 @@ public class IndexFetcher {
 
           long bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
           if (tlogFilesToDownload != null) {
-            bytesDownloaded += downloadTlogFiles(timestamp, latestGeneration);
+            bytesDownloaded += downloadTlogFiles(tmpTlogDir, latestGeneration);
+            reloadCore = true; // reload update log
           }
           final long timeTakenSeconds = getReplicationTimeElapsed();
           final Long bytesDownloadedPerSecond = (timeTakenSeconds != 0 ? new Long(bytesDownloaded/timeTakenSeconds) : null);
@@ -452,6 +462,10 @@ public class IndexFetcher {
             } else {
               successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
             }
+            if (tlogFilesToDownload != null) {
+              // move tlog files and refresh ulog only if we successfully installed a new index
+              successfulInstall &= moveTlogFiles(tmpTlogDir);
+            }
             if (successfulInstall) {
               if (isFullCopyNeeded) {
                 // let the system know we are changing dir's and the old one
@@ -476,6 +490,10 @@ public class IndexFetcher {
             } else {
               successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
             }
+            if (tlogFilesToDownload != null) {
+              // move tlog files and refresh ulog only if we successfully installed a new index
+              successfulInstall &= moveTlogFiles(tmpTlogDir);
+            }
             if (successfulInstall) {
               logReplicationTimeAndConfFiles(modifiedConfFiles,
                   successfulInstall);
@@ -489,7 +507,7 @@ public class IndexFetcher {
 
         // we must reload the core after we open the IW back up
        if (successfulInstall && (reloadCore || forceCoreReload)) {
-          LOG.info("Reloading SolrCore {}", solrCore.getName());
+         LOG.info("Reloading SolrCore {}", solrCore.getName());
           reloadCore();
         }
 
@@ -511,7 +529,7 @@ public class IndexFetcher {
         }
 
         if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
-          cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+          cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
           cleanupDone = true;
           // we try with a full copy of the index
           LOG.warn(
@@ -534,13 +552,13 @@ public class IndexFetcher {
       }
     } finally {
       if (!cleanupDone) {
-        cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+        cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
       }
     }
   }
 
   private void cleanup(final SolrCore core, Directory tmpIndexDir,
-      Directory indexDir, boolean deleteTmpIdxDir, boolean successfulInstall) throws IOException {
+      Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
     try {
       if (!successfulInstall) {
         try {
@@ -552,7 +570,7 @@ public class IndexFetcher {
 
       core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall);
 
-      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
+      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = tlogFilesToDownload = tlogFilesDownloaded = null;
       markReplicationStop();
       dirFileFetcher = null;
       localFileFetcher = null;
@@ -577,6 +595,10 @@ public class IndexFetcher {
       if (indexDir != null) {
         core.getDirectoryFactory().release(indexDir);
       }
+
+      if (tmpTlogDir != null) {
+        delTree(tmpTlogDir);
+      }
     }
   }
 
@@ -792,35 +814,26 @@ public class IndexFetcher {
     }
   }
 
-  private long downloadTlogFiles(String timestamp, long latestGeneration) throws Exception {
-    UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
-
+  /**
+   * Download all the tlog files to the temp tlog directory.
+   */
+  private long downloadTlogFiles(File tmpTlogDir, long latestGeneration) throws Exception {
     LOG.info("Starting download of tlog files from master: " + tlogFilesToDownload);
-    tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+    tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<>());
     long bytesDownloaded = 0;
-    File tmpTlogDir = new File(ulog.getLogDir(), "tlog." + getDateAsStr(new Date()));
-    try {
-      boolean status = tmpTlogDir.mkdirs();
-      if (!status) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Failed to create temporary tlog folder: " + tmpTlogDir.getName());
-      }
-      for (Map<String, Object> file : tlogFilesToDownload) {
-        String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
-        localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
-        currentFile = file;
-        localFileFetcher.fetchFile();
-        bytesDownloaded += localFileFetcher.getBytesDownloaded();
-        tlogFilesDownloaded.add(new HashMap<>(file));
-      }
-      // this is called before copying the files to the original conf dir
-      // so that if there is an exception avoid corrupting the original files.
-      terminateAndWaitFsyncService();
-      ((CdcrUpdateLog) ulog).reset(); // reset the update log before copying the new tlog directory
-      copyTmpTlogFiles2Tlog(tmpTlogDir, timestamp);
-      ulog.init(solrCore.getUpdateHandler(), solrCore); // re-initialise the update log with the new directory
-    } finally {
-      delTree(tmpTlogDir);
+
+    boolean status = tmpTlogDir.mkdirs();
+    if (!status) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Failed to create temporary tlog folder: " + tmpTlogDir.getName());
+    }
+    for (Map<String, Object> file : tlogFilesToDownload) {
+      String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
+      localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
+      currentFile = file;
+      localFileFetcher.fetchFile();
+      bytesDownloaded += localFileFetcher.getBytesDownloaded();
+      tlogFilesDownloaded.add(new HashMap<>(file));
     }
     return bytesDownloaded;
   }
@@ -1032,6 +1045,22 @@ public class IndexFetcher {
   }
 
   /**
+   * Copy all the tlog files from the temp tlog dir to the actual tlog dir, and reset
+   * the {@link UpdateLog}. The copy will try to preserve the original tlog directory
+   * if the copy fails.
+   */
+  private boolean moveTlogFiles(File tmpTlogDir) {
+    UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
+
+    // reset the update log before copying the new tlog directory, it will be reinitialized
+    // during the core reload
+    ((CdcrUpdateLog) ulog).reset();
+    // try to move the temp tlog files to the tlog directory
+    if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+    return true;
+  }
+
+  /**
    * Make file list
    */
   private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
@@ -1085,27 +1114,40 @@ public class IndexFetcher {
   }
 
   /**
-   * The tlog files are copied from the tmp dir to the tlog dir by renaming the directory if possible.
-   * A backup of the old file is maintained.
+   * The tlog files are moved from the tmp dir to the tlog dir as an atomic filesystem operation.
+   * A backup of the old directory is maintained. If the directory move fails, it will try to revert back the original
+   * tlog directory.
    */
-  private void copyTmpTlogFiles2Tlog(File tmpTlogDir, String timestamp) {
-    File tlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
-    File backupTlogDir = new File(tlogDir.getParent(), UpdateLog.TLOG_NAME + "." + timestamp);
+  private boolean copyTmpTlogFiles2Tlog(File tmpTlogDir) {
+    Path tlogDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
+    Path backupTlogDir = FileSystems.getDefault().getPath(tlogDir.getParent().toAbsolutePath().toString(), tmpTlogDir.getName());
 
     try {
-      org.apache.commons.io.FileUtils.moveDirectory(tlogDir, backupTlogDir);
+      Files.move(tlogDir, backupTlogDir, StandardCopyOption.ATOMIC_MOVE);
     } catch (IOException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
+      SolrException.log(LOG, "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
+      return false;
     }
 
+    Path src = FileSystems.getDefault().getPath(backupTlogDir.toAbsolutePath().toString(), tmpTlogDir.getName());
     try {
-      tmpTlogDir = new File(backupTlogDir, tmpTlogDir.getName());
-      org.apache.commons.io.FileUtils.moveDirectory(tmpTlogDir, tlogDir);
+      Files.move(src, tlogDir, StandardCopyOption.ATOMIC_MOVE);
     } catch (IOException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Unable to rename: " + tmpTlogDir + " to: " + tlogDir, e);
+      SolrException.log(LOG, "Unable to rename: " + src + " to: " + tlogDir, e);
+
+      // In case of error, try to revert back the original tlog directory
+      try {
+        Files.move(backupTlogDir, tlogDir, StandardCopyOption.ATOMIC_MOVE);
+      } catch (IOException e2) {
+        // bad, we were not able to revert back the original tlog directory
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Unable to rename: " + backupTlogDir + " to: " + tlogDir);
+      }
+
+      return false;
     }
+
+    return true;
   }
 
   private String getDateAsStr(Date d) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Fri Nov  6 20:18:19 2015
@@ -248,20 +248,44 @@ public class CdcrUpdateLog extends Updat
       for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
         reader.close();
       }
+      logPointers.clear();
 
       // Close and clear logs
+      doClose(prevTlog);
+      doClose(tlog);
+
       for (TransactionLog log : logs) {
-        log.deleteOnClose = false;
-        log.decref();
-        log.forceClose();
+        if (log == prevTlog || log == tlog) continue;
+        doClose(log);
       }
+
       logs.clear();
+      newestLogsOnStartup.clear();
+      tlog = prevTlog = null;
+      prevMapLog = prevMapLog2 = null;
+
+      map.clear();
+      if (prevMap != null) prevMap.clear();
+      if (prevMap2 != null) prevMap2.clear();
+
+      numOldRecords = 0;
 
-      // reset lastDataDir for #init()
+      oldDeletes.clear();
+      deleteByQueries.clear();
+
+      // reset lastDataDir for triggering full #init()
       lastDataDir = null;
     }
   }
 
+  private void doClose(TransactionLog theLog) {
+    if (theLog != null) {
+      theLog.deleteOnClose = false;
+      theLog.decref();
+      theLog.forceClose();
+    }
+  }
+
   @Override
   public void close(boolean committed, boolean deleteOnClose) {
     for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri Nov  6 20:18:19 2015
@@ -221,12 +221,26 @@ public class BaseCdcrDistributedZkTest e
   }
 
   /**
-   * Returns the number of documents in a given collection
+   * Assert the number of documents in a given collection
    */
-  protected long getNumDocs(String collection) throws SolrServerException, IOException {
+  protected void assertNumDocs(int expectedNumDocs, String collection)
+  throws SolrServerException, IOException, InterruptedException {
     CloudSolrClient client = createCloudClient(collection);
     try {
-      return client.query(new SolrQuery("*:*")).getResults().getNumFound();
+      int cnt = 30; // timeout after 15 seconds
+      AssertionError lastAssertionError = null;
+      while (cnt > 0) {
+        try {
+          assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+          return;
+        }
+        catch (AssertionError e) {
+          lastAssertionError = e;
+          cnt--;
+          Thread.sleep(500);
+        }
+      }
+      throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
     } finally {
       client.close();
     }
@@ -553,6 +567,7 @@ public class BaseCdcrDistributedZkTest e
 
     // delete the temporary collection - we will create our own collections later
     this.deleteCollection(temporaryCollection);
+    this.waitForCollectionToDisappear(temporaryCollection);
     System.clearProperty("collection");
 
     return nodeNames;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri Nov  6 20:18:19 2015
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.solr.common.SolrInputDocument;
@@ -26,12 +27,9 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 
-@Slow
 @Nightly
 public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
 
@@ -41,28 +39,13 @@ public class CdcrReplicationDistributedZ
     super.distribSetUp();
   }
 
-  @Test
-  @ShardsFixed(num = 4)
-  public void doTests() throws Exception {
-    this.doTestDeleteCreateSourceCollection();
-    this.doTestTargetCollectionNotAvailable();
-    this.doTestReplicationStartStop();
-    this.doTestReplicationAfterRestart();
-    this.doTestReplicationAfterLeaderChange();
-    this.doTestUpdateLogSynchronisation();
-    this.doTestBufferOnNonLeader();
-    this.doTestOps();
-    this.doTestBatchAddsWithDelete();
-    this.doTestBatchBoundaries();
-    this.doTestResilienceWithDeleteByQueryOnTarget();
-  }
   /**
    * Checks that the test framework handles properly the creation and deletion of collections and the
    * restart of servers.
    */
-  public void doTestDeleteCreateSourceCollection() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
+  @Test
+  @ShardsFixed(num = 4)
+  public void testDeleteCreateSourceCollection() throws Exception {
     log.info("Indexing documents");
 
     List<SolrInputDocument> docs = new ArrayList<>();
@@ -72,45 +55,44 @@ public class CdcrReplicationDistributedZ
     index(SOURCE_COLLECTION, docs);
     index(TARGET_COLLECTION, docs);
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Restarting leader @ source_collection:shard1");
 
     this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Clearing source_collection");
 
     this.clearSourceCollection();
 
-    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(0, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Restarting leader @ target_collection:shard1");
 
     this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
 
-    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(0, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Clearing target_collection");
 
     this.clearTargetCollection();
 
-    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(0, SOURCE_COLLECTION);
+    assertNumDocs(0, TARGET_COLLECTION);
 
     assertCollectionExpectations(SOURCE_COLLECTION);
     assertCollectionExpectations(TARGET_COLLECTION);
   }
 
-  public void doTestTargetCollectionNotAvailable() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testTargetCollectionNotAvailable() throws Exception {
     // send start action to first shard
     NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
     NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
@@ -130,23 +112,35 @@ public class CdcrReplicationDistributedZ
     index(SOURCE_COLLECTION, getDoc(id, "e"));
     index(SOURCE_COLLECTION, getDoc(id, "f"));
 
-    assertEquals(6, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(6, SOURCE_COLLECTION);
 
-    Thread.sleep(1000); // wait a bit for the replicator thread to be triggered
+    // we need to wait until the replicator thread is triggered
+    int cnt = 15; // timeout after 15 seconds
+    AssertionError lastAssertionError = null;
+    while (cnt > 0) {
+      try {
+        rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
+        NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
+        NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
+        assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
+        NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
+        assertNotNull(lastErrors);
+        assertTrue(0 < lastErrors.size());
+        return;
+      }
+      catch (AssertionError e) {
+        lastAssertionError = e;
+        cnt--;
+        Thread.sleep(1000);
+      }
+    }
 
-    rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
-    NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
-    NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
-    assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
-    NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
-    assertNotNull(lastErrors);
-    assertTrue(0 < lastErrors.size());
+    throw new AssertionError("Timeout while trying to assert replication errors", lastAssertionError);
   }
 
-  public void doTestReplicationStartStop() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection(); // this might log a warning to indicate he was not able to delete the collection (collection was deleted in the previous test)
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testReplicationStartStop() throws Exception {
     int start = 0;
     List<SolrInputDocument> docs = new ArrayList<>();
     for (; start < 10; start++) {
@@ -154,8 +148,8 @@ public class CdcrReplicationDistributedZ
     }
     index(SOURCE_COLLECTION, docs);
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
+    assertNumDocs(0, TARGET_COLLECTION);
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -165,8 +159,8 @@ public class CdcrReplicationDistributedZ
 
     commit(TARGET_COLLECTION);
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -177,8 +171,8 @@ public class CdcrReplicationDistributedZ
     }
     index(SOURCE_COLLECTION, docs);
 
-    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(110, SOURCE_COLLECTION);
+    assertNumDocs(10, TARGET_COLLECTION);
 
     // Start again CDCR, the source cluster should reinitialise its log readers
     // with the latest checkpoints
@@ -191,17 +185,16 @@ public class CdcrReplicationDistributedZ
 
     commit(TARGET_COLLECTION);
 
-    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(110, SOURCE_COLLECTION);
+    assertNumDocs(110, TARGET_COLLECTION);
   }
 
   /**
    * Check that the replication manager is properly restarted after a node failure.
    */
-  public void doTestReplicationAfterRestart() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testReplicationAfterRestart() throws Exception {
     log.info("Starting CDCR");
 
     // send start action to first shard
@@ -219,7 +212,7 @@ public class CdcrReplicationDistributedZ
 
     log.info("Querying source collection");
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
 
     log.info("Waiting for replication");
 
@@ -229,7 +222,7 @@ public class CdcrReplicationDistributedZ
     log.info("Querying target collection");
 
     commit(TARGET_COLLECTION);
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Restarting shard1");
 
@@ -245,7 +238,7 @@ public class CdcrReplicationDistributedZ
 
     log.info("Querying source collection");
 
-    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(110, SOURCE_COLLECTION);
 
     log.info("Waiting for replication");
 
@@ -255,7 +248,7 @@ public class CdcrReplicationDistributedZ
     log.info("Querying target collection");
 
     commit(TARGET_COLLECTION);
-    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(110, TARGET_COLLECTION);
   }
 
   /**
@@ -263,10 +256,9 @@ public class CdcrReplicationDistributedZ
    * This test also checks that the log readers on the new leaders are initialised with
    * the target's checkpoint.
    */
-  public void doTestReplicationAfterLeaderChange() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testReplicationAfterLeaderChange() throws Exception {
     log.info("Starting CDCR");
 
     // send start action to first shard
@@ -284,7 +276,7 @@ public class CdcrReplicationDistributedZ
 
     log.info("Querying source collection");
 
-    assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(10, SOURCE_COLLECTION);
 
     log.info("Waiting for replication");
 
@@ -294,7 +286,7 @@ public class CdcrReplicationDistributedZ
     log.info("Querying target collection");
 
     commit(TARGET_COLLECTION);
-    assertEquals(10, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(10, TARGET_COLLECTION);
 
     log.info("Restarting target leaders");
 
@@ -327,7 +319,7 @@ public class CdcrReplicationDistributedZ
 
     log.info("Querying source collection");
 
-    assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(110, SOURCE_COLLECTION);
 
     log.info("Waiting for replication");
 
@@ -337,24 +329,20 @@ public class CdcrReplicationDistributedZ
     log.info("Querying target collection");
 
     commit(TARGET_COLLECTION);
-    assertEquals(110, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(110, TARGET_COLLECTION);
   }
 
   /**
    * Check that the update logs are synchronised between leader and non-leader nodes
+   * when CDCR is on and buffer is disabled
    */
-  public void doTestUpdateLogSynchronisation() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
-    // buffering is enabled by default, so disable it
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
-    this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testUpdateLogSynchronisation() throws Exception {
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       // will perform a commit for every document and will create one tlog file per commit
       index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
     }
@@ -365,34 +353,39 @@ public class CdcrReplicationDistributedZ
 
     commit(TARGET_COLLECTION);
 
-    // Stop CDCR
-    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+    // Check that the replication was done properly
+    assertNumDocs(100, SOURCE_COLLECTION);
+    assertNumDocs(100, TARGET_COLLECTION);
+
+    // Get the number of tlog files on the replicas (should be equal to the number of documents indexed)
+    int nTlogs = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
+
+    // Disable the buffer - ulog synch should start on non-leader nodes
+    this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
-    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+    int cnt = 15; // timeout after 15 seconds
+    while (cnt > 0) {
+      // Index a new document with a commit to trigger update log cleaning
+      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(50)));
 
-    // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
-    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
+      // Check the update logs on non-leader nodes, the number of tlog files should decrease
+      int n = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
+      if (n < nTlogs) return;
 
-    for (int i = 50; i < 100; i++) {
-      index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
+      cnt--;
+      Thread.sleep(1000);
     }
 
-    // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
-    // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
-    // and 11 on the non-leader)
-    // the non-leader must have synchronised its update log with its leader
-    assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
+    throw new AssertionError("Timeout while trying to assert update logs @ source_collection");
   }
 
   /**
    * Check that the buffer is always activated on non-leader nodes.
    */
-  public void doTestBufferOnNonLeader() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testBufferOnNonLeader() throws Exception {
     // buffering is enabled by default, so disable it
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -416,20 +409,20 @@ public class CdcrReplicationDistributedZ
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
+    // Commit to make the documents visible on the target
     commit(TARGET_COLLECTION);
 
     // If the non-leader node were buffering updates, then the replication must be complete
-    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(200, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(200, SOURCE_COLLECTION);
+    assertNumDocs(200, TARGET_COLLECTION);
   }
 
   /**
    * Check the ops statistics.
    */
-  public void doTestOps() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testOps() throws Exception {
     // Index documents
     List<SolrInputDocument> docs = new ArrayList<>();
     for (int i = 0; i < 200; i++) {
@@ -460,10 +453,9 @@ public class CdcrReplicationDistributedZ
   /**
    * Check that batch updates with deletes
    */
-  public void doTestBatchAddsWithDelete() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testBatchAddsWithDelete() throws Exception {
     // Index 50 documents
     int start = 0;
     List<SolrInputDocument> docs = new ArrayList<>();
@@ -509,14 +501,16 @@ public class CdcrReplicationDistributedZ
     commit(TARGET_COLLECTION);
 
     // If the non-leader node were buffering updates, then the replication must be complete
-    assertEquals(59, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(59, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(59, SOURCE_COLLECTION);
+    assertNumDocs(59, TARGET_COLLECTION);
   }
 
   /**
    * Checks that batches are correctly constructed when batch boundaries are reached.
    */
-  public void doTestBatchBoundaries() throws Exception {
+  @Test
+  @ShardsFixed(num = 4)
+  public void testBatchBoundaries() throws Exception {
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
     this.waitForCdcrStateReplication(SOURCE_COLLECTION);
 
@@ -528,24 +522,23 @@ public class CdcrReplicationDistributedZ
     }
     index(SOURCE_COLLECTION, docs);
 
-    assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(128, SOURCE_COLLECTION);
 
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
     this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
 
     commit(TARGET_COLLECTION);
 
-    assertEquals(128, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(128, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(128, SOURCE_COLLECTION);
+    assertNumDocs(128, TARGET_COLLECTION);
   }
 
   /**
    * Check resilience of replication with delete by query executed on targets
    */
-  public void doTestResilienceWithDeleteByQueryOnTarget() throws Exception {
-    this.clearSourceCollection();
-    this.clearTargetCollection();
-
+  @Test
+  @ShardsFixed(num = 4)
+  public void testResilienceWithDeleteByQueryOnTarget() throws Exception {
     // Index 50 documents
     int start = 0;
     List<SolrInputDocument> docs = new ArrayList<>();
@@ -565,14 +558,14 @@ public class CdcrReplicationDistributedZ
     commit(TARGET_COLLECTION);
 
     // If the non-leader node were buffering updates, then the replication must be complete
-    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(50, SOURCE_COLLECTION);
+    assertNumDocs(50, TARGET_COLLECTION);
 
     deleteByQuery(SOURCE_COLLECTION, "*:*");
     deleteByQuery(TARGET_COLLECTION, "*:*");
 
-    assertEquals(0, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(0, SOURCE_COLLECTION);
+    assertNumDocs(0, TARGET_COLLECTION);
 
     docs.clear();
     for (; start < 100; start++) {
@@ -586,13 +579,13 @@ public class CdcrReplicationDistributedZ
 
     commit(TARGET_COLLECTION);
 
-    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(50, SOURCE_COLLECTION);
+    assertNumDocs(50, TARGET_COLLECTION);
 
     deleteByQuery(TARGET_COLLECTION, "*:*");
 
-    assertEquals(50, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(0, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(50, SOURCE_COLLECTION);
+    assertNumDocs(0, TARGET_COLLECTION);
 
     // Restart CDCR
     this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
@@ -612,79 +605,32 @@ public class CdcrReplicationDistributedZ
 
     commit(TARGET_COLLECTION);
 
-    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
-    assertEquals(50, getNumDocs(TARGET_COLLECTION));
+    assertNumDocs(100, SOURCE_COLLECTION);
+    assertNumDocs(50, TARGET_COLLECTION);
   }
 
-  /**
-   * Asserts the number of transaction logs across all the shards. Since the cleaning of the update logs
-   * is not immediate on the slave nodes (it relies on the update log synchronizer that is executed every second),
-   * it will retry until the assert is successful or until the timeout.
-   */
-  protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
-    int cnt = 15; // timeout after 15 seconds
-    AssertionError lastAssertionError = null;
-
-    while (cnt > 0) {
-      try {
-        // Fire a DeleteById query with a commit to trigger update log cleaning on the non-leader nodes
-        List<String> ids = new ArrayList<>();
-        ids.add("_NON_EXISTING_ID_");
-        deleteById(collection, ids);
-
-        // Check the update logs
-        this._assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
-        return;
-      }
-      catch (AssertionError e) {
-        lastAssertionError = e;
-        cnt--;
-        Thread.sleep(1000);
-      }
+  private int numberOfFiles(String dir) {
+    File file = new File(dir);
+    if (!file.isDirectory()) {
+      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
     }
-
-    throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, lastAssertionError);
+    log.debug("Update log dir {} contains: {}", dir, file.listFiles());
+    return file.listFiles().length;
   }
 
-  /**
-   * Asserts the number of transaction logs across all the shards
-   */
-  private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+  private int getNumberOfTlogFilesOnReplicas(String collection) throws Exception {
     CollectionInfo info = collectInfo(collection);
     Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
 
-    int leaderLogs = 0;
-    ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+    int count = 0;
 
     for (String shard : shardToCoresMap.keySet()) {
-      leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
       for (int i = 0; i < replicationFactor - 1; i++) {
-        replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
+        count += numberOfFiles(info.getReplicas(shard).get(i).ulogDir);
       }
     }
 
-    for (Integer replicaLogs : replicasLogs) {
-      log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
-
-      // replica logs must be always equal or superior to leader logs
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
-          replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
-          leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
-
-      assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
-          replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
-    }
-  }
-
-  private int numberOfFiles(String dir) {
-    File file = new File(dir);
-    if (!file.isDirectory()) {
-      assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
-    }
-    log.debug("Update log dir {} contains: {}", dir, file.listFiles());
-    return file.listFiles().length;
+    return count;
   }
 
 }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri Nov  6 20:18:19 2015
@@ -20,7 +20,7 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
 import org.junit.Test;
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@Slow
+@Nightly
 public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
 
   @Override
@@ -44,21 +44,13 @@ public class CdcrReplicationHandlerTest
     super.distribSetUp();
   }
 
-  @Test
-  @ShardsFixed(num = 2)
-  public void doTest() throws Exception {
-    this.doTestFullReplication();
-    this.doTestPartialReplication();
-    this.doTestPartialReplicationWithTruncatedTlog();
-    this.doTestPartialReplicationAfterPeerSync();
-  }
-
   /**
    * Test the scenario where the slave is killed from the start. The replication
    * strategy should fetch all the missing tlog files from the leader.
    */
-  public void doTestFullReplication() throws Exception {
-    this.clearSourceCollection();
+  @Test
+  @ShardsFixed(num = 2)
+  public void testFullReplication() throws Exception {
     List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
     ChaosMonkey.stop(slaves.get(0).jetty);
 
@@ -70,7 +62,7 @@ public class CdcrReplicationHandlerTest
       index(SOURCE_COLLECTION, docs);
     }
 
-    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(100, SOURCE_COLLECTION);
 
     // Restart the slave node to trigger Replication strategy
     this.restartServer(slaves.get(0));
@@ -82,8 +74,9 @@ public class CdcrReplicationHandlerTest
    * Test the scenario where the slave is killed before receiving all the documents. The replication
    * strategy should fetch all the missing tlog files from the leader.
    */
-  public void doTestPartialReplication() throws Exception {
-    this.clearSourceCollection();
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplication() throws Exception {
     for (int i = 0; i < 5; i++) {
       List<SolrInputDocument> docs = new ArrayList<>();
       for (int j = i * 20; j < (i * 20) + 20; j++) {
@@ -103,7 +96,7 @@ public class CdcrReplicationHandlerTest
       index(SOURCE_COLLECTION, docs);
     }
 
-    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(200, SOURCE_COLLECTION);
 
     // Restart the slave node to trigger Replication strategy
     this.restartServer(slaves.get(0));
@@ -117,8 +110,9 @@ public class CdcrReplicationHandlerTest
    * file on the slave node. The replication strategy should detect this truncated file, and fetch the
    * non-truncated file from the leader.
    */
-  public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
-    this.clearSourceCollection();
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplicationWithTruncatedTlog() throws Exception {
     CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
     List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
 
@@ -139,7 +133,7 @@ public class CdcrReplicationHandlerTest
       client.close();
     }
 
-    assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(200, SOURCE_COLLECTION);
 
     // Restart the slave node to trigger Replication recovery
     this.restartServer(slaves.get(0));
@@ -154,8 +148,9 @@ public class CdcrReplicationHandlerTest
    * If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
    * and fetch the corresponding tlog files from the leader.
    */
-  public void doTestPartialReplicationAfterPeerSync() throws Exception {
-    this.clearSourceCollection();
+  @Test
+  @ShardsFixed(num = 2)
+  public void testPartialReplicationAfterPeerSync() throws Exception {
     for (int i = 0; i < 5; i++) {
       List<SolrInputDocument> docs = new ArrayList<>();
       for (int j = i * 10; j < (i * 10) + 10; j++) {
@@ -175,7 +170,7 @@ public class CdcrReplicationHandlerTest
       index(SOURCE_COLLECTION, docs);
     }
 
-    assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+    assertNumDocs(100, SOURCE_COLLECTION);
 
     // Restart the slave node to trigger PeerSync recovery
     // (the update windows between leader and slave is small enough)

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri Nov  6 20:18:19 2015
@@ -17,12 +17,12 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.handler.CdcrParams;
 import org.junit.Test;
 
-@Slow
+@Nightly
 public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
 
   @Override
@@ -32,16 +32,10 @@ public class CdcrRequestHandlerTest exte
     super.distribSetUp();
   }
 
+  // check that the life-cycle state is properly synchronised across nodes
   @Test
   @ShardsFixed(num = 2)
-  public void doTest() throws Exception {
-    this.doTestLifeCycleActions();
-    this.doTestCheckpointActions();
-    this.doTestBufferActions();
-  }
-
-  // check that the life-cycle state is properly synchronised across nodes
-  public void doTestLifeCycleActions() throws Exception {
+  public void testLifeCycleActions() throws Exception {
     // check initial status
     this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
 
@@ -69,7 +63,9 @@ public class CdcrRequestHandlerTest exte
   }
 
   // check the checkpoint API
-  public void doTestCheckpointActions() throws Exception {
+  @Test
+  @ShardsFixed(num = 2)
+  public void testCheckpointActions() throws Exception {
     // initial request on an empty index, must return -1
     NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
     assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
@@ -125,7 +121,9 @@ public class CdcrRequestHandlerTest exte
   }
 
   // check that the buffer state is properly synchronised across nodes
-  public void doTestBufferActions() throws Exception {
+  @Test
+  @ShardsFixed(num = 2)
+  public void testBufferActions() throws Exception {
     // check initial status
     this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java Fri Nov  6 20:18:19 2015
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.request.SolrQueryRequest;
@@ -40,7 +40,7 @@ import org.noggit.ObjectBuilder;
 
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
-@Slow
+@Nightly
 public class CdcrUpdateLogTest extends SolrTestCaseJ4 {
 
   // means that we've seen the leader and have version info (i.e. we are a non-leader replica)