You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/25 17:39:41 UTC
[5/6] lucene-solr:jira/solr-8668: SOLR-10233: Add support for replica types
SOLR-10233: Add support for replica types
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/2fc41d56
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/2fc41d56
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/2fc41d56
Branch: refs/heads/jira/solr-8668
Commit: 2fc41d565a4a0408a09856a37d3be7d87414ba3f
Parents: 1802d24
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Mon May 22 19:44:01 2017 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Mon May 22 19:58:51 2017 -0700
----------------------------------------------------------------------
solr/CHANGES.txt | 14 +
.../org/apache/solr/cloud/AddReplicaCmd.java | 9 +-
.../src/java/org/apache/solr/cloud/Assign.java | 12 +-
.../org/apache/solr/cloud/CloudDescriptor.java | 25 +-
.../apache/solr/cloud/CreateCollectionCmd.java | 40 +-
.../org/apache/solr/cloud/CreateShardCmd.java | 48 +-
.../org/apache/solr/cloud/ElectionContext.java | 13 +-
.../java/org/apache/solr/cloud/MigrateCmd.java | 8 +-
.../org/apache/solr/cloud/MoveReplicaCmd.java | 4 +-
.../cloud/OverseerCollectionMessageHandler.java | 91 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 166 +++-
.../org/apache/solr/cloud/ReplaceNodeCmd.java | 4 +-
.../apache/solr/cloud/ReplicateFromLeader.java | 42 +-
.../java/org/apache/solr/cloud/RestoreCmd.java | 71 +-
.../org/apache/solr/cloud/SplitShardCmd.java | 4 +-
.../org/apache/solr/cloud/ZkController.java | 67 +-
.../solr/cloud/overseer/ReplicaMutator.java | 7 +-
.../solr/cloud/overseer/SliceMutator.java | 24 +-
.../solr/cloud/overseer/ZkStateWriter.java | 1 +
.../apache/solr/cloud/rule/ReplicaAssigner.java | 6 +-
.../org/apache/solr/core/CoreContainer.java | 38 +-
.../org/apache/solr/handler/IndexFetcher.java | 23 +-
.../apache/solr/handler/RealTimeGetHandler.java | 20 +-
.../apache/solr/handler/ReplicationHandler.java | 4 +
.../solr/handler/admin/CollectionsHandler.java | 17 +-
.../solr/handler/admin/CoreAdminHandler.java | 1 +
.../solr/handler/admin/PrepRecoveryOp.java | 30 +-
.../handler/component/HttpShardHandler.java | 37 +-
.../handler/component/RealTimeGetComponent.java | 27 +-
.../solr/update/DirectUpdateHandler2.java | 20 +-
.../org/apache/solr/update/UpdateHandler.java | 6 +-
.../java/org/apache/solr/update/UpdateLog.java | 7 +-
.../processor/DistributedUpdateProcessor.java | 51 +-
.../org/apache/solr/util/TestInjection.java | 6 +-
.../solr/collection1/conf/solrconfig.xml | 8 +-
.../cloud-minimal/conf/solrconfig.xml | 2 +-
.../AbstractCloudBackupRestoreTestCase.java | 29 +-
.../test/org/apache/solr/cloud/AssignTest.java | 6 +
.../solr/cloud/BasicDistributedZk2Test.java | 4 +-
.../solr/cloud/BasicDistributedZkTest.java | 10 +-
.../cloud/ChaosMonkeyNothingIsSafeTest.java | 136 +--
...MonkeyNothingIsSafeWithPullReplicasTest.java | 327 +++++++
...aosMonkeySafeLeaderWithPullReplicasTest.java | 254 ++++++
.../cloud/CollectionsAPIDistributedZkTest.java | 4 +-
.../solr/cloud/CollectionsAPISolrJTest.java | 18 +-
.../org/apache/solr/cloud/DeleteNodeTest.java | 11 +-
.../apache/solr/cloud/DeleteReplicaTest.java | 22 +-
.../org/apache/solr/cloud/ForceLeaderTest.java | 4 +-
.../FullThrottleStoppableIndexingThread.java | 156 ++++
.../apache/solr/cloud/HttpPartitionTest.java | 8 +-
.../LeaderInitiatedRecoveryOnCommitTest.java | 8 +-
.../solr/cloud/OnlyLeaderIndexesTest.java | 435 ----------
...verseerCollectionConfigSetProcessorTest.java | 3 +-
.../solr/cloud/RecoveryAfterSoftCommitTest.java | 8 +-
.../org/apache/solr/cloud/ReplaceNodeTest.java | 21 +-
.../solr/cloud/ReplicationFactorTest.java | 4 +-
.../org/apache/solr/cloud/ShardSplitTest.java | 6 -
.../apache/solr/cloud/TestCloudRecovery.java | 5 +-
.../apache/solr/cloud/TestCollectionAPI.java | 10 +-
.../org/apache/solr/cloud/TestPullReplica.java | 576 +++++++++++++
.../cloud/TestPullReplicaErrorHandling.java | 344 ++++++++
.../org/apache/solr/cloud/TestTlogReplica.java | 845 +++++++++++++++++++
.../cloud/hdfs/HdfsBasicDistributedZkTest.java | 4 +-
.../reporters/solr/SolrCloudReportersTest.java | 2 +-
.../solr/update/TestInPlaceUpdatesDistrib.java | 10 +-
.../solrj/impl/ConcurrentUpdateSolrClient.java | 23 +-
.../solrj/request/CollectionAdminRequest.java | 121 ++-
.../apache/solr/common/cloud/DocCollection.java | 56 +-
.../org/apache/solr/common/cloud/Replica.java | 32 +
.../org/apache/solr/common/cloud/Slice.java | 21 +-
.../apache/solr/common/cloud/ZkStateReader.java | 16 +-
.../solr/common/params/CoreAdminParams.java | 5 +
...ollectionAdminRequestRequiredParamsTest.java | 9 +
.../solr/BaseDistributedSearchTestCase.java | 3 +-
.../java/org/apache/solr/SolrTestCaseJ4.java | 31 +-
.../cloud/AbstractFullDistribZkTestBase.java | 264 +++++-
.../java/org/apache/solr/cloud/ChaosMonkey.java | 159 +++-
.../solr/cloud/StoppableCommitThread.java | 69 ++
.../solr/cloud/StoppableIndexingThread.java | 2 +-
.../solr/cloud/StoppableSearchThread.java | 2 +-
80 files changed, 4026 insertions(+), 1010 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7adac97..799df74 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -95,6 +95,20 @@ New Features
* SOLR-10431: Make it possible to invoke v2 api calls using SolrJ (Cao Manh Dat, Noble Paul, shalin)
+* SOLR-10233: Add support for different replica types, that can handle updates differently:
+ - NRT: Writes updates to transaction log and indexes locally. Replicas of type “NRT” support NRT
+ (soft commits) and RTG. Any NRT replica can become a leader. This was the only type supported
+ in SolrCloud until now and it’s the default type.
+ - TLOG: Writes to transaction log, but not to index, uses replication to copy segment files from the
+ shard leader. Any TLOG replica can become leader (by first applying all local transaction log
+ elements). If a replica is of type TLOG but is also the leader, it will behave as an NRT replica. This
+ is exactly what was added in SOLR-9835 (non-realtime replicas); only the API and naming have changed.
+ - PULL: Doesn’t index or write to the transaction log, just replicates from the shard leader. PULL replicas
+ can’t become shard leaders (i.e., if there are only PULL replicas in the collection at some point,
+ updates will fail the same as if there were no leader, while queries continue to work), so they don’t even
+ participate in elections.
+ (Tomás Fernández Löbbe)
+
Bug Fixes
----------------------
* SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index 6bb3350..7338d9e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -72,6 +72,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP);
String coreName = message.getStr(CoreAdminParams.NAME);
+ Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()));
boolean parallel = message.getBool("parallel", false);
if (StringUtils.isBlank(coreName)) {
coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
@@ -93,7 +94,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
- ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
+ ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
}
log.info("Node Identified {} for creating new replica", node);
@@ -101,7 +102,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
}
if (coreName == null) {
- coreName = Assign.buildCoreName(coll, shard);
+ coreName = Assign.buildCoreName(coll, shard, replicaType);
} else if (!skipCreateReplicaInClusterState) {
//Validate that the core name is unique in that collection
for (Slice slice : coll.getSlices()) {
@@ -126,7 +127,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
- ZkStateReader.NODE_NAME_PROP, node);
+ ZkStateReader.NODE_NAME_PROP, node,
+ ZkStateReader.REPLICA_TYPE, replicaType.name());
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
}
params.set(CoreAdminParams.CORE_NODE_NAME,
@@ -142,6 +144,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.NAME, coreName);
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collection);
+ params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
if (shard != null) {
params.set(CoreAdminParams.SHARD, shard);
} else if (routeKey != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index ba03ccd..265e453 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -107,12 +107,17 @@ public class Assign {
returnShardId = shardIdNames.get(0);
return returnShardId;
}
+
+ public static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+ // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
+ return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
+ }
- static String buildCoreName(DocCollection collection, String shard) {
+ public static String buildCoreName(DocCollection collection, String shard, Replica.Type type) {
Slice slice = collection.getSlice(shard);
int replicaNum = slice.getReplicas().size();
for (; ; ) {
- String replicaName = collection.getName() + "_" + shard + "_replica" + replicaNum;
+ String replicaName = buildCoreName(collection.getName(), shard, type, replicaNum);
boolean exists = false;
for (Replica replica : slice.getReplicas()) {
if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
@@ -121,9 +126,8 @@ public class Assign {
}
}
if (exists) replicaNum++;
- else break;
+ else return replicaName;
}
- return collection.getName() + "_" + shard + "_replica" + replicaNum;
}
static class ReplicaCount {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 719b1d1..32cb65b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -20,12 +20,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import com.google.common.base.Strings;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.util.PropertiesUtil;
+import com.google.common.base.Strings;
+
public class CloudDescriptor {
private final CoreDescriptor cd;
@@ -44,6 +45,13 @@ public class CloudDescriptor {
volatile Replica.State lastPublished = Replica.State.ACTIVE;
public static final String NUM_SHARDS = "numShards";
+
+ public static final String REPLICA_TYPE = "replicaType";
+
+ /**
+ * The type of replica this core hosts
+ */
+ private final Replica.Type replicaType;
public CloudDescriptor(String coreName, Properties props, CoreDescriptor cd) {
this.cd = cd;
@@ -57,7 +65,12 @@ public class CloudDescriptor {
if (Strings.isNullOrEmpty(nodeName))
this.nodeName = null;
this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
-
+ String replicaTypeStr = props.getProperty(CloudDescriptor.REPLICA_TYPE);
+ if (Strings.isNullOrEmpty(replicaTypeStr)) {
+ this.replicaType = Replica.Type.NRT;
+ } else {
+ this.replicaType = Replica.Type.valueOf(replicaTypeStr);
+ }
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), props.getProperty(propName));
@@ -65,6 +78,10 @@ public class CloudDescriptor {
}
}
+ public boolean requiresTransactionLog() {
+ return this.replicaType != Replica.Type.PULL;
+ }
+
public Replica.State getLastPublished() {
return lastPublished;
}
@@ -155,4 +172,8 @@ public class CloudDescriptor {
collectionParams.put(ent.getKey(), ent.getValue());
}
}
+
+ public Replica.Type getReplicaType() {
+ return replicaType;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index a1bb70e..3d1a54e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -60,8 +60,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -96,7 +95,9 @@ public class CreateCollectionCmd implements Cmd {
// look at the replication factor and see if it matches reality
// if it does not, find best nodes to create more cores
- int repFactor = message.getInt(REPLICATION_FACTOR, 1);
+ int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, 1));
+ int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+ int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
final String async = message.getStr(ASYNC);
@@ -116,8 +117,8 @@ public class CreateCollectionCmd implements Cmd {
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
- if (repFactor <= 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
+ if (numNrtReplicas + numTlogReplicas <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
if (numSlices <= 0) {
@@ -135,32 +136,33 @@ public class CreateCollectionCmd implements Cmd {
positionVsNodes = new HashMap<>();
} else {
- if (repFactor > nodeList.size()) {
- log.warn("Specified "
- + REPLICATION_FACTOR
- + " of "
- + repFactor
+ int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+ if (totalNumReplicas > nodeList.size()) {
+ log.warn("Specified number of replicas of "
+ + totalNumReplicas
+ " on collection "
+ collectionName
- + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+ + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+ nodeList.size()
+ "). It's unusual to run two replica of the same slice on the same Solr-instance.");
}
int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
- int requestedShardsToCreate = numSlices * repFactor;
+ int requestedShardsToCreate = numSlices * totalNumReplicas;
if (maxShardsAllowedToCreate < requestedShardsToCreate) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+ MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+ ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
+ ". This allows a maximum of " + maxShardsAllowedToCreate
+ " to be created. Value of " + NUM_SLICES + " is " + numSlices
- + " and value of " + REPLICATION_FACTOR + " is " + repFactor
+ + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+ + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+ + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
+ ". This requires " + requestedShardsToCreate
+ " shards to be created (higher than the allowed number)");
}
- positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
+ positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -200,13 +202,13 @@ public class CreateCollectionCmd implements Cmd {
Map<String, String> requestMap = new HashMap<>();
- log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
- collectionName, shardNames, repFactor));
+ log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
+ collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
ReplicaAssigner.Position position = e.getKey();
String nodeName = e.getValue();
- String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+ String coreName = Assign.buildCoreName(collectionName, position.shard, position.type, position.index + 1);
log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
, coreName, position.shard, collectionName, nodeName));
@@ -221,7 +223,8 @@ public class CreateCollectionCmd implements Cmd {
ZkStateReader.SHARD_ID_PROP, position.shard,
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
- ZkStateReader.BASE_URL_PROP, baseUrl);
+ ZkStateReader.BASE_URL_PROP, baseUrl,
+ ZkStateReader.REPLICA_TYPE, position.type.name());
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
}
@@ -235,6 +238,7 @@ public class CreateCollectionCmd implements Cmd {
params.set(CoreAdminParams.SHARD, position.shard);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
params.set(CoreAdminParams.NEW_COLLECTION, "true");
+ params.set(CoreAdminParams.REPLICA_TYPE, position.type.name());
if (async != null) {
String coreAdminAsyncId = async + Math.abs(System.nanoTime());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 52df32b..d3eb828 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -28,6 +28,7 @@ import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
@@ -41,7 +42,10 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -67,9 +71,18 @@ public class CreateShardCmd implements Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
DocCollection collection = clusterState.getCollection(collectionName);
- int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+// int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+ int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+ int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+ int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+ int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+
+ if (numNrtReplicas + numTlogReplicas <= 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+ }
+
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
- List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
+ List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -90,19 +103,38 @@ public class CreateShardCmd implements Cmd {
String async = message.getStr(ASYNC);
Map<String, String> requestMap = null;
if (async != null) {
- requestMap = new HashMap<>(repFactor, 1.0f);
+ requestMap = new HashMap<>(totalReplicas, 1.0f);
}
-
- for (int j = 1; j <= repFactor; j++) {
+
+ int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+ for (int j = 1; j <= totalReplicas; j++) {
+ int coreNameNumber;
+ Replica.Type typeToCreate;
+ if (createdNrtReplicas < numNrtReplicas) {
+ createdNrtReplicas++;
+ coreNameNumber = createdNrtReplicas;
+ typeToCreate = Replica.Type.NRT;
+ } else if (createdTlogReplicas < numTlogReplicas) {
+ createdTlogReplicas++;
+ coreNameNumber = createdTlogReplicas;
+ typeToCreate = Replica.Type.TLOG;
+ } else {
+ createdPullReplicas++;
+ coreNameNumber = createdPullReplicas;
+ typeToCreate = Replica.Type.PULL;
+ }
String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
- String shardName = collectionName + "_" + sliceName + "_replica" + j;
- log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
+ String coreName = Assign.buildCoreName(collectionName, sliceName, typeToCreate, coreNameNumber);
+// String coreName = collectionName + "_" + sliceName + "_replica" + j;
+ log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
+ " on " + nodeName);
// Need to create new params for each request
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
- params.set(CoreAdminParams.NAME, shardName);
+ params.set(CoreAdminParams.NAME, coreName);
+ params.set(CoreAdminParams.REPLICA_TYPE, typeToCreate.name());
params.set(COLL_CONF, configName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index bdbeca9..588262d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -326,6 +327,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return;
}
+ Replica.Type replicaType;
+
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
@@ -338,6 +341,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
+ replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
+
// should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
rejoinLeaderElection(core);
@@ -423,9 +428,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
try {
// we must check LIR before registering as leader
checkLIR(coreName, allReplicasInLine);
-
- boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
- if (onlyLeaderIndexes) {
+ if (replicaType == Replica.Type.TLOG) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
if (weAreReplacement) {
@@ -621,7 +624,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
// on startup and after connection timeout, wait for all known shards
- if (found >= slices.getReplicasMap().size()) {
+ if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
log.info("Enough replicas found to continue.");
return true;
} else {
@@ -629,7 +632,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.info("Waiting until we see more replicas up for shard {}: total={}"
+ " found={}"
+ " timeoutin={}ms",
- shardId, slices.getReplicasMap().size(), found,
+ shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
index 7b1ad2c..0ea5d6e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -51,7 +51,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_P
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
@@ -208,7 +208,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
Map<String, Object> props = makeMap(
Overseer.QUEUE_OPERATION, CREATE.toLower(),
NAME, tempSourceCollectionName,
- REPLICATION_FACTOR, 1,
+ NRT_REPLICAS, 1,
NUM_SLICES, 1,
COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
@@ -224,7 +224,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
- String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
+ String tempCollectionReplica1 = Assign.buildCoreName(tempSourceCollectionName, tempSourceSlice.getName(), Replica.Type.NRT, 1);
String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
sourceLeader.getNodeName(), tempCollectionReplica1);
// wait for the replicas to be seen as active on temp source leader
@@ -257,7 +257,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName());
- String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
+ String tempCollectionReplica2 = Assign.buildCoreName(tempSourceCollectionName, tempSourceSlice.getName(), Replica.Type.NRT, 2);
props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
props.put(COLLECTION_PROP, tempSourceCollectionName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 545989e..fed1398 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -111,7 +111,7 @@ public class MoveReplicaCmd implements Cmd{
private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception {
- String newCoreName = Assign.buildCoreName(coll, slice.getName());
+ String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
@@ -155,7 +155,7 @@ public class MoveReplicaCmd implements Cmd{
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
DocCollection coll, Replica replica, Slice slice) throws Exception {
- String newCoreName = Assign.buildCoreName(coll, slice.getName());
+ String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 9b83140..1d51df7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -16,6 +16,51 @@
*/
package org.apache.solr.cloud;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -33,7 +78,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
@@ -80,21 +124,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableMap;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
@@ -130,9 +160,11 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
ROUTER, DocRouter.DEFAULT_NAME,
ZkStateReader.REPLICATION_FACTOR, "1",
+ ZkStateReader.NRT_REPLICAS, "1",
+ ZkStateReader.TLOG_REPLICAS, "0",
+ ZkStateReader.PULL_REPLICAS, "0",
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
ZkStateReader.AUTO_ADD_REPLICAS, "false",
- ZkStateReader.REALTIME_REPLICAS, "-1",
DocCollection.RULE, null,
SNITCH, null));
@@ -702,18 +734,33 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
List<String> nodeList,
ZkNodeProps message,
List<String> shardNames,
- int repFactor) throws IOException {
+ int numNrtReplicas,
+ int numTlogReplicas,
+ int numPullReplicas) throws IOException {
List<Map> rulesMap = (List) message.get("rule");
if (rulesMap == null) {
int i = 0;
Map<Position, String> result = new HashMap<>();
for (String aShard : shardNames) {
- for (int j = 0; j < repFactor; j++){
- result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
+ for (int j = 0; j < numNrtReplicas; j++){
+ result.put(new Position(aShard, j, Replica.Type.NRT), nodeList.get(i % nodeList.size()));
+ i++;
+ }
+ for (int j = 0; j < numTlogReplicas; j++){
+ result.put(new Position(aShard, j, Replica.Type.TLOG), nodeList.get(i % nodeList.size()));
+ i++;
+ }
+ for (int j = 0; j < numPullReplicas; j++){
+ result.put(new Position(aShard, j, Replica.Type.PULL), nodeList.get(i % nodeList.size()));
i++;
}
}
return result;
+ } else {
+ if (numTlogReplicas + numPullReplicas != 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules");
+ }
}
List<Rule> rules = new ArrayList<>();
@@ -721,7 +768,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
Map<String, Integer> sharVsReplicaCount = new HashMap<>();
- for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
+ for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
sharVsReplicaCount,
(List<Map>) message.get(SNITCH),
@@ -752,6 +799,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
if (result.size() == coreNames.size()) {
return result;
+ } else {
+ log.debug("Expecting {} cores but found {}", coreNames.size(), result.size());
}
if (timeout.hasTimedOut()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 7599b05..3449935 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -118,7 +118,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
- private boolean onlyLeaderIndexes;
+ private final Replica.Type replicaType;
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
@@ -128,8 +128,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
- String collection = cd.getCloudDescriptor().getCollectionName();
- onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+ replicaType = cd.getCloudDescriptor().getReplicaType();
}
final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -262,7 +261,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
+// ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
+ ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
client);
}
@@ -296,9 +296,151 @@ public class RecoveryStrategy implements Runnable, Closeable {
MDCLoggingContext.clear();
}
}
+
+  /**
+   * Entry point for recovering {@code core}. Chooses the recovery routine according to
+   * whether the core's cloud descriptor reports that it requires a transaction log.
+   *
+   * @param core the core to recover
+   */
+  final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+    final CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
+    if (!cloudDescriptor.requiresTransactionLog()) {
+      // No transaction log required: use the replicate-only routine.
+      doReplicateOnlyRecovery(core);
+      return;
+    }
+    doSyncOrReplicateRecovery(core);
+  }
+
+  /**
+   * Recovers {@code core} purely by replicating from the shard leader; this routine performs
+   * no peer sync and no transaction-log replay.
+   *
+   * <p>Each attempt looks up the current leader, publishes the core as
+   * {@link Replica.State#RECOVERING}, stops the background replicate-from-leader process and
+   * runs {@code replicate(...)}. On success the background process is restarted and the core is
+   * published as {@link Replica.State#ACTIVE}. Failed attempts are retried with a growing pause
+   * until {@code maxRetries} is reached, at which point {@code recoveryFailed(...)} is invoked.
+   * The loop also ends when this strategy is closed or the thread is interrupted.
+   *
+   * @param core the core to recover
+   * @throws InterruptedException declared for the calls made outside the inner try/catch blocks
+   */
+  final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
+    boolean successfulRecovery = false;
+
+//    if (core.getUpdateHandler().getUpdateLog() != null) {
+//      SolrException.log(LOG, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
+//          + core.getUpdateHandler().getUpdateLog());
+//      return;
+//    }
+    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+      try {
+        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
+            cloudDesc.getCollectionName(), cloudDesc.getShardId());
+        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+
+        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
+        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+        boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
+        if (isLeader && !cloudDesc.isLeader()) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+        }
+        if (cloudDesc.isLeader()) {
+          assert cloudDesc.getReplicaType() != Replica.Type.PULL;
+          // we are now the leader - no one else must have been suitable
+          LOG.warn("We have not yet recovered - but we are now the leader!");
+          LOG.info("Finished recovery process.");
+          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          return;
+        }
+
+
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+            ourUrl);
+        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+
+        if (isClosed()) {
+          LOG.info("Recovery for core {} has been closed", core.getName());
+          break;
+        }
+        LOG.info("Starting Replication Recovery.");
+
+        try {
+          // The periodic pull is stopped before the explicit replicate() call below.
+          LOG.info("Stopping background replicate from leader process");
+          zkController.stopReplicationFromLeader(coreName);
+          replicate(zkController.getNodeName(), core, leaderprops);
+
+          if (isClosed()) {
+            LOG.info("Recovery for core {} has been closed", core.getName());
+            break;
+          }
+
+          LOG.info("Replication Recovery was successful.");
+          successfulRecovery = true;
+        } catch (Exception e) {
+          SolrException.log(LOG, "Error while trying to recover", e);
+        }
+
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+      } finally {
+        if (successfulRecovery) {
+          LOG.info("Restarting background replicate from leader process");
+          // NOTE(review): this call sits outside the try below, so an exception thrown here
+          // propagates out of the finally block instead of triggering a retry.
+          zkController.startReplicationFromLeader(coreName, false);
+          LOG.info("Registering as Active after recovery.");
+          try {
+            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          } catch (Exception e) {
+            LOG.error("Could not publish as ACTIVE after successful recovery", e);
+            // Treat a failed publish as a failed attempt so the loop retries.
+            successfulRecovery = false;
+          }
+
+          if (successfulRecovery) {
+            close = true;
+            recoveryListener.recovered();
+          }
+        }
+      }
+
+      if (!successfulRecovery) {
+        // lets pause for a moment and we need to try again...
+        // TODO: we don't want to retry for some problems?
+        // Or do a fall off retry...
+        try {
+
+          if (isClosed()) {
+            LOG.info("Recovery for core {} has been closed", core.getName());
+            break;
+          }
+
+          LOG.error("Recovery failed - trying again... (" + retries + ")");
+
+          retries++;
+          if (retries >= maxRetries) {
+            SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
+            try {
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+            } catch (Exception e) {
+              SolrException.log(LOG, "Could not publish that recovery failed", e);
+            }
+            break;
+          }
+        } catch (Exception e) {
+          SolrException.log(LOG, "An error has occurred during recovery", e);
+        }
+
+        try {
+          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
+          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
+          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
+          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
+          double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
+          LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+          for (int i = 0; i < loopCount; i++) {
+            if (isClosed()) {
+              LOG.info("Recovery for core {} has been closed", core.getName());
+              break; // check if someone closed us
+            }
+            Thread.sleep(startingRecoveryDelayMilliSeconds);
+          }
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag so the loop condition sees it, and stop retrying.
+          Thread.currentThread().interrupt();
+          LOG.warn("Recovery was interrupted.", e);
+          close = true;
+        }
+      }
+
+    }
+    // We skip core.seedVersionBuckets(); We don't have a transaction log
+    LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
+  }
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
- final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+ final public void doSyncOrReplicateRecovery(SolrCore core) throws KeeperException, InterruptedException {
boolean replayed = false;
boolean successfulRecovery = false;
@@ -310,9 +452,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
core.getCoreDescriptor());
return;
}
-
- // we temporary ignore peersync for realtimeReplicas mode
- boolean firstTime = !onlyLeaderIndexes;
+
+ // we temporarily ignore peersync for tlog replicas
+ boolean firstTime = replicaType != Replica.Type.TLOG;
List<Long> recentVersions;
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@@ -364,7 +506,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- if (onlyLeaderIndexes) {
+ if (replicaType == Replica.Type.TLOG) {
zkController.stopReplicationFromLeader(coreName);
}
@@ -521,8 +663,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (successfulRecovery) {
LOG.info("Registering as Active after recovery.");
try {
- if (onlyLeaderIndexes) {
- zkController.startReplicationFromLeader(coreName);
+ if (replicaType == Replica.Type.TLOG) {
+ zkController.startReplicationFromLeader(coreName, true);
}
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
} catch (Exception e) {
@@ -604,7 +746,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (testing_beforeReplayBufferingUpdates != null) {
testing_beforeReplayBufferingUpdates.run();
}
- if (onlyLeaderIndexes) {
+ if (replicaType == Replica.Type.TLOG) {
// roll over all updates during buffering to new tlog, make RTG available
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 92c9afe..e4240be 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -154,9 +154,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
SHARD_ID_PROP, slice.getName(),
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
ZkStateReader.REPLICA_PROP, replica.getName(),
+ ZkStateReader.REPLICA_TYPE, replica.getType().name(),
CoreAdminParams.NODE, source);
- sourceReplicas.add(props
- );
+ sourceReplicas.add(props);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 817b371..0800e0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -49,7 +49,12 @@ public class ReplicateFromLeader {
this.coreName = coreName;
}
- public void startReplication() throws InterruptedException {
+ /**
+ * Start a replication handler thread that will periodically pull indices from the shard leader
+ * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
+ * the replication is done
+ */
+ public void startReplication(boolean switchTransactionLog) throws InterruptedException {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (cc.isShutDown()) {
@@ -65,6 +70,7 @@ public class ReplicateFromLeader {
} else if (uinfo.autoSoftCommmitMaxTime != -1) {
pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
}
+ LOG.info("Will start replication from leader with poll interval: {}", pollIntervalStr );
NamedList slaveConfig = new NamedList();
slaveConfig.add("fetchFromLeader", true);
@@ -78,20 +84,22 @@ public class ReplicateFromLeader {
}
replicationProcess = new ReplicationHandler();
- replicationProcess.setPollListener((solrCore, pollSuccess) -> {
- if (pollSuccess) {
- String commitVersion = getCommitVersion(core);
- if (commitVersion == null) return;
- if (Long.parseLong(commitVersion) == lastVersion) return;
- UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
- SolrQueryRequest req = new LocalSolrQueryRequest(core,
- new ModifiableSolrParams());
- CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
- cuc.setVersion(Long.parseLong(commitVersion));
- updateLog.copyOverOldUpdates(cuc);
- lastVersion = Long.parseLong(commitVersion);
- }
- });
+ if (switchTransactionLog) {
+ replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+ if (pollSuccess) {
+ String commitVersion = getCommitVersion(core);
+ if (commitVersion == null) return;
+ if (Long.parseLong(commitVersion) == lastVersion) return;
+ UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+ SolrQueryRequest req = new LocalSolrQueryRequest(core,
+ new ModifiableSolrParams());
+ CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+ cuc.setVersion(Long.parseLong(commitVersion));
+ updateLog.copyOverOldUpdates(cuc);
+ lastVersion = Long.parseLong(commitVersion);
+ }
+ });
+ }
replicationProcess.init(replicationConfig);
replicationProcess.inform(core);
}
@@ -119,6 +127,8 @@ public class ReplicateFromLeader {
}
public void stopReplication() {
- replicationProcess.close();
+ if (replicationProcess != null) {
+ replicationProcess.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 23b2fb5..76c12b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -63,11 +64,15 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
@@ -107,15 +112,23 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
int numShards = backupCollectionState.getActiveSlices().size();
- int repFactor = message.getInt(REPLICATION_FACTOR, backupCollectionState.getReplicationFactor());
+
+ int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
+ if (numNrtReplicas == 0) {
+ numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
+ }
+ int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
+ int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
+ int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
+
int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
int availableNodeCount = nodeList.size();
- if ((numShards * repFactor) > (availableNodeCount * maxShardsPerNode)) {
+ if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
- + " restoring a collection with %d shards, replication factor %d and maxShardsPerNode %d."
+ + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
+ " Consider increasing maxShardsPerNode value OR number of available nodes.",
- availableNodeCount, numShards, repFactor, maxShardsPerNode));
+ availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
}
//Upload the configs
@@ -201,7 +214,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
- message, sliceNames, repFactor);
+ message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
@@ -210,6 +223,15 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
propMap.put(COLLECTION_PROP, restoreCollectionName);
propMap.put(SHARD_ID_PROP, slice.getName());
+
+ if (numNrtReplicas >= 1) {
+ propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+ } else if (numTlogReplicas >= 1) {
+ propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
+ Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
+ }
// Get the first node matching the shard to restore in
String node;
@@ -261,17 +283,39 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
//refresh the location copy of collection state
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
- //Add the remaining replicas for each shard
- Integer numReplicas = restoreCollection.getReplicationFactor();
- if (numReplicas != null && numReplicas > 1) {
+ if (totalReplicasPerShard > 1) {
log.info("Adding replicas to restored collection={}", restoreCollection);
-
for (Slice slice : restoreCollection.getSlices()) {
- for (int i = 1; i < numReplicas; i++) {
- log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+
+ //Add the remaining replicas for each shard, considering its type
+ int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+ // We already created either an NRT or a TLOG replica as leader, so count it here.
+ // The TLOG branch must test the requested count (numTlogReplicas): createdTlogReplicas
+ // is still 0 at this point, so testing it would never account for a TLOG leader.
+ if (numNrtReplicas > 0) {
+ createdNrtReplicas++;
+ } else if (numTlogReplicas > 0) {
+ createdTlogReplicas++;
+ }
+
+ for (int i = 1; i < totalReplicasPerShard; i++) {
+ Replica.Type typeToCreate;
+ if (createdNrtReplicas < numNrtReplicas) {
+ createdNrtReplicas++;
+ typeToCreate = Replica.Type.NRT;
+ } else if (createdTlogReplicas < numTlogReplicas) {
+ createdTlogReplicas++;
+ typeToCreate = Replica.Type.TLOG;
+ } else {
+ createdPullReplicas++;
+ typeToCreate = Replica.Type.PULL;
+ assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
+ }
+
+ log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(COLLECTION_PROP, restoreCollectionName);
propMap.put(SHARD_ID_PROP, slice.getName());
+ propMap.put(REPLICA_TYPE, typeToCreate.name());
// Get the first node matching the shard to restore in
String node;
@@ -298,4 +342,9 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
}
+
+  /**
+   * Reads the integer property {@code propertyName} from {@code message}.
+   *
+   * @param message the message to read the property from
+   * @param propertyName name of the property to look up
+   * @param default1 value passed to {@code message.getInt} as its default; may be {@code null}
+   * @param default2 value returned when the lookup yields {@code null}
+   * @return the value from {@code message.getInt(propertyName, default1)}, or {@code default2} if that is {@code null}
+   */
+  private int getInt(ZkNodeProps message, String propertyName, Integer default1, int default2) {
+    // Look up the property the caller asked for rather than a fixed key.
+    Integer value = message.getInt(propertyName, default1);
+    return value != null ? value : default2;
+  }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 5a099e1..fe95458 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -205,7 +205,7 @@ public class SplitShardCmd implements Cmd {
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = slice + "_" + i;
subSlices.add(subSlice);
- String subShardName = collectionName + "_" + subSlice + "_replica1";
+ String subShardName = Assign.buildCoreName(collectionName, subSlice, Replica.Type.NRT, 1);
subShardNames.add(subShardName);
}
@@ -385,7 +385,7 @@ public class SplitShardCmd implements Cmd {
Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
new ZkNodeProps(collection.getProperties()),
- subSlices, repFactor - 1);
+ subSlices, repFactor - 1, 0, 0);
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5e00c32..cb8175e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -62,6 +62,7 @@ import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.Type;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkACLProvider;
@@ -883,12 +884,20 @@ public class ZkController {
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
- Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(),
- coreZkNodeName);
+ Replica replica = zkStateReader.getClusterState().getReplica(collection, coreZkNodeName);
if (replica != null) {
joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
}
- joinElection(desc, afterExpiration, joinAtHead);
+ //TODO Why would replica be null?
+ if (replica == null || replica.getType() != Type.PULL) {
+ joinElection(desc, afterExpiration, joinAtHead);
+ } else if (replica.getType() == Type.PULL) {
+ if (joinAtHead) {
+ log.warn("Replica {} was designated as preferred leader but it's type is {}, It won't join election", coreZkNodeName, Type.PULL);
+ }
+ log.debug("Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL);
+ startReplicationFromLeader(coreName, false);
+ }
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -905,6 +914,8 @@ public class ZkController {
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
boolean isLeader = leaderUrl.equals(ourUrl);
+ Replica.Type replicaType = zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType();
+ assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!";
try (SolrCore core = cc.getCore(desc.getName())) {
@@ -915,16 +926,15 @@ public class ZkController {
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
- boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
- if (isReplicaInOnlyLeaderIndexes) {
+ boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader;
+ if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
}
}
// we will call register again after zk expiration and on reload
- if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
+ if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@@ -943,12 +953,13 @@ public class ZkController {
boolean didRecovery
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
if (!didRecovery) {
- if (isReplicaInOnlyLeaderIndexes) {
- startReplicationFromLeader(coreName);
+ if (isTlogReplicaAndNotLeader) {
+ startReplicationFromLeader(coreName, true);
}
publish(desc, Replica.State.ACTIVE);
}
+
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
@@ -960,17 +971,25 @@ public class ZkController {
}
}
- public void startReplicationFromLeader(String coreName) throws InterruptedException {
+ public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
+ log.info("{} starting background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
- if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
- replicateFromLeader.startReplication();
+ synchronized (replicateFromLeader) { // synchronize to prevent any stop before we finish the start
+ if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
+ replicateFromLeader.startReplication(switchTransactionLog);
+ } else {
+ log.warn("A replicate from leader instance already exists for core {}", coreName);
+ }
}
}
public void stopReplicationFromLeader(String coreName) {
+ log.info("{} stopping background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
if (replicateFromLeader != null) {
- replicateFromLeader.stopReplication();
+ synchronized (replicateFromLeader) {
+ replicateFromLeader.stopReplication();
+ }
}
}
@@ -1191,6 +1210,7 @@ public class ZkController {
if (state != Replica.State.DOWN) {
final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
if (lirState != null) {
+ assert cd.getCloudDescriptor().getReplicaType() != Replica.Type.PULL: "LIR should not happen for pull replicas!";
if (state == Replica.State.ACTIVE) {
// trying to become active, so leader-initiated state must be recovering
if (lirState == Replica.State.RECOVERING) {
@@ -1217,6 +1237,7 @@ public class ZkController {
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
+ props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
if (numShards != null) {
props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
}
@@ -1272,13 +1293,15 @@ public class ZkController {
assert false : "No collection was specified [" + collection + "]";
return;
}
+ Replica replica = zkStateReader.getClusterState().getReplica(collection, coreNodeName);
+
+ if (replica == null || replica.getType() != Type.PULL) {
+ ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
- ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
-
- if (context != null) {
- context.cancelElection();
+ if (context != null) {
+ context.cancelElection();
+ }
}
-
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
@@ -2408,11 +2431,9 @@ public class ZkController {
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if (replica.getName().equals(
- dcore.getCloudDescriptor().getCoreNodeName())) {
- return true;
- }
+ Replica r = slice.getReplica(dcore.getCloudDescriptor().getCoreNodeName());
+ if (r != null) {
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f03eeeb..9758c8f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -271,11 +271,12 @@ public class ReplicaMutator {
replicaProps.putAll(message.getProperties());
if (slice != null) {
- Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
+ Replica oldReplica = slice.getReplica(coreNodeName);
if (oldReplica != null) {
if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
}
+ replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString());
// Move custom props over.
for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
@@ -311,6 +312,8 @@ public class ReplicaMutator {
Replica replica = new Replica(coreNodeName, replicaProps);
+
+ log.debug("Will update state for replica: {}", replica);
Map<String, Object> sliceProps = null;
Map<String, Replica> replicas;
@@ -328,11 +331,11 @@ public class ReplicaMutator {
sliceProps.put(ZkStateReader.STATE_PROP, shardState);
sliceProps.put(Slice.PARENT, shardParent);
}
-
replicas.put(replica.getName(), replica);
slice = new Slice(sliceName, replicas, sliceProps);
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
+ log.debug("Collection is now: {}", newCollection);
return new ZkWriteCommand(collectionName, newCollection);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 4d767ed..5724f17 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -16,14 +16,16 @@
*/
package org.apache.solr.cloud.overseer;
-import java.lang.invoke.MethodHandles;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
+import static org.apache.solr.common.util.Utils.makeMap;
+import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableSet;
import org.apache.solr.cloud.Assign;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.ClusterState;
@@ -37,9 +39,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableSet;
public class SliceMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -66,14 +66,14 @@ public class SliceMutator {
log.error("Invalid Collection/Slice {}/{} ", coll, slice);
return ZkStateWriter.NO_OP;
}
-
String coreNodeName = Assign.assignNode(collection);
Replica replica = new Replica(coreNodeName,
makeMap(
ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
- ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP)));
+ ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP),
+ ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)));
return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
}
@@ -249,13 +249,15 @@ public class SliceMutator {
}
public static DocCollection updateReplica(DocCollection collection, final Slice slice, String coreNodeName, final Replica replica) {
- Map<String, Replica> copy = slice.getReplicasCopy();
+ Map<String, Replica> replicasCopy = slice.getReplicasCopy();
if (replica == null) {
- copy.remove(coreNodeName);
+ replicasCopy.remove(coreNodeName);
} else {
- copy.put(replica.getName(), replica);
+ replicasCopy.put(replica.getName(), replica);
}
- Slice newSlice = new Slice(slice.getName(), copy, slice.getProperties());
+ Slice newSlice = new Slice(slice.getName(), replicasCopy, slice.getProperties());
+ log.debug("Old Slice: {}", slice);
+ log.debug("New Slice: {}", newSlice);
return CollectionMutator.updateSlice(collection.getName(), collection, newSlice);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index da0f57c..911a9e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -261,6 +261,7 @@ public class ZkStateWriter {
}
}
+ log.trace("New Cluster State is: {}", clusterState);
return clusterState;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
index 3eab8b4..506e158 100644
--- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
+++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
@@ -62,10 +62,12 @@ public class ReplicaAssigner {
public static class Position implements Comparable<Position> {
public final String shard;
public final int index;
+ public final Replica.Type type;
- public Position(String shard, int replicaIdx) {
+ public Position(String shard, int replicaIdx, Replica.Type type) {
this.shard = shard;
this.index = replicaIdx;
+ this.type = type;
}
@Override
@@ -188,7 +190,7 @@ public class ReplicaAssigner {
List<Position> positions = new ArrayList<>();
for (int pos : p) {
for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
- positions.add(new Position(shardNames.get(pos), j));
+ positions.add(new Position(shardNames.get(pos), j, Replica.Type.NRT));
}
}
Collections.sort(positions);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 3ff5135..7471c08 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,6 +16,17 @@
*/
package org.apache.solr.core;
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
@@ -56,6 +67,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.Utils;
@@ -95,17 +107,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
/**
*
@@ -1185,10 +1187,15 @@ public class CoreContainer {
SolrCore newCore = core.reload(coreConfig);
registerCore(cd, newCore, false, false);
if (getZkController() != null) {
- boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
- if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+ DocCollection docCollection = getZkController().getClusterState().getCollection(cd.getCollectionName());
+ Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
+ assert replica != null;
+ if (replica.getType() == Replica.Type.TLOG) { //TODO: needed here?
getZkController().stopReplicationFromLeader(core.getName());
- getZkController().startReplicationFromLeader(newCore.getName());
+ if (!cd.getCloudDescriptor().isLeader()) {
+ getZkController().startReplicationFromLeader(newCore.getName(), true);
+ }
+
}
}
} catch (SolrCoreState.CoreIsClosedException e) {
@@ -1274,6 +1281,11 @@ public class CoreContainer {
if (zkSys.getZkController() != null) {
// cancel recovery in cloud mode
core.getSolrCoreState().cancelRecovery();
+ if (core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.PULL
+ || core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
+ // Stop replication if this is part of a pull/tlog replica before closing the core
+ zkSys.getZkController().stopReplicationFromLeader(name);
+ }
}
core.unloadOnClose(cd, deleteIndexDir, deleteDataDir, deleteInstanceDir);