You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/11/06 21:18:19 UTC
svn commit: r1713022 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/handler/ java/org/apache/solr/update/
test/org/apache/solr/cloud/ test/org/apache/solr/update/
Author: erick
Date: Fri Nov 6 20:18:19 2015
New Revision: 1713022
URL: http://svn.apache.org/viewvc?rev=1713022&view=rev
Log:
SOLR-6273: testfix7, improves test pass ratio significantly
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrBufferStateManager.java Fri Nov 6 20:18:19 2015
@@ -118,12 +118,14 @@ class CdcrBufferStateManager extends Cdc
try {
if (!zkClient.exists(this.getZnodePath(), true)) {
if (!zkClient.exists(this.getZnodeBase(), true)) {
- zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+ zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true); // Should be a no-op if node exists
}
zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
log.info("Created znode {}", this.getZnodePath());
}
- } catch (KeeperException | InterruptedException e) {
+ } catch (KeeperException.NodeExistsException ne) {
+ // Someone got in first and created the node.
+ } catch (KeeperException | InterruptedException e) {
log.warn("Failed to create CDCR buffer state node", e);
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrProcessStateManager.java Fri Nov 6 20:18:19 2015
@@ -116,12 +116,14 @@ class CdcrProcessStateManager extends Cd
SolrZkClient zkClient = core.getCoreDescriptor().getCoreContainer().getZkController().getZkClient();
try {
if (!zkClient.exists(this.getZnodePath(), true)) {
- if (!zkClient.exists(this.getZnodeBase(), true)) {
- zkClient.makePath(this.getZnodeBase(), CreateMode.PERSISTENT, true);
+ if (!zkClient.exists(this.getZnodeBase(), true)) { // Should be a no-op if the node exists
+ zkClient.makePath(this.getZnodeBase(), null, CreateMode.PERSISTENT, null, false, true);
}
zkClient.create(this.getZnodePath(), DEFAULT_STATE.getBytes(), CreateMode.PERSISTENT, true);
log.info("Created znode {}", this.getZnodePath());
}
+ } catch (KeeperException.NodeExistsException ne) {
+ // Someone got in first and created the node.
} catch (KeeperException | InterruptedException e) {
log.warn("Failed to create CDCR process state node", e);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java Fri Nov 6 20:18:19 2015
@@ -491,7 +491,7 @@ public class CdcrRequestHandler extends
}
}
- log.info("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
+ log.debug("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java Fri Nov 6 20:18:19 2015
@@ -27,8 +27,11 @@ import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -284,6 +287,7 @@ public class IndexFetcher {
Directory indexDir = null;
String indexDirPath;
boolean deleteTmpIdxDir = true;
+ File tmpTlogDir = null;
if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
// if the last replication was not a success, we force a full replication
@@ -379,6 +383,11 @@ public class IndexFetcher {
tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+ // tmp dir for tlog files
+ if (tlogFilesToDownload != null) {
+ tmpTlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir(), "tlog." + timestamp);
+ }
+
// cindex dir...
indexDirPath = solrCore.getIndexDir();
indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
@@ -435,7 +444,8 @@ public class IndexFetcher {
long bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, latestGeneration);
if (tlogFilesToDownload != null) {
- bytesDownloaded += downloadTlogFiles(timestamp, latestGeneration);
+ bytesDownloaded += downloadTlogFiles(tmpTlogDir, latestGeneration);
+ reloadCore = true; // reload update log
}
final long timeTakenSeconds = getReplicationTimeElapsed();
final Long bytesDownloadedPerSecond = (timeTakenSeconds != 0 ? new Long(bytesDownloaded/timeTakenSeconds) : null);
@@ -452,6 +462,10 @@ public class IndexFetcher {
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
+ if (tlogFilesToDownload != null) {
+ // move tlog files and refresh ulog; note that &= does not short-circuit, so the move is attempted even if the index install failed
+ successfulInstall &= moveTlogFiles(tmpTlogDir);
+ }
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
@@ -476,6 +490,10 @@ public class IndexFetcher {
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
+ if (tlogFilesToDownload != null) {
+ // move tlog files and refresh ulog; note that &= does not short-circuit, so the move is attempted even if the index install failed
+ successfulInstall &= moveTlogFiles(tmpTlogDir);
+ }
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);
@@ -489,7 +507,7 @@ public class IndexFetcher {
// we must reload the core after we open the IW back up
if (successfulInstall && (reloadCore || forceCoreReload)) {
- LOG.info("Reloading SolrCore {}", solrCore.getName());
+ LOG.info("Reloading SolrCore {}", solrCore.getName());
reloadCore();
}
@@ -511,7 +529,7 @@ public class IndexFetcher {
}
if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
- cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+ cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
cleanupDone = true;
// we try with a full copy of the index
LOG.warn(
@@ -534,13 +552,13 @@ public class IndexFetcher {
}
} finally {
if (!cleanupDone) {
- cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+ cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
}
}
}
private void cleanup(final SolrCore core, Directory tmpIndexDir,
- Directory indexDir, boolean deleteTmpIdxDir, boolean successfulInstall) throws IOException {
+ Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
try {
if (!successfulInstall) {
try {
@@ -552,7 +570,7 @@ public class IndexFetcher {
core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall);
- filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
+ filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = tlogFilesToDownload = tlogFilesDownloaded = null;
markReplicationStop();
dirFileFetcher = null;
localFileFetcher = null;
@@ -577,6 +595,10 @@ public class IndexFetcher {
if (indexDir != null) {
core.getDirectoryFactory().release(indexDir);
}
+
+ if (tmpTlogDir != null) {
+ delTree(tmpTlogDir);
+ }
}
}
@@ -792,35 +814,26 @@ public class IndexFetcher {
}
}
- private long downloadTlogFiles(String timestamp, long latestGeneration) throws Exception {
- UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
-
+ /**
+ * Download all the tlog files to the temp tlog directory.
+ */
+ private long downloadTlogFiles(File tmpTlogDir, long latestGeneration) throws Exception {
LOG.info("Starting download of tlog files from master: " + tlogFilesToDownload);
- tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
+ tlogFilesDownloaded = Collections.synchronizedList(new ArrayList<>());
long bytesDownloaded = 0;
- File tmpTlogDir = new File(ulog.getLogDir(), "tlog." + getDateAsStr(new Date()));
- try {
- boolean status = tmpTlogDir.mkdirs();
- if (!status) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Failed to create temporary tlog folder: " + tmpTlogDir.getName());
- }
- for (Map<String, Object> file : tlogFilesToDownload) {
- String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
- localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
- currentFile = file;
- localFileFetcher.fetchFile();
- bytesDownloaded += localFileFetcher.getBytesDownloaded();
- tlogFilesDownloaded.add(new HashMap<>(file));
- }
- // this is called before copying the files to the original conf dir
- // so that if there is an exception avoid corrupting the original files.
- terminateAndWaitFsyncService();
- ((CdcrUpdateLog) ulog).reset(); // reset the update log before copying the new tlog directory
- copyTmpTlogFiles2Tlog(tmpTlogDir, timestamp);
- ulog.init(solrCore.getUpdateHandler(), solrCore); // re-initialise the update log with the new directory
- } finally {
- delTree(tmpTlogDir);
+
+ boolean status = tmpTlogDir.mkdirs();
+ if (!status) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Failed to create temporary tlog folder: " + tmpTlogDir.getName());
+ }
+ for (Map<String, Object> file : tlogFilesToDownload) {
+ String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
+ localFileFetcher = new LocalFsFileFetcher(tmpTlogDir, file, saveAs, TLOG_FILE, latestGeneration);
+ currentFile = file;
+ localFileFetcher.fetchFile();
+ bytesDownloaded += localFileFetcher.getBytesDownloaded();
+ tlogFilesDownloaded.add(new HashMap<>(file));
}
return bytesDownloaded;
}
@@ -1032,6 +1045,22 @@ public class IndexFetcher {
}
/**
+ * Reset the {@link UpdateLog} and move all the tlog files from the temp tlog dir to the actual
+ * tlog dir. The move will try to preserve the original tlog directory if it fails.
+ * Returns true if the tlog files were moved, false otherwise.
+ */
+ private boolean moveTlogFiles(File tmpTlogDir) {
+ UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
+
+ // reset the update log before copying the new tlog directory, it will be reinitialized
+ // during the core reload
+ ((CdcrUpdateLog) ulog).reset();
+ // try to move the temp tlog files to the tlog directory
+ if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
+ return true;
+ }
+
+ /**
* Make file list
*/
private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
@@ -1085,27 +1114,40 @@ public class IndexFetcher {
}
/**
- * The tlog files are copied from the tmp dir to the tlog dir by renaming the directory if possible.
- * A backup of the old file is maintained.
+ * The tlog files are moved from the tmp dir to the tlog dir using atomic filesystem moves: the old tlog directory
+ * is first renamed to a backup directory, then the tmp dir is moved into its place. If the second move fails, an
+ * attempt is made to restore the original tlog directory.
*/
- private void copyTmpTlogFiles2Tlog(File tmpTlogDir, String timestamp) {
- File tlogDir = new File(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
- File backupTlogDir = new File(tlogDir.getParent(), UpdateLog.TLOG_NAME + "." + timestamp);
+ private boolean copyTmpTlogFiles2Tlog(File tmpTlogDir) {
+ Path tlogDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
+ Path backupTlogDir = FileSystems.getDefault().getPath(tlogDir.getParent().toAbsolutePath().toString(), tmpTlogDir.getName());
try {
- org.apache.commons.io.FileUtils.moveDirectory(tlogDir, backupTlogDir);
+ Files.move(tlogDir, backupTlogDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
+ SolrException.log(LOG, "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
+ return false;
}
+ Path src = FileSystems.getDefault().getPath(backupTlogDir.toAbsolutePath().toString(), tmpTlogDir.getName());
try {
- tmpTlogDir = new File(backupTlogDir, tmpTlogDir.getName());
- org.apache.commons.io.FileUtils.moveDirectory(tmpTlogDir, tlogDir);
+ Files.move(src, tlogDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Unable to rename: " + tmpTlogDir + " to: " + tlogDir, e);
+ SolrException.log(LOG, "Unable to rename: " + src + " to: " + tlogDir, e);
+
+ // In case of error, try to restore the original tlog directory
+ try {
+ Files.move(backupTlogDir, tlogDir, StandardCopyOption.ATOMIC_MOVE);
+ } catch (IOException e2) {
+ // bad, we were not able to restore the original tlog directory
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to rename: " + backupTlogDir + " to: " + tlogDir);
+ }
+
+ return false;
}
+
+ return true;
}
private String getDateAsStr(Date d) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Fri Nov 6 20:18:19 2015
@@ -248,20 +248,44 @@ public class CdcrUpdateLog extends Updat
for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
reader.close();
}
+ logPointers.clear();
// Close and clear logs
+ doClose(prevTlog);
+ doClose(tlog);
+
for (TransactionLog log : logs) {
- log.deleteOnClose = false;
- log.decref();
- log.forceClose();
+ if (log == prevTlog || log == tlog) continue;
+ doClose(log);
}
+
logs.clear();
+ newestLogsOnStartup.clear();
+ tlog = prevTlog = null;
+ prevMapLog = prevMapLog2 = null;
+
+ map.clear();
+ if (prevMap != null) prevMap.clear();
+ if (prevMap2 != null) prevMap2.clear();
+
+ numOldRecords = 0;
- // reset lastDataDir for #init()
+ oldDeletes.clear();
+ deleteByQueries.clear();
+
+ // reset lastDataDir for triggering full #init()
lastDataDir = null;
}
}
+ private void doClose(TransactionLog theLog) {
+ if (theLog != null) {
+ theLog.deleteOnClose = false;
+ theLog.decref();
+ theLog.forceClose();
+ }
+ }
+
@Override
public void close(boolean committed, boolean deleteOnClose) {
for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri Nov 6 20:18:19 2015
@@ -221,12 +221,26 @@ public class BaseCdcrDistributedZkTest e
}
/**
- * Returns the number of documents in a given collection
+ * Assert the number of documents in a given collection
*/
- protected long getNumDocs(String collection) throws SolrServerException, IOException {
+ protected void assertNumDocs(int expectedNumDocs, String collection)
+ throws SolrServerException, IOException, InterruptedException {
CloudSolrClient client = createCloudClient(collection);
try {
- return client.query(new SolrQuery("*:*")).getResults().getNumFound();
+ int cnt = 30; // timeout after 15 seconds
+ AssertionError lastAssertionError = null;
+ while (cnt > 0) {
+ try {
+ assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
+ return;
+ }
+ catch (AssertionError e) {
+ lastAssertionError = e;
+ cnt--;
+ Thread.sleep(500);
+ }
+ }
+ throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
} finally {
client.close();
}
@@ -553,6 +567,7 @@ public class BaseCdcrDistributedZkTest e
// delete the temporary collection - we will create our own collections later
this.deleteCollection(temporaryCollection);
+ this.waitForCollectionToDisappear(temporaryCollection);
System.clearProperty("collection");
return nodeNames;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri Nov 6 20:18:19 2015
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.common.SolrInputDocument;
@@ -26,12 +27,9 @@ import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
-@Slow
@Nightly
public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
@@ -41,28 +39,13 @@ public class CdcrReplicationDistributedZ
super.distribSetUp();
}
- @Test
- @ShardsFixed(num = 4)
- public void doTests() throws Exception {
- this.doTestDeleteCreateSourceCollection();
- this.doTestTargetCollectionNotAvailable();
- this.doTestReplicationStartStop();
- this.doTestReplicationAfterRestart();
- this.doTestReplicationAfterLeaderChange();
- this.doTestUpdateLogSynchronisation();
- this.doTestBufferOnNonLeader();
- this.doTestOps();
- this.doTestBatchAddsWithDelete();
- this.doTestBatchBoundaries();
- this.doTestResilienceWithDeleteByQueryOnTarget();
- }
/**
* Checks that the test framework handles properly the creation and deletion of collections and the
* restart of servers.
*/
- public void doTestDeleteCreateSourceCollection() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
+ @Test
+ @ShardsFixed(num = 4)
+ public void testDeleteCreateSourceCollection() throws Exception {
log.info("Indexing documents");
List<SolrInputDocument> docs = new ArrayList<>();
@@ -72,45 +55,44 @@ public class CdcrReplicationDistributedZ
index(SOURCE_COLLECTION, docs);
index(TARGET_COLLECTION, docs);
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Restarting leader @ source_collection:shard1");
this.restartServer(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1));
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Clearing source_collection");
this.clearSourceCollection();
- assertEquals(0, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(0, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Restarting leader @ target_collection:shard1");
this.restartServer(shardToLeaderJetty.get(TARGET_COLLECTION).get(SHARD1));
- assertEquals(0, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(0, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Clearing target_collection");
this.clearTargetCollection();
- assertEquals(0, getNumDocs(SOURCE_COLLECTION));
- assertEquals(0, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(0, SOURCE_COLLECTION);
+ assertNumDocs(0, TARGET_COLLECTION);
assertCollectionExpectations(SOURCE_COLLECTION);
assertCollectionExpectations(TARGET_COLLECTION);
}
- public void doTestTargetCollectionNotAvailable() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testTargetCollectionNotAvailable() throws Exception {
// send start action to first shard
NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
@@ -130,23 +112,35 @@ public class CdcrReplicationDistributedZ
index(SOURCE_COLLECTION, getDoc(id, "e"));
index(SOURCE_COLLECTION, getDoc(id, "f"));
- assertEquals(6, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(6, SOURCE_COLLECTION);
- Thread.sleep(1000); // wait a bit for the replicator thread to be triggered
+ // we need to wait until the replicator thread is triggered
+ int cnt = 15; // timeout after 15 seconds
+ AssertionError lastAssertionError = null;
+ while (cnt > 0) {
+ try {
+ rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
+ NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
+ NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
+ assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
+ NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
+ assertNotNull(lastErrors);
+ assertTrue(0 < lastErrors.size());
+ return;
+ }
+ catch (AssertionError e) {
+ lastAssertionError = e;
+ cnt--;
+ Thread.sleep(1000);
+ }
+ }
- rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD2), CdcrParams.CdcrAction.ERRORS);
- NamedList collections = (NamedList) ((NamedList) rsp.get(CdcrParams.ERRORS)).getVal(0);
- NamedList errors = (NamedList) collections.get(TARGET_COLLECTION);
- assertTrue(0 < (Long) errors.get(CdcrParams.CONSECUTIVE_ERRORS));
- NamedList lastErrors = (NamedList) errors.get(CdcrParams.LAST);
- assertNotNull(lastErrors);
- assertTrue(0 < lastErrors.size());
+ throw new AssertionError("Timeout while trying to assert replication errors", lastAssertionError);
}
- public void doTestReplicationStartStop() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection(); // this might log a warning to indicate he was not able to delete the collection (collection was deleted in the previous test)
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testReplicationStartStop() throws Exception {
int start = 0;
List<SolrInputDocument> docs = new ArrayList<>();
for (; start < 10; start++) {
@@ -154,8 +148,8 @@ public class CdcrReplicationDistributedZ
}
index(SOURCE_COLLECTION, docs);
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
- assertEquals(0, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
+ assertNumDocs(0, TARGET_COLLECTION);
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -165,8 +159,8 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -177,8 +171,8 @@ public class CdcrReplicationDistributedZ
}
index(SOURCE_COLLECTION, docs);
- assertEquals(110, getNumDocs(SOURCE_COLLECTION));
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(110, SOURCE_COLLECTION);
+ assertNumDocs(10, TARGET_COLLECTION);
// Start again CDCR, the source cluster should reinitialise its log readers
// with the latest checkpoints
@@ -191,17 +185,16 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
- assertEquals(110, getNumDocs(SOURCE_COLLECTION));
- assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(110, SOURCE_COLLECTION);
+ assertNumDocs(110, TARGET_COLLECTION);
}
/**
* Check that the replication manager is properly restarted after a node failure.
*/
- public void doTestReplicationAfterRestart() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testReplicationAfterRestart() throws Exception {
log.info("Starting CDCR");
// send start action to first shard
@@ -219,7 +212,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying source collection");
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
log.info("Waiting for replication");
@@ -229,7 +222,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying target collection");
commit(TARGET_COLLECTION);
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Restarting shard1");
@@ -245,7 +238,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying source collection");
- assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(110, SOURCE_COLLECTION);
log.info("Waiting for replication");
@@ -255,7 +248,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying target collection");
commit(TARGET_COLLECTION);
- assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(110, TARGET_COLLECTION);
}
/**
@@ -263,10 +256,9 @@ public class CdcrReplicationDistributedZ
* This test also checks that the log readers on the new leaders are initialised with
* the target's checkpoint.
*/
- public void doTestReplicationAfterLeaderChange() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testReplicationAfterLeaderChange() throws Exception {
log.info("Starting CDCR");
// send start action to first shard
@@ -284,7 +276,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying source collection");
- assertEquals(10, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(10, SOURCE_COLLECTION);
log.info("Waiting for replication");
@@ -294,7 +286,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying target collection");
commit(TARGET_COLLECTION);
- assertEquals(10, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(10, TARGET_COLLECTION);
log.info("Restarting target leaders");
@@ -327,7 +319,7 @@ public class CdcrReplicationDistributedZ
log.info("Querying source collection");
- assertEquals(110, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(110, SOURCE_COLLECTION);
log.info("Waiting for replication");
@@ -337,24 +329,20 @@ public class CdcrReplicationDistributedZ
log.info("Querying target collection");
commit(TARGET_COLLECTION);
- assertEquals(110, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(110, TARGET_COLLECTION);
}
/**
* Check that the update logs are synchronised between leader and non-leader nodes
+ * when CDCR is on and buffer is disabled
*/
- public void doTestUpdateLogSynchronisation() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
- // buffering is enabled by default, so disable it
- this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
- this.waitForCdcrStateReplication(SOURCE_COLLECTION);
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testUpdateLogSynchronisation() throws Exception {
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
// will perform a commit for every document and will create one tlog file per commit
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
}
@@ -365,34 +353,39 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
- // Stop CDCR
- this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+ // Check that the replication was done properly
+ assertNumDocs(100, SOURCE_COLLECTION);
+ assertNumDocs(100, TARGET_COLLECTION);
+
+ // Get the number of tlog files on the replicas (should be equal to the number of documents indexed)
+ int nTlogs = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
+
+ // Disable the buffer - ulog synch should start on non-leader nodes
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
- assertEquals(50, getNumDocs(SOURCE_COLLECTION));
- assertEquals(50, getNumDocs(TARGET_COLLECTION));
+ int cnt = 15; // timeout after 15 seconds
+ while (cnt > 0) {
+ // Index a new document with a commit to trigger update log cleaning
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(50)));
- // some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
- assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
+ // Check the update logs on non-leader nodes, the number of tlog files should decrease
+ int n = getNumberOfTlogFilesOnReplicas(SOURCE_COLLECTION);
+ if (n < nTlogs) return;
- for (int i = 50; i < 100; i++) {
- index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
+ cnt--;
+ Thread.sleep(1000);
}
- // at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
- // leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
- // and 11 on the non-leader)
- // the non-leader must have synchronised its update log with its leader
- assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
+ throw new AssertionError("Timeout while trying to assert update logs @ source_collection");
}
/**
* Check that the buffer is always activated on non-leader nodes.
*/
- public void doTestBufferOnNonLeader() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testBufferOnNonLeader() throws Exception {
// buffering is enabled by default, so disable it
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -416,20 +409,20 @@ public class CdcrReplicationDistributedZ
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
+ // Commit to make the documents visible on the target
commit(TARGET_COLLECTION);
// If the non-leader node were buffering updates, then the replication must be complete
- assertEquals(200, getNumDocs(SOURCE_COLLECTION));
- assertEquals(200, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(200, SOURCE_COLLECTION);
+ assertNumDocs(200, TARGET_COLLECTION);
}
/**
* Check the ops statistics.
*/
- public void doTestOps() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testOps() throws Exception {
// Index documents
List<SolrInputDocument> docs = new ArrayList<>();
for (int i = 0; i < 200; i++) {
@@ -460,10 +453,9 @@ public class CdcrReplicationDistributedZ
/**
* Check that batch updates with deletes are replicated correctly
*/
- public void doTestBatchAddsWithDelete() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testBatchAddsWithDelete() throws Exception {
// Index 50 documents
int start = 0;
List<SolrInputDocument> docs = new ArrayList<>();
@@ -509,14 +501,16 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
// If the non-leader node were buffering updates, then the replication must be complete
- assertEquals(59, getNumDocs(SOURCE_COLLECTION));
- assertEquals(59, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(59, SOURCE_COLLECTION);
+ assertNumDocs(59, TARGET_COLLECTION);
}
/**
* Checks that batches are correctly constructed when batch boundaries are reached.
*/
- public void doTestBatchBoundaries() throws Exception {
+ @Test
+ @ShardsFixed(num = 4)
+ public void testBatchBoundaries() throws Exception {
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
this.waitForCdcrStateReplication(SOURCE_COLLECTION);
@@ -528,24 +522,23 @@ public class CdcrReplicationDistributedZ
}
index(SOURCE_COLLECTION, docs);
- assertEquals(128, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(128, SOURCE_COLLECTION);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
commit(TARGET_COLLECTION);
- assertEquals(128, getNumDocs(SOURCE_COLLECTION));
- assertEquals(128, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(128, SOURCE_COLLECTION);
+ assertNumDocs(128, TARGET_COLLECTION);
}
/**
* Check resilience of replication with delete by query executed on targets
*/
- public void doTestResilienceWithDeleteByQueryOnTarget() throws Exception {
- this.clearSourceCollection();
- this.clearTargetCollection();
-
+ @Test
+ @ShardsFixed(num = 4)
+ public void testResilienceWithDeleteByQueryOnTarget() throws Exception {
// Index 50 documents
int start = 0;
List<SolrInputDocument> docs = new ArrayList<>();
@@ -565,14 +558,14 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
// If the non-leader node were buffering updates, then the replication must be complete
- assertEquals(50, getNumDocs(SOURCE_COLLECTION));
- assertEquals(50, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(50, SOURCE_COLLECTION);
+ assertNumDocs(50, TARGET_COLLECTION);
deleteByQuery(SOURCE_COLLECTION, "*:*");
deleteByQuery(TARGET_COLLECTION, "*:*");
- assertEquals(0, getNumDocs(SOURCE_COLLECTION));
- assertEquals(0, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(0, SOURCE_COLLECTION);
+ assertNumDocs(0, TARGET_COLLECTION);
docs.clear();
for (; start < 100; start++) {
@@ -586,13 +579,13 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
- assertEquals(50, getNumDocs(SOURCE_COLLECTION));
- assertEquals(50, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(50, SOURCE_COLLECTION);
+ assertNumDocs(50, TARGET_COLLECTION);
deleteByQuery(TARGET_COLLECTION, "*:*");
- assertEquals(50, getNumDocs(SOURCE_COLLECTION));
- assertEquals(0, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(50, SOURCE_COLLECTION);
+ assertNumDocs(0, TARGET_COLLECTION);
// Restart CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
@@ -612,79 +605,32 @@ public class CdcrReplicationDistributedZ
commit(TARGET_COLLECTION);
- assertEquals(100, getNumDocs(SOURCE_COLLECTION));
- assertEquals(50, getNumDocs(TARGET_COLLECTION));
+ assertNumDocs(100, SOURCE_COLLECTION);
+ assertNumDocs(50, TARGET_COLLECTION);
}
- /**
- * Asserts the number of transaction logs across all the shards. Since the cleaning of the update logs
- * is not immediate on the slave nodes (it relies on the update log synchronizer that is executed every second),
- * it will retry until the assert is successful or until the timeout.
- */
- protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
- int cnt = 15; // timeout after 15 seconds
- AssertionError lastAssertionError = null;
-
- while (cnt > 0) {
- try {
- // Fire a DeleteById query with a commit to trigger update log cleaning on the non-leader nodes
- List<String> ids = new ArrayList<>();
- ids.add("_NON_EXISTING_ID_");
- deleteById(collection, ids);
-
- // Check the update logs
- this._assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
- return;
- }
- catch (AssertionError e) {
- lastAssertionError = e;
- cnt--;
- Thread.sleep(1000);
- }
+ private int numberOfFiles(String dir) {
+ File file = new File(dir);
+ if (!file.isDirectory()) {
+ assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
}
-
- throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, lastAssertionError);
+ log.debug("Update log dir {} contains: {}", dir, file.listFiles());
+ return file.listFiles().length;
}
- /**
- * Asserts the number of transaction logs across all the shards
- */
- private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+ private int getNumberOfTlogFilesOnReplicas(String collection) throws Exception {
CollectionInfo info = collectInfo(collection);
Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
- int leaderLogs = 0;
- ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+ int count = 0;
for (String shard : shardToCoresMap.keySet()) {
- leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
for (int i = 0; i < replicationFactor - 1; i++) {
- replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
+ count += numberOfFiles(info.getReplicas(shard).get(i).ulogDir);
}
}
- for (Integer replicaLogs : replicasLogs) {
- log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
-
- // replica logs must be always equal or superior to leader logs
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
- replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
-
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
- leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
-
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
- replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
- }
- }
-
- private int numberOfFiles(String dir) {
- File file = new File(dir);
- if (!file.isDirectory()) {
- assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
- }
- log.debug("Update log dir {} contains: {}", dir, file.listFiles());
- return file.listFiles().length;
+ return count;
}
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri Nov 6 20:18:19 2015
@@ -20,7 +20,7 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.junit.Test;
@@ -31,7 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-@Slow
+@Nightly
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
@Override
@@ -44,21 +44,13 @@ public class CdcrReplicationHandlerTest
super.distribSetUp();
}
- @Test
- @ShardsFixed(num = 2)
- public void doTest() throws Exception {
- this.doTestFullReplication();
- this.doTestPartialReplication();
- this.doTestPartialReplicationWithTruncatedTlog();
- this.doTestPartialReplicationAfterPeerSync();
- }
-
/**
* Test the scenario where the slave is killed from the start. The replication
* strategy should fetch all the missing tlog files from the leader.
*/
- public void doTestFullReplication() throws Exception {
- this.clearSourceCollection();
+ @Test
+ @ShardsFixed(num = 2)
+ public void testFullReplication() throws Exception {
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
ChaosMonkey.stop(slaves.get(0).jetty);
@@ -70,7 +62,7 @@ public class CdcrReplicationHandlerTest
index(SOURCE_COLLECTION, docs);
}
- assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(100, SOURCE_COLLECTION);
// Restart the slave node to trigger Replication strategy
this.restartServer(slaves.get(0));
@@ -82,8 +74,9 @@ public class CdcrReplicationHandlerTest
* Test the scenario where the slave is killed before receiving all the documents. The replication
* strategy should fetch all the missing tlog files from the leader.
*/
- public void doTestPartialReplication() throws Exception {
- this.clearSourceCollection();
+ @Test
+ @ShardsFixed(num = 2)
+ public void testPartialReplication() throws Exception {
for (int i = 0; i < 5; i++) {
List<SolrInputDocument> docs = new ArrayList<>();
for (int j = i * 20; j < (i * 20) + 20; j++) {
@@ -103,7 +96,7 @@ public class CdcrReplicationHandlerTest
index(SOURCE_COLLECTION, docs);
}
- assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(200, SOURCE_COLLECTION);
// Restart the slave node to trigger Replication strategy
this.restartServer(slaves.get(0));
@@ -117,8 +110,9 @@ public class CdcrReplicationHandlerTest
* file on the slave node. The replication strategy should detect this truncated file, and fetch the
* non-truncated file from the leader.
*/
- public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
- this.clearSourceCollection();
+ @Test
+ @ShardsFixed(num = 2)
+ public void testPartialReplicationWithTruncatedTlog() throws Exception {
CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
@@ -139,7 +133,7 @@ public class CdcrReplicationHandlerTest
client.close();
}
- assertEquals(200, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(200, SOURCE_COLLECTION);
// Restart the slave node to trigger Replication recovery
this.restartServer(slaves.get(0));
@@ -154,8 +148,9 @@ public class CdcrReplicationHandlerTest
* If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
* and fetch the corresponding tlog files from the leader.
*/
- public void doTestPartialReplicationAfterPeerSync() throws Exception {
- this.clearSourceCollection();
+ @Test
+ @ShardsFixed(num = 2)
+ public void testPartialReplicationAfterPeerSync() throws Exception {
for (int i = 0; i < 5; i++) {
List<SolrInputDocument> docs = new ArrayList<>();
for (int j = i * 10; j < (i * 10) + 10; j++) {
@@ -175,7 +170,7 @@ public class CdcrReplicationHandlerTest
index(SOURCE_COLLECTION, docs);
}
- assertEquals(100, getNumDocs(SOURCE_COLLECTION));
+ assertNumDocs(100, SOURCE_COLLECTION);
// Restart the slave node to trigger PeerSync recovery
// (the update window between leader and slave is small enough)
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri Nov 6 20:18:19 2015
@@ -17,12 +17,12 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
import org.junit.Test;
-@Slow
+@Nightly
public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
@Override
@@ -32,16 +32,10 @@ public class CdcrRequestHandlerTest exte
super.distribSetUp();
}
+ // check that the life-cycle state is properly synchronised across nodes
@Test
@ShardsFixed(num = 2)
- public void doTest() throws Exception {
- this.doTestLifeCycleActions();
- this.doTestCheckpointActions();
- this.doTestBufferActions();
- }
-
- // check that the life-cycle state is properly synchronised across nodes
- public void doTestLifeCycleActions() throws Exception {
+ public void testLifeCycleActions() throws Exception {
// check initial status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
@@ -69,7 +63,9 @@ public class CdcrRequestHandlerTest exte
}
// check the checkpoint API
- public void doTestCheckpointActions() throws Exception {
+ @Test
+ @ShardsFixed(num = 2)
+ public void testCheckpointActions() throws Exception {
// initial request on an empty index, must return -1
NamedList rsp = invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.COLLECTIONCHECKPOINT);
assertEquals(-1l, rsp.get(CdcrParams.CHECKPOINT));
@@ -125,7 +121,9 @@ public class CdcrRequestHandlerTest exte
}
// check that the buffer state is properly synchronised across nodes
- public void doTestBufferActions() throws Exception {
+ @Test
+ @ShardsFixed(num = 2)
+ public void testBufferActions() throws Exception {
// check initial status
this.assertState(SOURCE_COLLECTION, CdcrParams.ProcessState.STOPPED, CdcrParams.BufferState.ENABLED);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java?rev=1713022&r1=1713021&r2=1713022&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/CdcrUpdateLogTest.java Fri Nov 6 20:18:19 2015
@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.request.SolrQueryRequest;
@@ -40,7 +40,7 @@ import org.noggit.ObjectBuilder;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-@Slow
+@Nightly
public class CdcrUpdateLogTest extends SolrTestCaseJ4 {
// means that we've seen the leader and have version info (i.e. we are a non-leader replica)