You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/04/25 23:49:44 UTC
[2/3] lucene-solr:jira/solr-10233: Moved patch to (new) branch.
Updated to master
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index c50add4..ddacb19 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1706,7 +1706,7 @@ public static final int VERSION_IDX = 1;
public void doReplay(TransactionLog translog) {
try {
- loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
+ loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart + " inSortedOrder=" + inSortedOrder);
long lastStatusTime = System.nanoTime();
if (inSortedOrder) {
tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
@@ -1786,7 +1786,7 @@ public static final int VERSION_IDX = 1;
recoveryInfo.adds++;
AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
+ if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
proc.processAdd(cmd);
break;
}
@@ -1854,6 +1854,7 @@ public static final int VERSION_IDX = 1;
// something wrong with the request?
}
assert TestInjection.injectUpdateLogReplayRandomPause();
+
}
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index cb1b2fb..e9f63d5 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -279,7 +280,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
- private final boolean onlyLeaderIndexes;
+ private final Replica.Type replicaType;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,12 +325,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
- ClusterState cstate = zkController.getClusterState();
- DocCollection coll = cstate.getCollection(collection);
- onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
+ replicaType = cloudDesc.getReplicaType();
} else {
collection = null;
- onlyLeaderIndexes = false;
+ replicaType = Replica.Type.REALTIME;
}
boolean shouldClone = false;
@@ -666,7 +665,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// used for deleteByQuery to get the list of nodes this leader should forward to
- private List<Node> setupRequest() {
+ private List<Node> setupRequestForDBQ() {
List<Node> nodes = null;
String shardId = cloudDesc.getShardId();
@@ -680,7 +679,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
+ .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.REALTIME, Replica.Type.APPEND));
if (replicaProps != null) {
nodes = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps props : replicaProps) {
@@ -1190,7 +1189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
checkDeleteByQueries = true;
}
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.APPEND && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
@@ -1576,7 +1575,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (zkEnabled && DistribPhase.TOLEADER == phase) {
// This core should be a leader
isLeader = true;
- replicas = setupRequest();
+ replicas = setupRequestForDBQ();
} else if (DistribPhase.FROMLEADER == phase) {
isLeader = false;
}
@@ -1610,8 +1609,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, myShardId);
+ // DBQ forwarded to Realtime and Append
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN);
+ .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.REALTIME, Replica.Type.APPEND));
if (replicaProps != null) {
final List<Node> myReplicas = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps replicaProp : replicaProps) {
@@ -1699,10 +1699,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.APPEND && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // Append replica not leader, don't write the DBQ to IW
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
-
doLocalDelete(cmd);
}
}
@@ -1857,7 +1857,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.APPEND && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
@@ -1884,14 +1884,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
zkCheck();
nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
- .getCloudDescriptor().getCollectionName());
+ .getCloudDescriptor().getCollectionName(), EnumSet.of(Replica.Type.APPEND,Replica.Type.REALTIME));
if (isLeader && nodes.size() == 1) {
singleLeader = true;
}
}
if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
- if (onlyLeaderIndexes) {
+ if (replicaType == Replica.Type.APPEND) { // REALTIME will always commit
try {
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, cloudDesc.getShardId());
@@ -1904,7 +1904,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
doLocalCommit(cmd);
} else {
assert TestInjection.waitForInSyncWithLeader(req.getCore(),
- zkController, collection, cloudDesc.getShardId());
+ zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader";
}
} catch (InterruptedException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
@@ -1958,7 +1958,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
- private List<Node> getCollectionUrls(SolrQueryRequest req, String collection) {
+ private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, EnumSet<Replica.Type> types) {
ClusterState clusterState = req.getCore()
.getCoreContainer().getZkController().getClusterState();
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
@@ -1973,6 +1973,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Map<String,Replica> shardMap = replicas.getReplicasMap();
for (Entry<String,Replica> entry : shardMap.entrySet()) {
+ if (!types.contains(entry.getValue().getType())) {
+ log.info("getCollectionUrls: Skipping replica " + entry.getValue().getName());//nocommit: too verbose
+ continue;
+ }
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
urls.add(new StdNode(nodeProps, collection, replicas.getName()));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test-files/log4j.properties
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/log4j.properties b/solr/core/src/test-files/log4j.properties
index 2697203..c464a9f 100644
--- a/solr/core/src/test-files/log4j.properties
+++ b/solr/core/src/test-files/log4j.properties
@@ -1,5 +1,5 @@
# Logging level
-log4j.rootLogger=INFO, CONSOLE
+log4j.rootLogger=DEBUG, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Target=System.err
@@ -10,6 +10,12 @@ log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.directory=WARN
log4j.logger.org.apache.solr.hadoop=INFO
+log4j.logger.org.apache.solr.cloud.OverseerTaskProcessor=INFO
+log4j.logger.org.apache.solr.cloud.OverseerTaskQueue=INFO
+log4j.logger.org.apache.solr.cloud.OverseerTaskQueue=INFO
+log4j.logger.org.apache.solr.common.cloud.SolrZkClient=INFO
+log4j.logger.org.apache.solr.util.stats.InstrumentedPoolingHttpClientConnectionManager=INFO
+log4j.logger.com.codahale.metrics=INFO
#log4j.logger.org.apache.solr.update.processor.LogUpdateProcessorFactory=DEBUG
#log4j.logger.org.apache.solr.update.processor.DistributedUpdateProcessor=DEBUG
#log4j.logger.org.apache.solr.update.PeerSync=DEBUG
@@ -31,6 +37,6 @@ log4j.logger.org.apache.solr.hadoop=INFO
#log4j.logger.org.apache.http.impl.conn.PoolingHttpClientConnectionManager=DEBUG
#log4j.logger.org.apache.http.impl.conn.BasicClientConnectionManager=DEBUG
-#log4j.logger.org.apache.http=DEBUG
+log4j.logger.org.apache.http=INFO
#log4j.logger.org.apache.solr.client.solrj.impl.SolrHttpRequestRetryHandler=DEBUG
-#log4j.logger.org.eclipse.jetty=DEBUG
+log4j.logger.org.eclipse.jetty=INFO
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
index 059e58f..8da7d28 100644
--- a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
@@ -33,7 +33,7 @@
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
- <updateLog></updateLog>
+ <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler">
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 5eb4b3b..51f9fe9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -62,8 +62,8 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 1c23c9c..8d0839f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -119,8 +119,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index ffc5262..a389005 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -112,8 +112,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
index ed9ed41..e75a854 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
@@ -286,7 +286,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
// first we make a core with the core name the collections api
// will try and use - this will cause our mock fail
Create createCmd = new Create();
- createCmd.setCoreName("halfcollection_shard1_replica1");
+ createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.REALTIME, 1));
createCmd.setCollection("halfcollectionblocker");
String dataDir = createTempDir().toFile().getAbsolutePath();
createCmd.setDataDir(dataDir);
@@ -298,7 +298,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
}
createCmd = new Create();
- createCmd.setCoreName("halfcollection_shard1_replica1");
+ createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.REALTIME, 1));
createCmd.setCollection("halfcollectionblocker2");
dataDir = createTempDir().toFile().getAbsolutePath();
createCmd.setDataDir(dataDir);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 3e0d840..e2a80b6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -62,7 +62,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
assertEquals(4, coresStatus.size());
for (int i=0; i<4; i++) {
- NamedList<Integer> status = coresStatus.get(collectionName + "_shard" + (i/2+1) + "_replica" + (i%2+1));
+ NamedList<Integer> status = coresStatus.get(Assign.buildCoreName(collectionName, "shard" + (i/2+1), Replica.Type.REALTIME, (i%2+1)));
assertEquals(0, (int)status.get("status"));
assertTrue(status.get("QTime") > 0);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 8904ea8..db9ecb4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -58,8 +58,8 @@ public class ForceLeaderTest extends HttpPartitionTest {
private final boolean onlyLeaderIndexes = random().nextBoolean();
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 01002cf..d0b0c5e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -85,8 +85,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
index 457b9d9..fd1b403 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
@@ -46,8 +46,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
index a4e8d6f..6297408 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Semaphore;
@@ -30,9 +31,11 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -44,6 +47,7 @@ import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
+import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,10 +63,13 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
.addConfig("config", TEST_PATH().resolve("configsets")
.resolve("cloud-minimal-inplace-updates").resolve("conf"))
.configure();
+
+ CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
+ CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
+ assertEquals(0, response.getStatus());
CollectionAdminRequest
- .createCollection(COLLECTION, "config", 1, 3)
- .setRealtimeReplicas(1)
+ .createCollection(COLLECTION, "config", 1, 0, 3, 0)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
@@ -71,6 +78,7 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
@Test
public void test() throws Exception {
+ assertNumberOfReplicas(0, 3, 0, false, true);
basicTest();
recoveryTest();
dbiTest();
@@ -252,6 +260,7 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
.add(sdoc("id", "4"))
.process(cloudClient, COLLECTION);
ChaosMonkey.start(oldLeaderJetty);
+ waitForState("Replica not removed", "collection1", activeReplicaCount(0, 3, 0));
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 60);
checkRTG(1,4, cluster.getJettySolrRunners());
@@ -347,7 +356,7 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
}
private void checkShardConsistency(int expected, int numTry) throws Exception{
-
+ String replicaNotInSync = null;
for (int i = 0; i < numTry; i++) {
boolean inSync = true;
for (JettySolrRunner solrRunner: cluster.getJettySolrRunners()) {
@@ -357,15 +366,16 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
long results = client.query(COLLECTION, query).getResults().getNumFound();
if (expected != results) {
inSync = false;
- Thread.sleep(500);
+ replicaNotInSync = solrRunner.getNodeName();
break;
}
}
}
if (inSync) return;
+ Thread.sleep(500);
}
- fail("Some replicas are not in sync with leader");
+ fail("Some replicas are not in sync with leader: " + replicaNotInSync);
}
private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
@@ -431,5 +441,48 @@ public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
}
return rs;
}
+
+ // TODO: This is copy/paste from TestPassiveReplica, refactor
+ private DocCollection assertNumberOfReplicas(int numWriter, int numActive, int numPassive, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+ if (updateCollection) {
+ cluster.getSolrClient().getZkStateReader().forceUpdateCollection("collection1");
+ }
+ DocCollection docCollection = getCollectionState("collection1");
+ assertNotNull(docCollection);
+ assertEquals("Unexpected number of writer replicas: " + docCollection, numWriter,
+ docCollection.getReplicas(EnumSet.of(Replica.Type.REALTIME)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ assertEquals("Unexpected number of passive replicas: " + docCollection, numPassive,
+ docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ assertEquals("Unexpected number of active replicas: " + docCollection, numActive,
+ docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+ return docCollection;
+ }
+
+ private CollectionStatePredicate activeReplicaCount(int numWriter, int numActive, int numPassive) {
+ return (liveNodes, collectionState) -> {
+ int writersFound = 0, activesFound = 0, passivesFound = 0;
+ if (collectionState == null)
+ return false;
+ for (Slice slice : collectionState) {
+ for (Replica replica : slice) {
+ if (replica.isActive(liveNodes))
+ switch (replica.getType()) {
+ case APPEND:
+ activesFound++;
+ break;
+ case PASSIVE:
+ passivesFound++;
+ break;
+ case REALTIME:
+ writersFound++;
+ break;
+ default:
+ throw new AssertionError("Unexpected replica type");
+ }
+ }
+ }
+ return numWriter == writersFound && numActive == activesFound && numPassive == passivesFound;
+ };
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 48ac91f..340adbb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -346,7 +347,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
assertEquals(numberOfSlices * numberOfReplica, coreNames.size());
for (int i = 1; i <= numberOfSlices; i++) {
for (int j = 1; j <= numberOfReplica; j++) {
- String coreName = COLLECTION_NAME + "_shard" + i + "_replica" + j;
+ String coreName = Assign.buildCoreName(COLLECTION_NAME, "shard" + i, Replica.Type.REALTIME, j);
assertTrue("Shard " + coreName + " was not created",
coreNames.contains(coreName));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index b0721a2..f5c5db0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -1210,7 +1210,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
q.offer(Utils.toJSON(m));
-
+
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
@@ -1221,7 +1221,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
q.offer(Utils.toJSON(m));
-
+
Stat stat = new Stat();
byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
// Simulate an external modification
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
index a8e14bf..1da9aca 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
@@ -40,8 +40,8 @@ public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1: -1;
+ protected boolean useAppendReplicas() {
+ return onlyLeaderIndexes;
}
@BeforeClass
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
index bf9b5e0..e00ea0d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
@@ -79,6 +79,11 @@ public class ShardSplitTest extends BasicDistributedZkTest {
public ShardSplitTest() {
schemaString = "schema15.xml"; // we need a string id
}
+
+ @Override
+ protected boolean useAppendReplicas() {
+ return false;
+ }
@Override
public void distribSetUp() throws Exception {
@@ -86,12 +91,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
useFactory(null);
}
- //TODO for now, onlyLeaderIndexes do not work with ShardSplitTest
- @Override
- protected int getRealtimeReplicas() {
- return -1;
- }
-
@Test
public void test() throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
new file mode 100644
index 0000000..fe353d2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
+@Slow
+public class TestAppendReplica extends SolrCloudTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private String collectionName = null;
+ private final static int REPLICATION_TIMEOUT_SECS = 10;
+
+ /**
+  * Builds a collection name from the test class name (with "Test" removed) and the first
+  * space-separated token of the test name, inserting "_" before upper-case letters and
+  * lower-casing the result.
+  */
+ private String suggestedCollectionName() {
+   String classPart = getTestClass().getSimpleName().replace("Test", "");
+   String methodPart = getTestName().split(" ")[0];
+   String combined = classPart + "_" + methodPart;
+   String underscored = combined.replaceAll("(.)(\\p{Upper})", "$1_$2");
+   return underscored.toLowerCase(Locale.ROOT);
+ }
+
+ /**
+  * Configures a two-node cluster with the "conf" configset and sets the
+  * {@code ZkStateReader.LEGACY_CLOUD} cluster property to "false", asserting that the
+  * request reports status 0.
+  */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+   // We'll be explicit about this in this test
+   TestInjection.waitForReplicasInSync = null;
+   configureCluster(2) // 2 + random().nextInt(3)
+       .addConfig("conf", configset("cloud-minimal-inplace-updates"))
+       .configure();
+   CollectionAdminResponse legacyCloudResponse = CollectionAdminRequest
+       .setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
+       .process(cluster.getSolrClient());
+   assertEquals(0, legacyCloudResponse.getStatus());
+ }
+
+ /**
+ * Chooses the collection name for the current test and asserts that looking up its state
+ * throws a {@link SolrException} (presumably because the collection does not exist yet).
+ */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ collectionName = suggestedCollectionName();
+ expectThrows(SolrException.class, () -> getCollectionState(collectionName));
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+ if (!jetty.isRunning()) {
+ LOG.warn("Jetty {} not running, probably some bad test. Starting it", jetty.getLocalPort());
+ ChaosMonkey.start(jetty);
+ }
+ }
+ if (cluster.getSolrClient().getZkStateReader().getClusterState().getCollectionOrNull(collectionName) != null) {
+ LOG.info("tearDown deleting collection");
+ CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+ waitForDeletion(collectionName);
+ }
+ super.tearDown();
+ }
+
+ // Just to compare test time, nocommit
+ /**
+ * Ignored timing experiment: creates the collection with the arguments (1, 8, 0, 0) and only
+ * asserts that its state can be read. The layout in ZooKeeper is always printed afterwards.
+ * The remaining assertions are commented out.
+ */
+ @Ignore
+ public void testCreateDelete2() throws Exception {
+ try {
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, 8, 0, 0).process(cluster.getSolrClient());
+ DocCollection docCollection = getCollectionState(collectionName);
+ assertNotNull(docCollection);
+// assertEquals("Expecting 4 replicas per shard",
+// 8, docCollection.getReplicas().size());
+// assertEquals("Expecting 6 passive replicas, 3 per shard",
+// 6, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).size());
+// assertEquals("Expecting 2 writer replicas, one per shard",
+// 2, docCollection.getReplicas(EnumSet.of(Replica.Type.WRITER)).size());
+// for (Slice s:docCollection.getSlices()) {
+// // read-only replicas can never become leaders
+// assertFalse(s.getLeader().isReadOnly());
+// }
+ } finally {
+ // Dump the ZooKeeper layout regardless of the outcome, to help debugging.
+ zkClient().printLayoutToStdOut();
+ }
+ }
+
+ /**
+  * Asserts that the update log directory exists on disk for every replica of every slice of
+  * the given collection.
+  *
+  * @param collection the collection whose replicas are checked
+  */
+ private void assertUlogPresence(DocCollection collection) {
+   for (Slice s : collection.getSlices()) {
+     for (Replica r : s.getReplicas()) {
+       SolrCore core = null;
+       try {
+         core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName());
+         assertNotNull(core);
+         assertTrue("Update log should exist for replicas of type Append",
+             new java.io.File(core.getUlogDir()).exists());
+       } finally {
+         // Only close a core that was actually obtained; otherwise a failed lookup would be
+         // reported as a NullPointerException instead of the assertion failure above.
+         if (core != null) {
+           core.close();
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Creates a 2-shard collection with 4 append replicas per shard and checks the replica
+  * counts per type, that every shard leader is an append replica, that each shard has 4
+  * leader-election nodes, and that update logs exist. The ZooKeeper layout is printed
+  * afterwards regardless of the outcome.
+  */
+ @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection
+ public void testCreateDelete() throws Exception {
+   try {
+     CollectionAdminRequest.createCollection(collectionName, "conf", 2, 0, 4, 0)
+         .setMaxShardsPerNode(100)
+         .process(cluster.getSolrClient());
+     DocCollection docCollection = getCollectionState(collectionName);
+     assertNotNull(docCollection);
+     assertEquals("Expecting 2 shards",
+         2, docCollection.getSlices().size());
+     // The asserted value is the total across both shards, so the message says so.
+     assertEquals("Expecting 8 replicas in total, 4 per shard",
+         8, docCollection.getReplicas().size());
+     assertEquals("Expecting 8 append replicas, 4 per shard",
+         8, docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)).size());
+     assertEquals("Expecting no realtime replicas",
+         0, docCollection.getReplicas(EnumSet.of(Replica.Type.REALTIME)).size());
+     assertEquals("Expecting no passive replicas",
+         0, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).size());
+     for (Slice s : docCollection.getSlices()) {
+       // assertEquals reports the actual type on failure, unlike assertTrue(a == b).
+       assertEquals("Unexpected leader type for shard " + s.getName(),
+           Replica.Type.APPEND, s.getLeader().getType());
+       List<String> shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true);
+       assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()),
+           4, shardElectionNodes.size());
+     }
+     assertUlogPresence(docCollection);
+   } finally {
+     zkClient().printLayoutToStdOut();
+   }
+ }
+
+ /**
+  * Indexes one document into a single-shard collection with 1 to 3 append replicas, checks
+  * that the leader sees it, then polls every append replica until it reports one document or
+  * the shared replication timeout expires. Finally checks that update logs exist.
+  */
+ public void testAddDocs() throws Exception {
+   int numAppendReplicas = 1 + random().nextInt(3);
+   DocCollection docCollection = createAndWaitForCollection(1, 0, numAppendReplicas, 0);
+   assertEquals(1, docCollection.getSlices().size());
+
+   cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+   cluster.getSolrClient().commit(collectionName);
+
+   Slice s = docCollection.getSlices().iterator().next();
+   try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+     assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+   }
+
+   // A single deadline is shared by all replicas, not one per replica.
+   TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+   for (Replica r : s.getReplicas(EnumSet.of(Replica.Type.APPEND))) {
+     //TODO: assert replication < REPLICATION_TIMEOUT_SECS
+     try (HttpSolrClient appendReplicaClient = getHttpSolrClient(r.getCoreUrl())) {
+       while (true) {
+         try {
+           // Report the configured timeout rather than a hard-coded number of seconds.
+           assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + " seconds",
+               1, appendReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+           // Append replicas process all updates
+           SolrQuery req = new SolrQuery(
+               "qt", "/admin/plugins",
+               "stats", "true");
+           QueryResponse statsResponse = appendReplicaClient.query(req);
+//         TODO: uncomment when SOLR-10569 is fixed
+//         assertEquals("Append replicas should receive all updates. Replica: " + r + ", response: " + statsResponse,
+//             1L, ((NamedList<Object>)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats", "cumulative_adds"));
+           break;
+         } catch (AssertionError e) {
+           // Retry until the deadline; only then let the assertion failure propagate.
+           if (t.hasTimedOut()) {
+             throw e;
+           } else {
+             Thread.sleep(100);
+           }
+         }
+       }
+     }
+   }
+   assertUlogPresence(docCollection);
+ }
+
+ /**
+ * Starts from a 2-shard collection with one append replica per shard, adds an append replica
+ * to each shard, then deletes one append replica from shard1, asserting the replica counts
+ * after each step.
+ */
+ public void testAddRemoveAppendReplica() throws Exception {
+ DocCollection docCollection = createAndWaitForCollection(2, 0, 1, 0);
+ assertEquals(2, docCollection.getSlices().size());
+
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.APPEND).process(cluster.getSolrClient());
+ docCollection = assertNumberOfReplicas(0, 3, 0, true, false);
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard2", Replica.Type.APPEND).process(cluster.getSolrClient());
+ docCollection = assertNumberOfReplicas(0, 4, 0, true, false);
+
+ waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 2));
+
+ //Delete an append replica from shard1 (the first one listed for that shard)
+ CollectionAdminRequest.deleteReplica(
+ collectionName,
+ "shard1",
+ docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.APPEND)).get(0).getName())
+ .process(cluster.getSolrClient());
+ assertNumberOfReplicas(0, 3, 0, true, true);
+ }
+
+ /** Runs the leader-replacement scenario with the leader replica deleted. */
+ public void testRemoveLeader() throws Exception {
+ doReplaceLeader(true);
+ }
+
+ /** Runs the leader-replacement scenario with the leader's Jetty killed instead of deleted. */
+ public void testKillLeader() throws Exception {
+ doReplaceLeader(false);
+ }
+
+ // TODO: placeholder, no assertions yet; this test currently passes trivially.
+ public void testPassiveReplicaStates() {
+ // Validate that passive replicas go through the correct states when starting, stopping, reconnecting
+ }
+
+ // TODO: placeholder, empty body; this test currently passes trivially.
+ public void testPassiveReplicaCantConnectToZooKeeper() {
+
+ }
+
+ // TODO: placeholder, no assertions yet; this test currently passes trivially.
+ public void testRealTimeGet() {
+ // should be redirected to writers or error
+ }
+
+ /**
+  * Validates leader election and that replication still happens on a new leader.
+  *
+  * @param removeReplica if {@code true} the leader replica is deleted through the Collections
+  *                      API and later replaced by a newly added append replica; otherwise the
+  *                      Jetty hosting the leader is killed and later restarted
+  */
+ private void doReplaceLeader(boolean removeReplica) throws Exception {
+   DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0);
+
+   // Add a document and commit
+   cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+   cluster.getSolrClient().commit(collectionName);
+   Slice s = docCollection.getSlices().iterator().next();
+   try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+     assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+   }
+
+   waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)), REPLICATION_TIMEOUT_SECS);
+
+   // Delete leader replica from shard1
+   JettySolrRunner leaderJetty = null;
+   if (removeReplica) {
+     CollectionAdminRequest.deleteReplica(
+         collectionName,
+         "shard1",
+         s.getLeader().getName())
+         .process(cluster.getSolrClient());
+   } else {
+     leaderJetty = cluster.getReplicaJetty(s.getLeader());
+     ChaosMonkey.kill(leaderJetty);
+     waitForState("Leader replica not removed", collectionName, clusterShape(1, 1));
+     // Wait for cluster state to be updated
+     waitForState("Replica state not updated in cluster state",
+         collectionName, clusterStateReflectsActiveAndDownReplicas());
+   }
+   docCollection = assertNumberOfReplicas(0, 1, 0, true, true);
+
+   // Wait until a new leader is elected
+   TimeOut t = new TimeOut(30, TimeUnit.SECONDS);
+   while (!t.hasTimedOut()) {
+     docCollection = getCollectionState(collectionName);
+     Replica leader = docCollection.getSlice("shard1").getLeader();
+     if (leader != null && leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) {
+       break;
+     }
+     Thread.sleep(500);
+   }
+   assertFalse("Timeout waiting for a new leader to be elected", t.hasTimedOut());
+
+   // There is a new leader, I should be able to add and commit
+   cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"));
+   cluster.getSolrClient().commit(collectionName);
+
+   // Queries should still work
+   waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)), REPLICATION_TIMEOUT_SECS);
+   // Start back the node
+   if (removeReplica) {
+     CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.APPEND).process(cluster.getSolrClient());
+   } else {
+     ChaosMonkey.start(leaderJetty);
+   }
+   waitForState("Expected collection to be 1x2", collectionName, clusterShape(1, 2));
+   // Re-read the collection state. The snapshot taken during leader election predates the
+   // replica being added/restarted, so that replica is either missing from it or still
+   // recorded as down, and the doc-count check (which skips inactive replicas) would never
+   // look at it.
+   docCollection = getCollectionState(collectionName);
+   // added replica should replicate from the leader
+   waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)), REPLICATION_TIMEOUT_SECS);
+ }
+
+ /**
+  * Kills the Jetty hosting the first append replica listed for shard1, indexes another
+  * document while it is down, restarts it, and waits each time for every active replica to
+  * report the expected number of documents.
+  */
+ public void testKillAppendReplica() throws Exception {
+   DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0);
+
+   waitForNumDocsInAllActiveReplicas(0);
+   cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+   cluster.getSolrClient().commit(collectionName);
+   waitForNumDocsInAllActiveReplicas(1);
+
+   Replica firstAppendReplica =
+       docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.APPEND)).get(0);
+   JettySolrRunner appendReplicaJetty = cluster.getReplicaJetty(firstAppendReplica);
+   ChaosMonkey.kill(appendReplicaJetty);
+   waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
+   // Also wait for the replica to be placed in state="down"
+   waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas());
+
+   cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
+   cluster.getSolrClient().commit(collectionName);
+   waitForNumDocsInAllActiveReplicas(2);
+
+   ChaosMonkey.start(appendReplicaJetty);
+   waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0));
+   waitForNumDocsInAllActiveReplicas(2);
+ }
+
+ // TODO: placeholder, empty body; this test currently passes trivially.
+ public void testSearchWhileReplicationHappens() {
+
+ }
+
+ // TODO: placeholder, no assertions yet; this test currently passes trivially.
+ public void testReplication() {
+ // Validate incremental replication
+ }
+
+ /**
+  * Verifies that uncommitted updates are visible in the leader's IndexWriter but not in the
+  * replicas' IndexWriters, that documents are still retrievable through real-time get on all
+  * nodes, and that replicas catch up with the leader's latest commit after a series of adds
+  * with randomly interleaved commits.
+  */
+ public void testOnlyLeaderIndexes() throws Exception {
+   createAndWaitForCollection(1, 0, 2, 0);
+
+   CloudSolrClient cloudClient = cluster.getSolrClient();
+   new UpdateRequest()
+       .add(sdoc("id", "1"))
+       .add(sdoc("id", "2"))
+       .add(sdoc("id", "3"))
+       .add(sdoc("id", "4"))
+       .process(cloudClient, collectionName);
+
+   {
+     UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler();
+     RefCounted<IndexWriter> iwRef = updateHandler.getSolrCoreState().getIndexWriter(null);
+     try {
+       assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges());
+     } finally {
+       // Release the writer reference even when the assertion fails.
+       iwRef.decref();
+     }
+   }
+
+   for (SolrCore solrCore : getSolrCore(false)) {
+     RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+     try {
+       assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
+     } finally {
+       iwRef.decref();
+     }
+   }
+
+   checkRTG(1, 4, cluster.getJettySolrRunners());
+
+   new UpdateRequest()
+       .deleteById("1")
+       .deleteByQuery("id:2")
+       .process(cloudClient, collectionName);
+
+   // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG
+   checkRTG(2,4, getSolrRunner(false));
+
+   new UpdateRequest()
+       .commit(cloudClient, collectionName);
+
+   waitForNumDocsInAllActiveReplicas(2);
+
+   // Update log roll over
+   for (SolrCore solrCore : getSolrCore(false)) {
+     UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+     assertFalse(updateLog.hasUncommittedChanges());
+   }
+
+   // UpdateLog copy over old updates
+   for (int i = 15; i <= 150; i++) {
+     cloudClient.add(collectionName, sdoc("id",String.valueOf(i)));
+     // Randomly commit after about 15% of the adds, but never after the last one. The
+     // random draw stays on the left so it is consumed on every iteration.
+     if (random().nextInt(100) < 15 && i != 150) {
+       cloudClient.commit(collectionName);
+     }
+   }
+   checkRTG(120,150, cluster.getJettySolrRunners());
+   waitForReplicasCatchUp(20);
+ }
+
+ public void testRecovery() throws Exception {
+ boolean useKill = random().nextBoolean();
+ createAndWaitForCollection(1, 0, 2, 0);
+
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ new UpdateRequest()
+ .add(sdoc("id", "3"))
+ .add(sdoc("id", "4"))
+ .commit(cloudClient, collectionName);
+ // Replica recovery
+ new UpdateRequest()
+ .add(sdoc("id", "5"))
+ .process(cloudClient, collectionName);
+ JettySolrRunner solrRunner = getSolrRunner(false).get(0);
+ if (useKill) {
+ ChaosMonkey.kill(solrRunner);
+ } else {
+ ChaosMonkey.stop(solrRunner);
+ }
+ new UpdateRequest()
+ .add(sdoc("id", "6"))
+ .process(cloudClient, collectionName);
+ ChaosMonkey.start(solrRunner);
+ waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0));
+ // We skip peerSync, so replica will always trigger commit on leader
+ waitForNumDocsInAllActiveReplicas(4);
+
+ // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe. Related to SOLR-9555 I believe
+ //nocommit
+ Thread.sleep(10000);
+
+ // More Replica recovery testing
+ new UpdateRequest()
+ .add(sdoc("id", "7"))
+ .process(cloudClient, collectionName);
+ checkRTG(3,7, cluster.getJettySolrRunners());
+ DirectUpdateHandler2.commitOnClose = false;
+ ChaosMonkey.stop(solrRunner);
+ DirectUpdateHandler2.commitOnClose = true;
+ ChaosMonkey.start(solrRunner);
+ waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0));
+ checkRTG(3,7, cluster.getJettySolrRunners());
+ waitForNumDocsInAllActiveReplicas(5, 0);
+
+ // Test replica recovery apply buffer updates
+ Semaphore waitingForBufferUpdates = new Semaphore(0);
+ Semaphore waitingForReplay = new Semaphore(0);
+ RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> {
+ try {
+ waitingForReplay.release();
+ waitingForBufferUpdates.acquire();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail("Test interrupted: " + e.getMessage());
+ }
+ };
+ if (useKill) {
+ ChaosMonkey.kill(solrRunner);
+ } else {
+ ChaosMonkey.stop(solrRunner);
+ }
+ ChaosMonkey.start(solrRunner);
+ waitingForReplay.acquire();
+ new UpdateRequest()
+ .add(sdoc("id", "8"))
+ .add(sdoc("id", "9"))
+ .process(cloudClient, collectionName);
+ waitingForBufferUpdates.release();
+ RecoveryStrategy.testing_beforeReplayBufferingUpdates = null;
+ waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0));
+ checkRTG(3,9, cluster.getJettySolrRunners());
+ waitForNumDocsInAllActiveReplicas(5, 0);
+ for (SolrCore solrCore : getSolrCore(false)) {
+ RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+ assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
+ iwRef.decref();
+ }
+ }
+
+ /**
+  * Indexes and commits a document, deletes it by id without committing, and verifies that the
+  * real-time get check for that document no longer succeeds on all nodes.
+  */
+ public void testDeleteById() throws Exception{
+   createAndWaitForCollection(1,0,2,0);
+   CloudSolrClient cloudClient = cluster.getSolrClient();
+   new UpdateRequest()
+       .deleteByQuery("*:*")
+       .commit(cluster.getSolrClient(), collectionName);
+   new UpdateRequest()
+       .add(sdoc("id", "1"))
+       .commit(cloudClient, collectionName);
+   waitForNumDocsInAllActiveReplicas(1);
+   new UpdateRequest()
+       .deleteById("1")
+       .process(cloudClient, collectionName);
+   boolean docStillFound = false;
+   try {
+     checkRTG(1, 1, cluster.getJettySolrRunners());
+     docStillFound = true;
+   } catch (AssertionError expected) {
+     // expected: checkRTG asserts that the doc is present, so it must fail after the delete
+   }
+   assertFalse("Doc1 was deleted but still exists", docStillFound);
+ }
+
+ /**
+  * Indexes two documents, kills the leader's Jetty, indexes two more, restarts the killed
+  * node, and verifies that all four documents are available through real-time get on every
+  * node and, after a commit, searchable on every active replica.
+  */
+ public void testBasicLeaderElection() throws Exception {
+   createAndWaitForCollection(1,0,2,0);
+   CloudSolrClient client = cluster.getSolrClient();
+
+   UpdateRequest clearAll = new UpdateRequest().deleteByQuery("*:*");
+   clearAll.commit(cluster.getSolrClient(), collectionName);
+
+   UpdateRequest firstBatch = new UpdateRequest()
+       .add(sdoc("id", "1"))
+       .add(sdoc("id", "2"));
+   firstBatch.process(client, collectionName);
+
+   JettySolrRunner formerLeaderJetty = getSolrRunner(true).get(0);
+   ChaosMonkey.kill(formerLeaderJetty);
+   waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
+
+   UpdateRequest secondBatch = new UpdateRequest()
+       .add(sdoc("id", "3"))
+       .add(sdoc("id", "4"));
+   secondBatch.process(client, collectionName);
+
+   ChaosMonkey.start(formerLeaderJetty);
+   waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0));
+   checkRTG(1,4, cluster.getJettySolrRunners());
+
+   new UpdateRequest().commit(client, collectionName);
+   waitForNumDocsInAllActiveReplicas(4, 0);
+ }
+
+ /**
+  * Sends a full update (version 1), a delete-by-query (version 3) and an in-place update
+  * (version 2), in that order, directly to each non-leader node as if they came from the
+  * leader. It then bounces the leader's Jetty and checks that document 1 is still retrievable
+  * with its title_s field.
+  */
+ public void testOutOfOrderDBQWithInPlaceUpdates() throws Exception {
+   createAndWaitForCollection(1,0,2,0);
+   // The field must be docValues-only (not indexed, not stored).
+   SolrCore leaderCore = getSolrCore(true).get(0);
+   assertFalse(leaderCore.getLatestSchema().getField("inplace_updatable_int").indexed());
+   assertFalse(leaderCore.getLatestSchema().getField("inplace_updatable_int").stored());
+   assertTrue(leaderCore.getLatestSchema().getField("inplace_updatable_int").hasDocValues());
+
+   List<UpdateRequest> outOfOrderUpdates = new ArrayList<>();
+   outOfOrderUpdates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", 1L)); // full update
+   outOfOrderUpdates.add(simulatedDBQ("inplace_updatable_int:5", 3L));
+   outOfOrderUpdates.add(simulatedUpdateRequest(1L, "id", 1, "inplace_updatable_int", 6, "_version_", 2L));
+   for (JettySolrRunner replicaRunner : getSolrRunner(false)) {
+     try (SolrClient replicaClient = replicaRunner.newClient()) {
+       for (UpdateRequest update : outOfOrderUpdates) {
+         update.process(replicaClient, collectionName);
+       }
+     }
+   }
+
+   JettySolrRunner formerLeaderJetty = getSolrRunner(true).get(0);
+   ChaosMonkey.kill(formerLeaderJetty);
+   waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0));
+   ChaosMonkey.start(formerLeaderJetty);
+   waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0));
+
+   checkRTG(1,1, cluster.getJettySolrRunners());
+   SolrDocument fetched = cluster.getSolrClient().getById(collectionName,"1");
+   assertNotNull(fetched.get("title_s"));
+ }
+
+ /**
+  * Builds an add request flagged as coming from the leader ("update.distrib=FROMLEADER",
+  * "distrib.from" set to the value of {@link #getBaseUrl()}).
+  *
+  * @param prevVersion when non-null, the request is additionally marked as an in-place
+  *                    update on top of this previous version
+  * @param fields      alternating field names and values of the document to add
+  */
+ private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
+   SolrInputDocument document = sdoc(fields);
+
+   // get baseUrl of the leader
+   String leaderUrl = getBaseUrl();
+
+   UpdateRequest request = new UpdateRequest();
+   request.add(document);
+   request.setParam("update.distrib", "FROMLEADER");
+   boolean inPlace = prevVersion != null;
+   if (inPlace) {
+     request.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
+     request.setParam("distrib.inplace.update", "true");
+   }
+   request.setParam("distrib.from", leaderUrl);
+   return request;
+ }
+
+ /**
+  * Builds a delete-by-query request carrying an explicit "_version_" and flagged as coming
+  * from the leader ("update.distrib=FROMLEADER", "distrib.from" set to the value of
+  * {@link #getBaseUrl()}).
+  */
+ private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException {
+   String leaderUrl = getBaseUrl();
+
+   UpdateRequest request = new UpdateRequest();
+   request.deleteByQuery(query);
+   request.setParam("_version_", String.valueOf(version));
+   request.setParam("update.distrib", "FROMLEADER");
+   request.setParam("distrib.from", leaderUrl);
+   return request;
+ }
+
+ /**
+  * Returns the core URL of the current leader of "shard1" of the test collection, as read
+  * from the client's cluster state.
+  */
+ private String getBaseUrl() {
+   return cluster.getSolrClient().getZkStateReader().getClusterState()
+       .getCollection(collectionName)
+       .getSlice("shard1")
+       .getLeader()
+       .getCoreUrl();
+ }
+
+ /**
+  * Creates the test collection with the requested number of shards and replicas of each
+  * type, waits until the cluster has the expected shape, and asserts the number of active
+  * replicas per type.
+  *
+  * @return the collection state returned by the replica-count assertion
+  */
+ private DocCollection createAndWaitForCollection(int numShards, int numRealtimeReplicas, int numAppendReplicas, int numPassiveReplicas) throws SolrServerException, IOException, KeeperException, InterruptedException {
+   CloudSolrClient client = cluster.getSolrClient();
+   CollectionAdminRequest.createCollection(collectionName, "conf", numShards, numRealtimeReplicas, numAppendReplicas, numPassiveReplicas)
+       .setMaxShardsPerNode(100)
+       .process(client);
+   int numReplicasPerShard = numRealtimeReplicas + numAppendReplicas + numPassiveReplicas;
+   client.getZkStateReader().registerCore(collectionName); //TODO: Is this needed?
+   String failureMessage = "Expected collection to be created with " + numShards + " shards and " + numReplicasPerShard + " replicas";
+   waitForState(failureMessage, collectionName, clusterShape(numShards, numReplicasPerShard));
+   int expectedRealtime = numRealtimeReplicas * numShards;
+   int expectedAppend = numAppendReplicas * numShards;
+   int expectedPassive = numPassiveReplicas * numShards;
+   return assertNumberOfReplicas(expectedRealtime, expectedAppend, expectedPassive, false, true);
+ }
+
+ /** Waits with the default replication timeout for all active replicas to report {@code numDocs}. */
+ private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
+   waitForNumDocsInAllActiveReplicas(numDocs, REPLICATION_TIMEOUT_SECS);
+ }
+
+ /**
+  * Waits up to {@code timeout} seconds for every replica whose state is ACTIVE in the current
+  * collection state to report {@code numDocs} documents.
+  */
+ private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException {
+   DocCollection currentState = getCollectionState(collectionName);
+   List<Replica> activeReplicas = currentState.getReplicas().stream()
+       .filter(replica -> replica.getState() == Replica.State.ACTIVE)
+       .collect(Collectors.toList());
+   waitForNumDocsInAllReplicas(numDocs, activeReplicas, timeout);
+ }
+
+ /** Waits for the given replicas to report {@code numDocs} documents for the match-all query. */
+ private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, int timeout) throws IOException, SolrServerException, InterruptedException {
+   waitForNumDocsInAllReplicas(numDocs, replicas, "*:*", timeout);
+ }
+
+ /**
+  * Polls each of the given replicas until {@code query} matches exactly {@code numDocs}
+  * documents on it. Replicas that are not active according to the current live nodes are
+  * skipped. A single deadline of {@code timeout} seconds is shared by all replicas; once it
+  * has passed, the next failed check is rethrown as an {@link AssertionError}.
+  *
+  * @param numDocs  expected number of matching documents on every replica
+  * @param replicas replicas to query directly
+  * @param query    query string to run against each replica
+  * @param timeout  overall timeout in seconds
+  */
+ private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query, int timeout) throws IOException, SolrServerException, InterruptedException {
+   TimeOut t = new TimeOut(timeout, TimeUnit.SECONDS);
+   for (Replica r : replicas) {
+     if (!r.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) {
+       continue;
+     }
+     try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) {
+       while (true) {
+         try {
+           // Report the timeout that was actually applied, not the class-wide default.
+           assertEquals("Replica " + r.getName() + " not up to date after " + timeout + " seconds",
+               numDocs, replicaClient.query(new SolrQuery(query)).getResults().getNumFound());
+           break;
+         } catch (AssertionError e) {
+           if (t.hasTimedOut()) {
+             throw e;
+           } else {
+             Thread.sleep(100);
+           }
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Polls every 100 ms, for up to 10 seconds, until the client's cluster state no longer
+  * contains the collection. Fails the test on timeout. Returns early if forcing a state
+  * refresh throws a {@link SolrException}.
+  */
+ private void waitForDeletion(String collection) throws InterruptedException, KeeperException {
+   TimeOut deadline = new TimeOut(10, TimeUnit.SECONDS);
+   while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) {
+     Thread.sleep(100);
+     if (deadline.hasTimedOut()) {
+       fail("Timed out waiting for collection " + collection + " to be deleted.");
+     }
+     try {
+       cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection);
+     } catch (SolrException e) {
+       return;
+     }
+   }
+ }
+
+ /**
+  * Asserts the number of REALTIME ({@code numWriter}), PASSIVE ({@code numPassive}) and
+  * APPEND ({@code numActive}) replicas in the collection state.
+  *
+  * @param updateCollection whether to force a refresh of the collection state first
+  * @param activeOnly       whether only replicas in state ACTIVE are counted
+  * @return the collection state the assertions were made against
+  */
+ private DocCollection assertNumberOfReplicas(int numWriter, int numActive, int numPassive, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+   if (updateCollection) {
+     cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+   }
+   DocCollection docCollection = getCollectionState(collectionName);
+   assertNotNull(docCollection);
+   assertEquals("Unexpected number of writer replicas: " + docCollection, numWriter,
+       countReplicas(docCollection, Replica.Type.REALTIME, activeOnly));
+   assertEquals("Unexpected number of passive replicas: " + docCollection, numPassive,
+       countReplicas(docCollection, Replica.Type.PASSIVE, activeOnly));
+   assertEquals("Unexpected number of active replicas: " + docCollection, numActive,
+       countReplicas(docCollection, Replica.Type.APPEND, activeOnly));
+   return docCollection;
+ }
+
+ /** Counts the replicas of the given type, optionally restricted to those in state ACTIVE. */
+ private long countReplicas(DocCollection docCollection, Replica.Type type, boolean activeOnly) {
+   long matching = 0;
+   for (Replica replica : docCollection.getReplicas(EnumSet.of(type))) {
+     if (!activeOnly || replica.getState() == Replica.State.ACTIVE) {
+       matching++;
+     }
+   }
+   return matching;
+ }
+
+ /*
+  * Passes only if every replica is either ACTIVE on a live node or DOWN on a node that is
+  * not live; any other state, or a mismatch with "liveNodes", fails the predicate.
+  */
+ private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
+   return (liveNodes, collectionState) -> {
+     for (Replica replica : collectionState.getReplicas()) {
+       Replica.State state = replica.getState();
+       if (state == Replica.State.DOWN) {
+         if (liveNodes.contains(replica.getNodeName())) {
+           return false;
+         }
+       } else if (state == Replica.State.ACTIVE) {
+         if (!liveNodes.contains(replica.getNodeName())) {
+           return false;
+         }
+       } else {
+         return false;
+       }
+     }
+     return true;
+   };
+ }
+
+
+ /**
+  * Returns a predicate that matches when the numbers of active REALTIME, APPEND and PASSIVE
+  * replicas equal {@code numWriter}, {@code numActive} and {@code numPassive} respectively.
+  * A null collection state never matches.
+  */
+ private CollectionStatePredicate activeReplicaCount(int numWriter, int numActive, int numPassive) {
+   return (liveNodes, collectionState) -> {
+     int realtimeCount = 0;
+     int appendCount = 0;
+     int passiveCount = 0;
+     if (collectionState == null) {
+       return false;
+     }
+     for (Slice slice : collectionState) {
+       for (Replica replica : slice) {
+         if (!replica.isActive(liveNodes)) {
+           continue;
+         }
+         switch (replica.getType()) {
+           case REALTIME:
+             realtimeCount++;
+             break;
+           case APPEND:
+             appendCount++;
+             break;
+           case PASSIVE:
+             passiveCount++;
+             break;
+           default:
+             throw new AssertionError("Unexpected replica type");
+         }
+       }
+     }
+     return numWriter == realtimeCount && numActive == appendCount && numPassive == passiveCount;
+   };
+ }
+
+ /**
+  * Collects the cores, across all Jettys that have a core container, whose replica either is
+  * ({@code isLeader == true}) or is not ({@code isLeader == false}) the leader of its slice
+  * in the test collection.
+  */
+ private List<SolrCore> getSolrCore(boolean isLeader) {
+   DocCollection docCollection =
+       cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName);
+
+   List<SolrCore> matchingCores = new ArrayList<>();
+   for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+     if (runner.getCoreContainer() == null) {
+       continue;
+     }
+     for (SolrCore core : runner.getCoreContainer().getCores()) {
+       CloudDescriptor descriptor = core.getCoreDescriptor().getCloudDescriptor();
+       Slice slice = docCollection.getSlice(descriptor.getShardId());
+       Replica replica = docCollection.getReplica(descriptor.getCoreNodeName());
+       boolean coreIsLeader = slice.getLeader().equals(replica);
+       if (coreIsLeader == isLeader) {
+         matchingCores.add(core);
+       }
+     }
+   }
+   return matchingCores;
+ }
+
+ /**
+  * For every given Jetty, issues a non-distributed "/get" request for each id in
+  * {@code [from, to]} and asserts that the response contains a "doc" entry.
+  */
+ private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception{
+   for (JettySolrRunner runner : solrRunners) {
+     try (SolrClient nodeClient = runner.newClient()) {
+       int id = from;
+       while (id <= to) {
+         SolrQuery rtgQuery = new SolrQuery();
+         rtgQuery.set("distrib", false);
+         rtgQuery.setRequestHandler("/get");
+         rtgQuery.set("id",id);
+         QueryResponse rtgResponse = nodeClient.query(collectionName, rtgQuery);
+         Object doc = rtgResponse.getResponse().get("doc");
+         assertNotNull("Can not find doc "+ id + " in " + runner.getBaseUrl(), doc);
+         id++;
+       }
+     }
+   }
+ }
+
+ /**
+  * Collects the Jettys hosting a core whose replica either is ({@code isLeader == true}) or
+  * is not ({@code isLeader == false}) the leader of its slice in the test collection. A
+  * Jetty is added once per matching core, so it can appear more than once.
+  */
+ private List<JettySolrRunner> getSolrRunner(boolean isLeader) {
+   List<JettySolrRunner> rs = new ArrayList<>();
+   CloudSolrClient cloudClient = cluster.getSolrClient();
+   DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+   for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+     if (solrRunner.getCoreContainer() == null) continue;
+     for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
+       CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
+       Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
+       Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
+       // Compare by value, as getSolrCore does, instead of relying on both lookups
+       // returning the very same Replica instance.
+       boolean coreIsLeader = slice.getLeader().equals(replica);
+       if (coreIsLeader == isLeader) {
+         rs.add(solrRunner);
+       }
+     }
+   }
+   return rs;
+ }
+
+ /**
+  * Compares the commit-time user data of the leader's latest commit with that of every
+  * non-leader core, retrying up to {@code numTry} times with a 500 ms pause after each
+  * mismatch. Returns immediately if the leader's commit carries no commit time; fails the
+  * test if the replicas never match.
+  */
+ private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
+   String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+   if (leaderTimeCommit == null) {
+     return;
+   }
+   int attempt = 0;
+   while (attempt < numTry) {
+     boolean allInSync = true;
+     for (SolrCore replicaCore : getSolrCore(false)) {
+       String replicaTimeCommit = replicaCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+       if (!leaderTimeCommit.equals(replicaTimeCommit)) {
+         allInSync = false;
+         break;
+       }
+     }
+     if (allInSync) {
+       return;
+     }
+     Thread.sleep(500);
+     attempt++;
+   }
+
+   fail("Some replicas are not in sync with leader");
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
index b592861..965c169 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
@@ -66,8 +66,7 @@ public class TestCloudRecovery extends SolrCloudTestCase {
onlyLeaderIndexes = random().nextBoolean();
CollectionAdminRequest
- .createCollection(COLLECTION, "config", 2, 2)
- .setRealtimeReplicas(onlyLeaderIndexes? 1: -1)
+ .createCollection(COLLECTION, "config", 2, onlyLeaderIndexes?0:2,onlyLeaderIndexes?2:0,0)
.setMaxShardsPerNode(2)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d149074/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
index 8fbfee3..dd55f23 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
@@ -61,7 +61,6 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
public void test() throws Exception {
try (CloudSolrClient client = createCloudClient(null)) {
CollectionAdminRequest.Create req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2,2);
- req.setRealtimeReplicas(1);
req.setMaxShardsPerNode(2);
client.request(req);
createCollection(null, COLLECTION_NAME1, 1, 1, 1, client, null, "conf1");
@@ -173,7 +172,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
assertNotNull(collection);
assertEquals("conf1", collection.get("configName"));
- assertEquals("1", collection.get("realtimeReplicas"));
+// assertEquals("1", collection.get("realtimeReplicas"));
}
}