You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/07/17 18:38:13 UTC
svn commit: r1691606 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/handler/ test/org/apache/solr/cloud/
Author: erick
Date: Fri Jul 17 16:38:13 2015
New Revision: 1691606
URL: http://svn.apache.org/r1691606
Log:
SOLR-6273: Cross Data Center Replication. All tests are now passing on my machine, let's see if Jenkins flushes anything out
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorScheduler.java Fri Jul 17 16:38:13 2015
@@ -82,6 +82,7 @@ class CdcrReplicatorScheduler {
@Override
public void run() {
CdcrReplicatorState state = statesQueue.poll();
+ assert state != null; // Should never happen
try {
new CdcrReplicator(state, batchSize).run();
} finally {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Fri Jul 17 16:38:13 2015
@@ -245,11 +245,43 @@ public class BaseCdcrDistributedZkTest e
return jetty.client.request(request);
}
+ protected void waitForCdcrStateReplication(String collection) throws Exception {
+ log.info("Wait for CDCR state to replicate - collection: " + collection);
+
+ int cnt = 30;
+ while (cnt > 0) {
+ NamedList status = null;
+ boolean allEquals = true;
+ for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
+ NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
+ if (status == null) {
+ status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
+ continue;
+ }
+ allEquals &= status.equals(rsp.get(CdcrParams.CdcrAction.STATUS.toLower()));
+ }
+
+ if (allEquals) {
+ break;
+ }
+ else {
+ if (cnt == 0) {
+ throw new RuntimeException("Timeout waiting for CDCR state to replicate: collection="+collection);
+ }
+ cnt--;
+ Thread.sleep(500);
+ }
+ }
+
+ log.info("CDCR state is identical across nodes - collection: " + collection);
+ }
+
/**
* Assert the state of CDCR on each node of the given collection.
*/
protected void assertState(String collection, CdcrParams.ProcessState processState, CdcrParams.BufferState bufferState)
- throws Exception {
+ throws Exception {
+ this.waitForCdcrStateReplication(collection); // ensure that cdcr state is replicated and stable
for (CloudJettyRunner jetty : cloudJettys.get(collection)) { // check all replicas
NamedList rsp = invokeCdcrAction(jetty, CdcrParams.CdcrAction.STATUS);
NamedList status = (NamedList) rsp.get(CdcrParams.CdcrAction.STATUS.toLower());
@@ -282,6 +314,7 @@ public class BaseCdcrDistributedZkTest e
*/
protected void clearSourceCollection() throws Exception {
this.deleteCollection(SOURCE_COLLECTION);
+ this.waitForCollectionToDisappear(SOURCE_COLLECTION);
this.createCollection(SOURCE_COLLECTION);
this.waitForRecoveriesToFinish(SOURCE_COLLECTION, true);
this.updateMappingsFromZk(SOURCE_COLLECTION);
@@ -305,6 +338,7 @@ public class BaseCdcrDistributedZkTest e
*/
protected void clearTargetCollection() throws Exception {
this.deleteCollection(TARGET_COLLECTION);
+ this.waitForCollectionToDisappear(TARGET_COLLECTION);
this.createCollection(TARGET_COLLECTION);
this.waitForRecoveriesToFinish(TARGET_COLLECTION, true);
this.updateMappingsFromZk(TARGET_COLLECTION);
@@ -389,7 +423,7 @@ public class BaseCdcrDistributedZkTest e
/**
* Delete a collection through the Collection API.
*/
- protected CollectionAdminResponse deleteCollection(String collectionName) throws SolrServerException, IOException {
+ protected CollectionAdminResponse deleteCollection(String collectionName) throws Exception {
SolrClient client = createCloudClient(null);
CollectionAdminResponse res;
@@ -412,6 +446,17 @@ public class BaseCdcrDistributedZkTest e
return res;
}
+ /**
+  * Opens a dedicated cloud client, connects it, and delegates to
+  * {@code AbstractDistribZkTestBase.waitForCollectionToDisappear} using that client's
+  * ZooKeeper state reader. The client is always closed afterwards.
+  *
+  * @param collection name of the collection expected to disappear
+  */
+ private void waitForCollectionToDisappear(String collection) throws Exception {
+   CloudSolrClient cloudClient = this.createCloudClient(null);
+   try {
+     cloudClient.connect();
+     AbstractDistribZkTestBase.waitForCollectionToDisappear(
+         collection, cloudClient.getZkStateReader(), false, true, 15);
+   } finally {
+     // release the client even when connecting or waiting fails
+     cloudClient.close();
+   }
+ }
+
private void waitForRecoveriesToFinish(String collection, boolean verbose) throws Exception {
CloudSolrClient client = this.createCloudClient(null);
try {
@@ -673,15 +718,18 @@ public class BaseCdcrDistributedZkTest e
}
protected void waitForReplicationToComplete(String collectionName, String shardId) throws Exception {
- while (true) {
+ int cnt = 15;
+ while (cnt > 0) {
log.info("Checking queue size @ {}:{}", collectionName, shardId);
long size = this.getQueueSize(collectionName, shardId);
- if (size <= 0) {
+ if (size == 0) { // if we received -1, it means that the log reader is not yet initialised, we should wait
return;
}
log.info("Waiting for replication to complete. Queue size: {} @ {}:{}", size, collectionName, shardId);
+ cnt--;
Thread.sleep(1000); // wait a bit for the replication to complete
}
+ throw new RuntimeException("Timeout waiting for CDCR replication to complete @" + collectionName + ":" + shardId);
}
protected long getQueueSize(String collectionName, String shardId) throws Exception {
@@ -691,47 +739,6 @@ public class BaseCdcrDistributedZkTest e
return (Long) status.get(CdcrParams.QUEUE_SIZE);
}
- /**
- * Asserts that the number of transaction logs across all the shards
- */
- protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
- CollectionInfo info = collectInfo(collection);
- Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
-
- int leaderLogs = 0;
- ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
-
- for (String shard : shardToCoresMap.keySet()) {
- leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
- for (int i = 0; i < replicationFactor - 1; i++) {
- replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
- }
- }
-
- for (Integer replicaLogs : replicasLogs) {
- log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
-
- // replica logs must be always equal or superior to leader logs
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
- replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
-
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
- leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
-
- assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
- replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
- }
- }
-
- private int numberOfFiles(String dir) {
- File file = new File(dir);
- if (!file.isDirectory()) {
- assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", false);
- }
- log.info("Update log dir {} contains: {}", dir, file.listFiles());
- return file.listFiles().length;
- }
-
protected CollectionInfo collectInfo(String collection) throws Exception {
CollectionInfo info = new CollectionInfo(collection);
for (String shard : shardToJetty.get(collection).keySet()) {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationDistributedZkTest.java Fri Jul 17 16:38:13 2015
@@ -21,13 +21,15 @@ import org.apache.lucene.util.LuceneTest
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
-@Ignore
@Slow
public class CdcrReplicationDistributedZkTest extends BaseCdcrDistributedZkTest {
@@ -39,7 +41,7 @@ public class CdcrReplicationDistributedZ
@Test
@ShardsFixed(num = 4)
- public void doTest() throws Exception {
+ public void doTests() throws Exception {
this.doTestDeleteCreateSourceCollection();
this.doTestTargetCollectionNotAvailable();
this.doTestReplicationStartStop();
@@ -52,12 +54,13 @@ public class CdcrReplicationDistributedZ
this.doTestBatchBoundaries();
this.doTestResilienceWithDeleteByQueryOnTarget();
}
-
/**
* Checks that the test framework handles properly the creation and deletion of collections and the
* restart of servers.
*/
public void doTestDeleteCreateSourceCollection() throws Exception {
+ this.clearSourceCollection();
+ this.clearTargetCollection();
log.info("Indexing documents");
List<SolrInputDocument> docs = new ArrayList<>();
@@ -153,8 +156,10 @@ public class CdcrReplicationDistributedZ
assertEquals(0, getNumDocs(TARGET_COLLECTION));
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
commit(TARGET_COLLECTION);
@@ -162,6 +167,7 @@ public class CdcrReplicationDistributedZ
assertEquals(10, getNumDocs(TARGET_COLLECTION));
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
docs.clear();
for (; start < 110; start++) {
@@ -176,8 +182,10 @@ public class CdcrReplicationDistributedZ
// with the latest checkpoints
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
commit(TARGET_COLLECTION);
@@ -196,6 +204,7 @@ public class CdcrReplicationDistributedZ
// send start action to first shard
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
log.info("Indexing 10 documents");
@@ -260,6 +269,7 @@ public class CdcrReplicationDistributedZ
// send start action to first shard
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
log.info("Indexing 10 documents");
@@ -337,11 +347,14 @@ public class CdcrReplicationDistributedZ
// buffering is enabled by default, so disable it
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
for (int i = 0; i < 50; i++) {
- index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i))); // will perform a commit for every document
+ // will perform a commit for every document and will create one tlog file per commit
+ index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
}
// wait a bit for the replication to complete
@@ -352,26 +365,23 @@ public class CdcrReplicationDistributedZ
// Stop CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
assertEquals(50, getNumDocs(SOURCE_COLLECTION));
assertEquals(50, getNumDocs(TARGET_COLLECTION));
- index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
// some of the tlogs should be trimmed, we must have less than 50 tlog files on both leader and non-leader
- assertUpdateLogs(SOURCE_COLLECTION, 50);
+ assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
for (int i = 50; i < 100; i++) {
index(SOURCE_COLLECTION, getDoc(id, Integer.toString(i)));
}
- index(SOURCE_COLLECTION, getDoc(id, Integer.toString(0))); // trigger update log cleaning on the non-leader nodes
-
// at this stage, we should have created one tlog file per document, and some of them must have been cleaned on the
// leader since we are not buffering and replication is stopped, (we should have exactly 10 tlog files on the leader
// and 11 on the non-leader)
// the non-leader must have synchronised its update log with its leader
- assertUpdateLogs(SOURCE_COLLECTION, 50);
+ assertNumberOfTlogFiles(SOURCE_COLLECTION, 50);
}
/**
@@ -383,9 +393,11 @@ public class CdcrReplicationDistributedZ
// buffering is enabled by default, so disable it
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.DISABLEBUFFER);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
// Index documents
for (int i = 0; i < 200; i++) {
@@ -425,6 +437,7 @@ public class CdcrReplicationDistributedZ
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -485,6 +498,7 @@ public class CdcrReplicationDistributedZ
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -501,7 +515,8 @@ public class CdcrReplicationDistributedZ
* Checks that batches are correctly constructed when batch boundaries are reached.
*/
public void doTestBatchBoundaries() throws Exception {
- invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
log.info("Indexing documents");
@@ -514,6 +529,7 @@ public class CdcrReplicationDistributedZ
assertEquals(128, getNumDocs(SOURCE_COLLECTION));
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
+ this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD2);
commit(TARGET_COLLECTION);
@@ -538,6 +554,7 @@ public class CdcrReplicationDistributedZ
// Start CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
// wait a bit for the replication to complete
this.waitForReplicationToComplete(SOURCE_COLLECTION, SHARD1);
@@ -577,8 +594,9 @@ public class CdcrReplicationDistributedZ
// Restart CDCR
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.STOP);
- Thread.sleep(500); // wait a bit for the state to synch
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
this.invokeCdcrAction(shardToLeaderJetty.get(SOURCE_COLLECTION).get(SHARD1), CdcrParams.CdcrAction.START);
+ this.waitForCdcrStateReplication(SOURCE_COLLECTION);
docs.clear();
for (; start < 150; start++) {
@@ -596,5 +614,76 @@ public class CdcrReplicationDistributedZ
assertEquals(50, getNumDocs(TARGET_COLLECTION));
}
+ /**
+  * Asserts the number of transaction logs across all the shards, retrying on failure.
+  * The cleaning of the update logs is not immediate on the slave nodes (it relies on the update
+  * log synchronizer that is executed every second), so the check is repeated up to 15 times with
+  * a one second pause after each failed attempt.
+  *
+  * @param collection       collection whose update logs are inspected
+  * @param maxNumberOfTLogs upper bound on the number of tlog files
+  * @throws AssertionError if the check still fails after the last attempt; the last failure is
+  *                        attached as the cause
+  */
+ protected void assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+   AssertionError mostRecentFailure = null;
+
+   for (int remaining = 15; remaining > 0; remaining--) { // timeout after 15 seconds
+     // Fire a DeleteById query with a commit to trigger update log cleaning on the non-leader nodes
+     List<String> ids = new ArrayList<>();
+     ids.add("_NON_EXISTING_ID_");
+     try {
+       deleteById(collection, ids);
+
+       // Check the update logs
+       this._assertNumberOfTlogFiles(collection, maxNumberOfTLogs);
+       return;
+     } catch (AssertionError failure) {
+       // remember the failure and retry after the pause below
+       mostRecentFailure = failure;
+     }
+     Thread.sleep(1000);
+   }
+
+   throw new AssertionError("Timeout while trying to assert update logs @ collection="+collection, mostRecentFailure);
+ }
+
+ /**
+ * Asserts the number of transaction logs across all the shards
+ */
+ private void _assertNumberOfTlogFiles(String collection, int maxNumberOfTLogs) throws Exception {
+ CollectionInfo info = collectInfo(collection);
+ Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
+
+ int leaderLogs = 0;
+ ArrayList<Integer> replicasLogs = new ArrayList<>(Collections.nCopies(replicationFactor - 1, 0));
+
+ for (String shard : shardToCoresMap.keySet()) {
+ leaderLogs += numberOfFiles(info.getLeader(shard).ulogDir);
+ for (int i = 0; i < replicationFactor - 1; i++) {
+ replicasLogs.set(i, replicasLogs.get(i) + numberOfFiles(info.getReplicas(shard).get(i).ulogDir));
+ }
+ }
+
+ for (Integer replicaLogs : replicasLogs) {
+ log.info("Number of logs in update log on leader {} and on replica {}", leaderLogs, replicaLogs);
+
+ // replica logs must be always equal or superior to leader logs
+ assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is different than on leader: %d.",
+ replicaLogs, leaderLogs), leaderLogs <= replicaLogs);
+
+ assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on leader: %d is superior to: %d.",
+ leaderLogs, maxNumberOfTLogs), maxNumberOfTLogs >= leaderLogs);
+
+ assertTrue(String.format(Locale.ENGLISH, "Number of tlogs on replica: %d is superior to: %d.",
+ replicaLogs, maxNumberOfTLogs), maxNumberOfTLogs >= replicaLogs);
+ }
+ }
+
+ /**
+  * Counts the entries of the given update log directory.
+  * The test fails if the path is not a directory or if its content cannot be listed.
+  *
+  * @param dir path of the update log directory
+  * @return the number of entries returned by {@link File#listFiles()}
+  */
+ private int numberOfFiles(String dir) {
+   File file = new File(dir);
+   assertTrue("Path to tlog " + dir + " does not exists or it's not a directory.", file.isDirectory());
+
+   // List the directory once: listFiles() returns null on an I/O error, and a single snapshot
+   // keeps the logged listing consistent with the returned count.
+   File[] files = file.listFiles();
+   assertTrue("Unable to list the content of tlog directory " + dir, files != null);
+
+   log.debug("Update log dir {} contains: {}", dir, files);
+   return files.length;
+ }
+
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java Fri Jul 17 16:38:13 2015
@@ -23,7 +23,6 @@ package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -32,7 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-@Ignore
@Slow
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
@@ -47,7 +45,7 @@ public class CdcrReplicationHandlerTest
}
@Test
- @ShardsFixed(num = 4)
+ @ShardsFixed(num = 2)
public void doTest() throws Exception {
this.doTestFullReplication();
this.doTestPartialReplication();
@@ -60,6 +58,7 @@ public class CdcrReplicationHandlerTest
* strategy should fetch all the missing tlog files from the leader.
*/
public void doTestFullReplication() throws Exception {
+ this.clearSourceCollection();
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
ChaosMonkey.stop(slaves.get(0).jetty);
@@ -76,7 +75,7 @@ public class CdcrReplicationHandlerTest
// Restart the slave node to trigger Replication strategy
this.restartServer(slaves.get(0));
- this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
}
/**
@@ -85,7 +84,6 @@ public class CdcrReplicationHandlerTest
*/
public void doTestPartialReplication() throws Exception {
this.clearSourceCollection();
-
for (int i = 0; i < 5; i++) {
List<SolrInputDocument> docs = new ArrayList<>();
for (int j = i * 20; j < (i * 20) + 20; j++) {
@@ -111,7 +109,7 @@ public class CdcrReplicationHandlerTest
this.restartServer(slaves.get(0));
// at this stage, the slave should have replicated the 5 missing tlog files
- this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
}
/**
@@ -121,7 +119,6 @@ public class CdcrReplicationHandlerTest
*/
public void doTestPartialReplicationWithTruncatedTlog() throws Exception {
this.clearSourceCollection();
-
CloudSolrClient client = createCloudClient(SOURCE_COLLECTION);
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
@@ -148,7 +145,7 @@ public class CdcrReplicationHandlerTest
this.restartServer(slaves.get(0));
// at this stage, the slave should have replicated the 5 missing tlog files
- this.assertUpdateLogs(SOURCE_COLLECTION, 10);
+ this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
}
/**
@@ -159,7 +156,6 @@ public class CdcrReplicationHandlerTest
*/
public void doTestPartialReplicationAfterPeerSync() throws Exception {
this.clearSourceCollection();
-
for (int i = 0; i < 5; i++) {
List<SolrInputDocument> docs = new ArrayList<>();
for (int j = i * 10; j < (i * 10) + 10; j++) {
@@ -199,7 +195,7 @@ public class CdcrReplicationHandlerTest
this.restartServer(slaves.get(0));
// at this stage, the slave should have replicated the 5 missing tlog files
- this.assertUpdateLogs(SOURCE_COLLECTION, 15);
+ this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
}
private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
@@ -210,10 +206,10 @@ public class CdcrReplicationHandlerTest
}
/**
- * Asserts that the transaction logs between the leader and slave
+ * Asserts that the update logs are in sync between the leader and slave. The leader and the slaves
+ * must have identical tlog files.
*/
- @Override
- protected void assertUpdateLogs(String collection, int maxNumberOfTLogs) throws Exception {
+ protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
CollectionInfo info = collectInfo(collection);
Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();
@@ -221,8 +217,8 @@ public class CdcrReplicationHandlerTest
Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
Map<Long, Long> slaveFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);
- assertEquals("Incorrect number of tlog files on the leader", maxNumberOfTLogs, leaderFilesMeta.size());
- assertEquals("Incorrect number of tlog files on the slave", maxNumberOfTLogs, slaveFilesMeta.size());
+ assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
+ assertEquals("Incorrect number of tlog files on the slave", numberOfTLogs, slaveFilesMeta.size());
for (Long leaderFileVersion : leaderFilesMeta.keySet()) {
assertTrue("Slave is missing a tlog for version " + leaderFileVersion, slaveFilesMeta.containsKey(leaderFileVersion));
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrRequestHandlerTest.java Fri Jul 17 16:38:13 2015
@@ -20,10 +20,8 @@ package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.CdcrParams;
-import org.junit.Ignore;
import org.junit.Test;
-@Ignore
@Slow
public class CdcrRequestHandlerTest extends BaseCdcrDistributedZkTest {
@@ -35,7 +33,7 @@ public class CdcrRequestHandlerTest exte
}
@Test
- @ShardsFixed(num = 4)
+ @ShardsFixed(num = 2)
public void doTest() throws Exception {
this.doTestLifeCycleActions();
this.doTestCheckpointActions();
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java?rev=1691606&r1=1691605&r2=1691606&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CdcrVersionReplicationTest.java Fri Jul 17 16:38:13 2015
@@ -61,8 +61,7 @@ public class CdcrVersionReplicationTest
@Test
@ShardsFixed(num = 4)
-
- public void doTest() throws Exception {
+ public void testCdcrDocVersions() throws Exception {
SolrClient client = createClientRandomly();
try {
handle.clear();