You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/05/23 03:00:55 UTC
[4/5] lucene-solr:master: SOLR-10233: Add support for replica types
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 96e505a..7d15701 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -178,11 +178,13 @@ public class IndexFetcher {
public static final IndexFetchResult INDEX_FETCH_FAILURE = new IndexFetchResult("Fetching lastest index is failed", false, null);
public static final IndexFetchResult INDEX_FETCH_SUCCESS = new IndexFetchResult("Fetching latest index is successful", true, null);
public static final IndexFetchResult LOCK_OBTAIN_FAILED = new IndexFetchResult("Obtaining SnapPuller lock failed", false, null);
+ public static final IndexFetchResult CONTAINER_IS_SHUTTING_DOWN = new IndexFetchResult("I was asked to replicate but CoreContainer is shutting down", false, null); // returned by ReplicationHandler when CoreContainer.isShutDown() is true
public static final IndexFetchResult MASTER_VERSION_ZERO = new IndexFetchResult("Index in peer is empty and never committed yet", true, null);
public static final IndexFetchResult NO_INDEX_COMMIT_EXIST = new IndexFetchResult("No IndexCommit in local index", false, null);
public static final IndexFetchResult PEER_INDEX_COMMIT_DELETED = new IndexFetchResult("No files to download because IndexCommit in peer was deleted", false, null);
public static final IndexFetchResult LOCAL_ACTIVITY_DURING_REPLICATION = new IndexFetchResult("Local index modification during replication", false, null);
public static final IndexFetchResult EXPECTING_NON_LEADER = new IndexFetchResult("Replicating from leader but I'm the shard leader", false, null);
+ public static final IndexFetchResult LEADER_IS_NOT_ACTIVE = new IndexFetchResult("Replicating from leader but leader is not active", false, null); // returned when the shard leader's state is not ACTIVE or its node is not live
IndexFetchResult(String message, boolean successful, Throwable exception) {
this.message = message;
@@ -352,17 +354,32 @@ public class IndexFetcher {
// when we are a bit more confident we may want to try a partial replication
// if the error is connection related or something, but we have to be careful
forceReplication = true;
+ LOG.info("Last replication failed, so I'll force replication");
}
try {
if (fetchFromLeader) {
+ assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
Replica replica = getLeaderReplica();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
if (cd.getCoreNodeName().equals(replica.getName())) {
return IndexFetchResult.EXPECTING_NON_LEADER;
}
- masterUrl = replica.getCoreUrl();
- LOG.info("Updated masterUrl to " + masterUrl);
+ if (replica.getState() != Replica.State.ACTIVE) {
+ LOG.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
+ return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
+ }
+ if (!solrCore.getCoreContainer().getZkController().getClusterState().liveNodesContain(replica.getNodeName())) {
+ LOG.info("Replica {} is leader but it's not hosted on a live node, skipping replication", replica.getName());
+ return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
+ }
+ if (!replica.getCoreUrl().equals(masterUrl)) {
+ masterUrl = replica.getCoreUrl();
+ LOG.info("Updated masterUrl to {}", masterUrl);
+ // TODO: Do we need to set forceReplication = true?
+ } else {
+ LOG.debug("masterUrl didn't change");
+ }
}
//get the current 'replicateable' index version in the master
NamedList response;
@@ -410,6 +427,7 @@ public class IndexFetcher {
if (forceReplication && commit.getGeneration() != 0) {
// since we won't get the files for an empty index,
// we just clear ours and commit
+ LOG.info("New index in Master. Deleting mine...");
RefCounted<IndexWriter> iw = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
try {
iw.get().deleteAll();
@@ -422,6 +440,7 @@ public class IndexFetcher {
//there is nothing to be replicated
successfulInstall = true;
+ LOG.debug("Nothing to replicate, master's version is 0");
return IndexFetchResult.MASTER_VERSION_ZERO;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
index bce374f..9f2b693 100644
--- a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
@@ -16,14 +16,18 @@
*/
package org.apache.solr.handler;
-import org.apache.solr.api.Api;
-import org.apache.solr.api.ApiBag;
-import org.apache.solr.handler.component.*;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.solr.api.Api;
+import org.apache.solr.api.ApiBag;
+import org.apache.solr.handler.component.HttpShardHandler;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.handler.component.SearchHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
public class RealTimeGetHandler extends SearchHandler {
@Override
@@ -33,6 +37,14 @@ public class RealTimeGetHandler extends SearchHandler {
names.add(RealTimeGetComponent.COMPONENT_NAME);
return names;
}
+
+ /** Marks the request so that HttpShardHandler distributes it only to NRT replicas (or to a TLOG shard leader), then delegates to SearchHandler. */
+ @Override
+ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+ // Tell HttpShardHandler that this request should only be distributed to NRT replicas
+ req.getContext().put(HttpShardHandler.ONLY_NRT_REPLICAS, Boolean.TRUE);
+ super.handleRequestBody(req, rsp);
+ }
//////////////////////// SolrInfoMBeans methods //////////////////////
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 94ff189..f3dcdeb 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -403,6 +403,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!indexFetchLock.tryLock())
return IndexFetchResult.LOCK_OBTAIN_FAILED;
+ if (core.getCoreContainer().isShutDown()) {
+ LOG.warn("I was asked to replicate but CoreContainer is shutting down");
+ return IndexFetchResult.CONTAINER_IS_SHUTTING_DOWN;
+ }
try {
if (masterUrl != null) {
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 02d8e8f..b05dd84 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -114,15 +114,7 @@ import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import static org.apache.solr.common.cloud.DocCollection.RULE;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -408,7 +400,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
AUTO_ADD_REPLICAS,
RULE,
SNITCH,
- REALTIME_REPLICAS);
+ PULL_REPLICAS,
+ TLOG_REPLICAS,
+ NRT_REPLICAS);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");
@@ -635,7 +629,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
_ROUTE_,
CoreAdminParams.NAME,
INSTANCE_DIR,
- DATA_DIR);
+ DATA_DIR,
+ REPLICA_TYPE);
return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
}),
OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 6746332..1710da9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -227,6 +227,7 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
.put(CoreAdminParams.ROLES, CoreDescriptor.CORE_ROLES)
.put(CoreAdminParams.CORE_NODE_NAME, CoreDescriptor.CORE_NODE_NAME)
.put(ZkStateReader.NUM_SHARDS_PROP, CloudDescriptor.NUM_SHARDS)
+ .put(CoreAdminParams.REPLICA_TYPE, CloudDescriptor.REPLICA_TYPE)
.build();
protected static Map<String, String> buildCoreParams(SolrParams params) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0c2c903..748982d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,6 +18,7 @@
package org.apache.solr.handler.admin;
import java.lang.invoke.MethodHandles;
+import java.util.Objects;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.CloudDescriptor;
@@ -63,18 +64,20 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
- log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
- + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
- + ", onlyIfLeaderActive: " + onlyIfLeaderActive);
- int maxTries = 0;
+ CoreContainer coreContainer = it.handler.coreContainer;
+ // wait long enough for the leader conflict to work itself out plus a little extra
+ int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
+ int maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
+ log.info("Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}, maxTime: {} s",
+ coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive, maxTries);
+
Replica.State state = null;
boolean live = false;
int retry = 0;
while (true) {
- CoreContainer coreContainer = it.handler.coreContainer;
try (SolrCore core = coreContainer.getCore(cname)) {
- if (core == null && retry == 30) {
+ if (core == null && retry == Math.min(30, maxTries)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:"
+ cname);
}
@@ -102,15 +105,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
}
- if (maxTries == 0) {
- // wait long enough for the leader conflict to work itself out plus a little extra
- int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
- maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
- log.info("Will wait a max of " + maxTries + " seconds to see " + cname + " (" +
- cloudDescriptor.getShardId() + " of " +
- cloudDescriptor.getCollectionName() + ") have state: " + waitForState);
- }
-
ClusterState clusterState = coreContainer.getZkController().getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
Slice slice = collection.getSlice(cloudDescriptor.getShardId());
@@ -160,6 +154,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
String collection = null;
String leaderInfo = null;
String shardId = null;
+
try {
CloudDescriptor cloudDescriptor =
core.getCoreDescriptor().getCloudDescriptor();
@@ -175,8 +170,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
"I was asked to wait on state " + waitForState + " for "
+ shardId + " in " + collection + " on " + nodeName
+ " but I still do not see the requested state. I see state: "
- + state.toString() + " live:" + live + " leader from ZK: " + leaderInfo
- );
+ + Objects.toString(state) + " live:" + live + " leader from ZK: " + leaderInfo);
}
if (coreContainer.isShutDown()) {
@@ -185,7 +179,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
}
// solrcloud_debug
- if (log.isDebugEnabled()) {
+ if (log.isDebugEnabled() && core != null) {
try {
LocalSolrQueryRequest r = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 8c0a9cb..4ec3b79 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -60,6 +60,14 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
public class HttpShardHandler extends ShardHandler {
+
+ /**
+ * If the request context map has an entry with this key and Boolean.TRUE as value (compared by reference to {@link Boolean#TRUE}),
+ * {@link #prepDistributed(ResponseBuilder)} will only include {@link org.apache.solr.common.cloud.Replica.Type#NRT} replicas as possible
+ * destinations of the distributed request (or a leader replica of type {@link org.apache.solr.common.cloud.Replica.Type#TLOG}). This is used
+ * by the RealtimeGet handler, since other types of replicas shouldn't respond to RTG requests. Final: this key is shared between handlers.
+ */
+ public static final String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
private HttpShardHandlerFactory httpShardHandlerFactory;
private CompletionService<ShardResponse> completionService;
@@ -349,9 +357,12 @@ public class HttpShardHandler extends ShardHandler {
// and make it a non-distributed request.
String ourSlice = cloudDescriptor.getShardId();
String ourCollection = cloudDescriptor.getCollectionName();
+ // Some requests may only be fulfilled by replicas of type Replica.Type.NRT
+ boolean onlyNrtReplicas = Boolean.TRUE == req.getContext().get(ONLY_NRT_REPLICAS);
if (rb.slices.length == 1 && rb.slices[0] != null
&& ( rb.slices[0].equals(ourSlice) || rb.slices[0].equals(ourCollection + "_" + ourSlice) ) // handle the <collection>_<slice> format
- && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE) {
+ && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE
+ && (!onlyNrtReplicas || cloudDescriptor.getReplicaType() == Replica.Type.NRT)) {
boolean shortCircuit = params.getBool("shortCircuit", true); // currently just a debugging parameter to check distrib search on a single node
String targetHandler = params.get(ShardParams.SHARDS_QT);
@@ -387,14 +398,36 @@ public class HttpShardHandler extends ShardHandler {
continue;
// throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
}
+ Replica shardLeader = null;
final Collection<Replica> allSliceReplicas = slice.getReplicasMap().values();
final List<Replica> eligibleSliceReplicas = new ArrayList<>(allSliceReplicas.size());
for (Replica replica : allSliceReplicas) {
if (!clusterState.liveNodesContain(replica.getNodeName())
- || replica.getState() != Replica.State.ACTIVE) {
+ || replica.getState() != Replica.State.ACTIVE
+ || (onlyNrtReplicas && replica.getType() == Replica.Type.PULL)) {
continue;
}
+
+ if (onlyNrtReplicas && replica.getType() == Replica.Type.TLOG) {
+ if (shardLeader == null) {
+ try {
+ shardLeader = zkController.getZkStateReader().getLeaderRetry(cloudDescriptor.getCollectionName(), slice.getName());
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + slice.getName() + " in collection "
+ + cloudDescriptor.getCollectionName(), e);
+ } catch (SolrException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exception finding leader for shard {} in collection {}. Collection State: {}",
+ slice.getName(), cloudDescriptor.getCollectionName(), zkController.getZkStateReader().getClusterState().getCollectionOrNull(cloudDescriptor.getCollectionName()));
+ }
+ throw e;
+ }
+ }
+ if (!replica.getName().equals(shardLeader.getName())) {
+ continue;
+ }
+ }
eligibleSliceReplicas.add(replica);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index c0ceddb..6d70435 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -16,6 +16,10 @@
*/
package org.apache.solr.handler.component;
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.ID;
+import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -24,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,9 +73,9 @@ import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.DocList;
-import org.apache.solr.search.SolrDocumentFetcher;
import org.apache.solr.search.QParser;
import org.apache.solr.search.ReturnFields;
+import org.apache.solr.search.SolrDocumentFetcher;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SolrReturnFields;
import org.apache.solr.search.SyntaxError;
@@ -82,10 +87,6 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-import static org.apache.solr.common.params.CommonParams.ID;
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
-
public class RealTimeGetComponent extends SearchComponent
{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -105,7 +106,21 @@ public class RealTimeGetComponent extends SearchComponent
SolrQueryRequest req = rb.req;
SolrQueryResponse rsp = rb.rsp;
SolrParams params = req.getParams();
-
+ CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
+
+ if (cloudDesc != null) {
+ Replica.Type replicaType = cloudDesc.getReplicaType();
+ if (replicaType != null) {
+ if (replicaType == Replica.Type.PULL) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "%s can't handle realtime get requests. Replicas of type %s do not support these type of requests",
+ cloudDesc.getCoreNodeName(),
+ Replica.Type.PULL));
+ }
+ // non-leader TLOG replicas should not respond to distrib /get requests, but internal requests are OK
+ }
+ }
+
if (!params.getBool(COMPONENT_NAME, true)) {
return;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index e7f6a7b..e481109 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -45,7 +45,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRefHash;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
@@ -122,12 +122,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
ZkController zkController = core.getCoreContainer().getZkController();
- if (zkController != null) {
- DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
- if (dc.getRealtimeReplicas() == 1) {
- commitWithinSoftCommit = false;
- commitTracker.setOpenSearcher(true);
- }
+ if (zkController != null && core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
+ commitWithinSoftCommit = false;
+ commitTracker.setOpenSearcher(true);
}
}
@@ -249,7 +246,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
cmd.overwrite = false;
}
try {
- if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
if (ulog != null) ulog.add(cmd);
return 1;
}
@@ -425,7 +422,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByIdCommands.increment();
deleteByIdCommandsCumulative.mark();
- if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
if (ulog != null) ulog.delete(cmd);
return;
}
@@ -489,7 +486,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByQueryCommandsCumulative.mark();
boolean madeIt=false;
try {
- if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+ if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
if (ulog != null) ulog.deleteByQuery(cmd);
madeIt = true;
return;
@@ -548,7 +545,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
}
-
@Override
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
mergeIndexesCommands.mark();
@@ -921,7 +917,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
* Calls either {@link IndexWriter#updateDocValues} or {@link IndexWriter#updateDocument} as
* needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
* <p>
- * If the this is an UPDATE_INPLACE cmd, then all fields inclued in
+ * If the this is an UPDATE_INPLACE cmd, then all fields included in
* {@link AddUpdateCommand#getLuceneDocument} must either be the uniqueKey field, or be DocValue
* only fields.
* </p>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index 49d2664..f0eb8bc 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -115,7 +115,7 @@ public abstract class UpdateHandler implements SolrInfoBean {
public UpdateHandler(SolrCore core) {
this(core, null);
}
-
+
public UpdateHandler(SolrCore core, UpdateLog updateLog) {
this.core=core;
idField = core.getLatestSchema().getUniqueKeyField();
@@ -124,7 +124,9 @@ public abstract class UpdateHandler implements SolrInfoBean {
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
- if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
+ // If this is a replica of type PULL, don't create the update log
+ boolean skipUpdateLog = core.getCoreDescriptor().getCloudDescriptor() != null && !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog();
+ if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled() && !skipUpdateLog) {
String dataDir = (String)ulogPluginInfo.initArgs.get("dir");
String ulogDir = core.getCoreDescriptor().getUlogDir();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index c50add4..87b93f4 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1092,7 +1092,7 @@ public static final int VERSION_IDX = 1;
/**
* Replay current tlog, so all updates will be written to index.
- * This is must do task for a append replica become a new leader.
+ * This is must do task for a tlog replica become a new leader.
* @return future of this task
*/
public Future<RecoveryInfo> recoverFromCurrentLog() {
@@ -1706,7 +1706,7 @@ public static final int VERSION_IDX = 1;
public void doReplay(TransactionLog translog) {
try {
- loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
+ loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart + " inSortedOrder=" + inSortedOrder);
long lastStatusTime = System.nanoTime();
if (inSortedOrder) {
tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
@@ -1786,7 +1786,7 @@ public static final int VERSION_IDX = 1;
recoveryInfo.adds++;
AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
+ if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
proc.processAdd(cmd);
break;
}
@@ -1854,6 +1854,7 @@ public static final int VERSION_IDX = 1;
// something wrong with the request?
}
assert TestInjection.injectUpdateLogReplayRandomPause();
+
}
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index cb1b2fb..5269ecb 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -279,7 +280,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
- private final boolean onlyLeaderIndexes;
+ private final Replica.Type replicaType;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,12 +325,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
- ClusterState cstate = zkController.getClusterState();
- DocCollection coll = cstate.getCollection(collection);
- onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
+ replicaType = cloudDesc.getReplicaType();
} else {
collection = null;
- onlyLeaderIndexes = false;
+ replicaType = Replica.Type.NRT;
}
boolean shouldClone = false;
@@ -666,7 +665,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// used for deleteByQuery to get the list of nodes this leader should forward to
- private List<Node> setupRequest() {
+ private List<Node> setupRequestForDBQ() {
List<Node> nodes = null;
String shardId = cloudDesc.getShardId();
@@ -680,7 +679,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
forwardToLeader = false;
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
+ .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null) {
nodes = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps props : replicaProps) {
@@ -1190,7 +1189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
checkDeleteByQueries = true;
}
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
@@ -1576,7 +1575,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (zkEnabled && DistribPhase.TOLEADER == phase) {
// This core should be a leader
isLeader = true;
- replicas = setupRequest();
+ replicas = setupRequestForDBQ();
} else if (DistribPhase.FROMLEADER == phase) {
isLeader = false;
}
@@ -1610,8 +1609,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, myShardId);
+ // DBQ forwarded to NRT and TLOG replicas
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN);
+ .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null) {
final List<Node> myReplicas = new ArrayList<>(replicaProps.size());
for (ZkCoreNodeProps replicaProp : replicaProps) {
@@ -1699,10 +1699,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // TLOG replica not leader, don't write the DBQ to IW
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
-
doLocalDelete(cmd);
}
}
@@ -1857,7 +1857,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
@@ -1884,14 +1884,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
zkCheck();
nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
- .getCloudDescriptor().getCollectionName());
- if (isLeader && nodes.size() == 1) {
+ .getCloudDescriptor().getCollectionName(), EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT));
+ if (nodes == null) {
+ // This could happen if there are only pull replicas
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
+ }
+ if (isLeader && nodes.size() == 1 && replicaType != Replica.Type.PULL) {
singleLeader = true;
}
}
if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
- if (onlyLeaderIndexes) {
+ if (replicaType == Replica.Type.TLOG) {
try {
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, cloudDesc.getShardId());
@@ -1904,12 +1909,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
doLocalCommit(cmd);
} else {
assert TestInjection.waitForInSyncWithLeader(req.getCore(),
- zkController, collection, cloudDesc.getShardId());
+ zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader";
}
} catch (InterruptedException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
}
+ } else if (replicaType == Replica.Type.PULL) {
+ log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
} else {
+ // NRT replicas will always commit
+ if (vinfo != null) {
+ long commitVersion = vinfo.getNewClock();
+ cmd.setVersion(commitVersion);
+ }
doLocalCommit(cmd);
}
} else {
@@ -1958,7 +1970,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
- private List<Node> getCollectionUrls(SolrQueryRequest req, String collection) {
+ private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, EnumSet<Replica.Type> types) {
ClusterState clusterState = req.getCore()
.getCoreContainer().getZkController().getClusterState();
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
@@ -1973,6 +1985,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Map<String,Replica> shardMap = replicas.getReplicasMap();
for (Entry<String,Replica> entry : shardMap.entrySet()) {
+ if (!types.contains(entry.getValue().getType())) {
+ continue;
+ }
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
urls.add(new StdNode(nodeProps, collection, replicas.getName()));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 6b7b1f8..5b0d047 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -75,7 +75,7 @@ public class TestInjection {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Pattern ENABLED_PERCENT = Pattern.compile("(true|false)(?:\\:(\\d+))?$", Pattern.CASE_INSENSITIVE);
-
+
private static final String LUCENE_TEST_CASE_FQN = "org.apache.lucene.util.LuceneTestCase";
/**
@@ -151,6 +151,7 @@ public class TestInjection {
splitFailureBeforeReplicaCreation = null;
prepRecoveryOpPauseForever = null;
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
+ waitForReplicasInSync = "true:60";
for (Timer timer : timers) {
timer.cancel();
@@ -387,9 +388,10 @@ public class TestInjection {
String localVersion = searcher.get().getIndexReader().getIndexCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
- log.info("Waiting time for replica in sync with leader: {}", System.currentTimeMillis()-currentTime);
+ log.info("Waiting time for tlog replica to be in sync with leader: {}", System.currentTimeMillis()-currentTime);
return true;
} else {
+ log.debug("Tlog replica not in sync with leader yet. Attempt: {}. Local Version={}, leader Version={}", i, localVersion, leaderVersion);
Thread.sleep(500);
}
} finally {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
index 58f9551..a63f6cb 100644
--- a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
@@ -63,6 +63,10 @@
<updateHandler class="solr.DirectUpdateHandler2">
+ <autoCommit>
+ <maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
+ </autoCommit>
+
<!-- autocommit pending docs if certain criteria are met
<autoCommit>
<maxDocs>10000</maxDocs>
@@ -478,7 +482,7 @@
<str name="facet.query">foo_s:bar</str>
</lst>
</requestHandler>
-
+
<admin>
<defaultQuery>solr</defaultQuery>
<gettableFiles>solrconfig.xml schema.xml admin-extra.html</gettableFiles>
@@ -577,6 +581,8 @@
<str name="df">text</str>
</lst>
</initParams>
+
+
</config>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
index 059e58f..8da7d28 100644
--- a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
@@ -33,7 +33,7 @@
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
- <updateLog></updateLog>
+ <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler">
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
index 3414759..a6d130e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
@@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.ClusterProp;
import org.apache.solr.client.solrj.response.RequestStatusState;
@@ -48,8 +49,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.ShardParams._ROUTE_;
-
/**
* This class implements the logic required to test Solr cloud backup/restore capability.
*/
@@ -84,11 +83,17 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
@Test
public void test() throws Exception {
boolean isImplicit = random().nextBoolean();
+ boolean doSplitShardOperation = !isImplicit && random().nextBoolean();
int replFactor = TestUtil.nextInt(random(), 1, 2);
+ int numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+ int numPullReplicas = TestUtil.nextInt(random(), 0, 1);
CollectionAdminRequest.Create create =
- CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor);
- if (NUM_SHARDS * replFactor > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
- create.setMaxShardsPerNode(NUM_SHARDS);//just to assert it survives the restoration
+ CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas);
+ if (NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas) > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
+ create.setMaxShardsPerNode((int)Math.ceil(NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas) / cluster.getJettySolrRunners().size()));//just to assert it survives the restoration
+ if (doSplitShardOperation) {
+ create.setMaxShardsPerNode(create.getMaxShardsPerNode() * 2);
+ }
}
if (random().nextBoolean()) {
create.setAutoAddReplicas(true);//just to assert it survives the restoration
@@ -112,7 +117,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
indexDocs(getCollectionName());
- if (!isImplicit && random().nextBoolean()) {
+ if (doSplitShardOperation) {
// shard split the first shard
int prevActiveSliceCount = getActiveSliceCount(getCollectionName());
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(getCollectionName());
@@ -235,9 +240,9 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
.setLocation(backupLocation).setRepositoryName(getBackupRepoName());
- if (origShardToDocCount.size() > cluster.getJettySolrRunners().size()) {
+ if (backupCollection.getReplicas().size() > cluster.getJettySolrRunners().size()) {
// may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
- restore.setMaxShardsPerNode(origShardToDocCount.size());
+ restore.setMaxShardsPerNode((int)Math.ceil(backupCollection.getReplicas().size()/cluster.getJettySolrRunners().size()));
}
if (rarely()) { // Try with createNodeSet configuration
@@ -304,9 +309,11 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
Map<String,Integer> shardToDocCount = new TreeMap<>();
for (Slice slice : docCollection.getActiveSlices()) {
String shardName = slice.getName();
- long docsInShard = client.query(docCollection.getName(), new SolrQuery("*:*").setParam(_ROUTE_, shardName))
- .getResults().getNumFound();
- shardToDocCount.put(shardName, (int) docsInShard);
+ try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(slice.getLeader().getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
+ long docsInShard = leaderClient.query(new SolrQuery("*:*").setParam("distrib", "false"))
+ .getResults().getNumFound();
+ shardToDocCount.put(shardName, (int) docsInShard);
+ }
}
return shardToDocCount;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
index 7593f3b..8e32510 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
@@ -87,4 +87,10 @@ public class AssignTest extends SolrTestCaseJ4 {
assertEquals("core_node2", nodeName);
}
+ @Test
+ public void testBuildCoreName() {
+ assertEquals("Core name pattern changed", "collection1_shard1_replica_n1", Assign.buildCoreName("collection1", "shard1", Replica.Type.NRT, 1));
+ assertEquals("Core name pattern changed", "collection1_shard2_replica_p2", Assign.buildCoreName("collection1", "shard2", Replica.Type.PULL,2));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 5eb4b3b..c8e92fc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -62,8 +62,8 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useTlogReplicas() {
+ return onlyLeaderIndexes;
}
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 1c23c9c..18caa58 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -119,8 +119,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useTlogReplicas() {
+ return onlyLeaderIndexes;
}
@Override
@@ -1075,12 +1075,6 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
assertEquals(collection3Docs, collection2Docs - 1);
}
- protected SolrInputDocument getDoc(Object... fields) throws Exception {
- SolrInputDocument doc = new SolrInputDocument();
- addFields(doc, fields);
- return doc;
- }
-
protected void indexDoc(String collection, SolrInputDocument doc) throws IOException, SolrServerException {
List<SolrClient> clients = otherCollectionClients.get(collection);
int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % clients.size();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index ffc5262..2e31520 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -16,33 +16,21 @@
*/
package org.apache.solr.cloud;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.IOUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
@@ -51,8 +39,6 @@ import org.slf4j.LoggerFactory;
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
private static final int FAIL_TOLERANCE = 100;
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
private final boolean onlyLeaderIndexes = random().nextBoolean();
@@ -112,8 +98,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
}
@Override
- protected int getRealtimeReplicas() {
- return onlyLeaderIndexes? 1 : -1;
+ protected boolean useTlogReplicas() {
+ return onlyLeaderIndexes;
}
@Test
@@ -158,8 +144,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
// it's currently hard to know what requests failed when using ConcurrentSolrUpdateServer
boolean runFullThrottle = random().nextBoolean();
if (runFullThrottle) {
- FullThrottleStoppableIndexingThread ftIndexThread = new FullThrottleStoppableIndexingThread(
- clients, "ft1", true);
+ FullThrottleStoppableIndexingThread ftIndexThread =
+ new FullThrottleStoppableIndexingThread(controlClient, cloudClient, clients, "ft1", true, this.clientSoTimeout);
threads.add(ftIndexThread);
ftIndexThread.start();
}
@@ -289,111 +275,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
return deleteFails;
}
- class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
- private CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
- private volatile boolean stop = false;
- int clientIndex = 0;
- private ConcurrentUpdateSolrClient cusc;
- private List<SolrClient> clients;
- private AtomicInteger fails = new AtomicInteger();
-
- public FullThrottleStoppableIndexingThread(List<SolrClient> clients,
- String id, boolean doDeletes) {
- super(controlClient, cloudClient, id, doDeletes);
- setName("FullThrottleStopableIndexingThread");
- setDaemon(true);
- this.clients = clients;
-
- cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(0)).getBaseURL(), httpClient, 8, 2);
- cusc.setConnectionTimeout(10000);
- cusc.setSoTimeout(clientSoTimeout);
- }
-
- @Override
- public void run() {
- int i = 0;
- int numDeletes = 0;
- int numAdds = 0;
-
- while (true && !stop) {
- String id = this.id + "-" + i;
- ++i;
-
- if (doDeletes && random().nextBoolean() && deletes.size() > 0) {
- String delete = deletes.remove(0);
- try {
- numDeletes++;
- cusc.deleteById(delete);
- } catch (Exception e) {
- changeUrlOnError(e);
- fails.incrementAndGet();
- }
- }
-
- try {
- numAdds++;
- if (numAdds > (TEST_NIGHTLY ? 4002 : 197))
- continue;
- SolrInputDocument doc = getDoc(
- "id",
- id,
- i1,
- 50,
- t1,
- "Saxon heptarchies that used to rip around so in old times and raise Cain. My, you ought to seen old Henry the Eight when he was in bloom. He WAS a blossom. He used to marry a new wife every day, and chop off her head next morning. And he would do it just as indifferent as if ");
- cusc.add(doc);
- } catch (Exception e) {
- changeUrlOnError(e);
- fails.incrementAndGet();
- }
-
- if (doDeletes && random().nextBoolean()) {
- deletes.add(id);
- }
-
- }
-
- log.info("FT added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
- }
-
- private void changeUrlOnError(Exception e) {
- if (e instanceof ConnectException) {
- clientIndex++;
- if (clientIndex > clients.size() - 1) {
- clientIndex = 0;
- }
- cusc.shutdownNow();
- cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(clientIndex)).getBaseURL(),
- httpClient, 30, 3);
- }
- }
-
- @Override
- public void safeStop() {
- stop = true;
- cusc.blockUntilFinished();
- cusc.shutdownNow();
- IOUtils.closeQuietly(httpClient);
- }
-
- @Override
- public int getFailCount() {
- return fails.get();
- }
-
- @Override
- public Set<String> getAddFails() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<String> getDeleteFails() {
- throw new UnsupportedOperationException();
- }
-
- };
-
-
// skip the randoms - they can deadlock...
@Override
protected void indexr(Object... fields) throws Exception {
@@ -401,13 +282,4 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
indexDoc(doc);
}
- static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
- public ErrorLoggingConcurrentUpdateSolrClient(String serverUrl, HttpClient httpClient, int queueSize, int threadCount) {
- super(serverUrl, httpClient, queueSize, threadCount, null, false);
- }
- @Override
- public void handleError(Throwable ex) {
- log.warn("cusc error", ex);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
new file mode 100644
index 0000000..11c25d3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+@ThreadLeakLingering(linger = 60000)
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
+public class ChaosMonkeyNothingIsSafeWithPullReplicasTest extends AbstractFullDistribZkTestBase {
+ private static final int FAIL_TOLERANCE = 100;
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+
+ private final boolean useTlogReplicas = random().nextBoolean();
+
+ private final int numPullReplicas;
+ private final int numRealtimeOrTlogReplicas;
+
+ protected int getPullReplicaCount() {
+ return numPullReplicas;
+ }
+
+ @BeforeClass
+ public static void beforeSuperClass() {
+ schemaString = "schema15.xml"; // we need a string id
+ if (usually()) {
+ System.setProperty("solr.autoCommit.maxTime", "15000");
+ }
+ TestInjection.waitForReplicasInSync = null;
+ setErrorHook();
+ }
+
+ @AfterClass
+ public static void afterSuperClass() {
+ System.clearProperty("solr.autoCommit.maxTime");
+ clearErrorHook();
+ TestInjection.reset();
+ }
+
+ protected static final String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
+ protected static final RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
+
+ private int clientSoTimeout;
+
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ public RandVal[] getRandValues() {
+ return randVals;
+ }
+
+ @Override
+ public void distribSetUp() throws Exception {
+ super.distribSetUp();
+ // can help to hide this when testing and looking at logs
+ //ignoreException("shard update error");
+ useFactory("solr.StandardDirectoryFactory");
+ }
+
+ public ChaosMonkeyNothingIsSafeWithPullReplicasTest() {
+ super();
+ numPullReplicas = random().nextInt(TEST_NIGHTLY ? 2 : 1) + 1;
+ numRealtimeOrTlogReplicas = random().nextInt(TEST_NIGHTLY ? 4 : 3) + 1;
+ sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
+ if (sliceCount == -1) {
+ sliceCount = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+ }
+
+ int numNodes = sliceCount * (numRealtimeOrTlogReplicas + numPullReplicas);
+ fixShardCount(numNodes);
+ log.info("Starting ChaosMonkey test with {} shards and {} nodes", sliceCount, numNodes);
+
+ // None of the operations used here are particularly costly, so this should work.
+ // Using this low timeout will also help us catch index stalling.
+ clientSoTimeout = 5000;
+ }
+
+ @Override
+ protected boolean useTlogReplicas() {
+ return useTlogReplicas;
+ }
+
+ @Test
+ public void test() throws Exception {
+ cloudClient.setSoTimeout(clientSoTimeout);
+ DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION);
+ assertEquals(this.sliceCount, docCollection.getSlices().size());
+ Slice s = docCollection.getSlice("shard1");
+ assertNotNull(s);
+ assertEquals("Unexpected number of replicas. Collection: " + docCollection, numRealtimeOrTlogReplicas + numPullReplicas, s.getReplicas().size());
+ assertEquals("Unexpected number of pull replicas. Collection: " + docCollection, numPullReplicas, s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+ assertEquals(useTlogReplicas()?0:numRealtimeOrTlogReplicas, s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+ assertEquals(useTlogReplicas()?numRealtimeOrTlogReplicas:0, s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+
+ boolean testSuccessful = false;
+ try {
+ handle.clear();
+ handle.put("timestamp", SKIPVAL);
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ // make sure we have leaders for each shard
+ for (int j = 1; j < sliceCount; j++) {
+ zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 10000);
+ } // make sure we again have leaders for each shard
+
+ waitForRecoveriesToFinish(false);
+
+ // we cannot do delete by query
+ // as it's not supported for recovery
+ del("*:*");
+
+ List<StoppableThread> threads = new ArrayList<>();
+ List<StoppableIndexingThread> indexTreads = new ArrayList<>();
+ int threadCount = TEST_NIGHTLY ? 3 : 1;
+ int i = 0;
+ for (i = 0; i < threadCount; i++) {
+ StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
+ threads.add(indexThread);
+ indexTreads.add(indexThread);
+ indexThread.start();
+ }
+
+ threadCount = 1;
+ i = 0;
+ for (i = 0; i < threadCount; i++) {
+ StoppableSearchThread searchThread = new StoppableSearchThread(cloudClient);
+ threads.add(searchThread);
+ searchThread.start();
+ }
+
+ if (usually()) {
+ StoppableCommitThread commitThread = new StoppableCommitThread(cloudClient, 1000, false);
+ threads.add(commitThread);
+ commitThread.start();
+ }
+
+ // TODO: we only do this sometimes so that we can sometimes compare against control,
+ // it's currently hard to know what requests failed when using ConcurrentSolrUpdateServer
+ boolean runFullThrottle = random().nextBoolean();
+ if (runFullThrottle) {
+ FullThrottleStoppableIndexingThread ftIndexThread =
+ new FullThrottleStoppableIndexingThread(controlClient, cloudClient, clients, "ft1", true, this.clientSoTimeout);
+ threads.add(ftIndexThread);
+ ftIndexThread.start();
+ }
+
+ chaosMonkey.startTheMonkey(true, 10000);
+ try {
+ long runLength;
+ if (RUN_LENGTH != -1) {
+ runLength = RUN_LENGTH;
+ } else {
+ int[] runTimes;
+ if (TEST_NIGHTLY) {
+ runTimes = new int[] {5000, 6000, 10000, 15000, 25000, 30000,
+ 30000, 45000, 90000, 120000};
+ } else {
+ runTimes = new int[] {5000, 7000, 15000};
+ }
+ runLength = runTimes[random().nextInt(runTimes.length - 1)];
+ }
+ ChaosMonkey.wait(runLength, DEFAULT_COLLECTION, zkStateReader);
+ } finally {
+ chaosMonkey.stopTheMonkey();
+ }
+
+ // ideally this should go into chaosMonkey
+ restartZk(1000 * (5 + random().nextInt(4)));
+
+ for (StoppableThread indexThread : threads) {
+ indexThread.safeStop();
+ }
+
+ // start any downed jetties to be sure we still will end up with a leader per shard...
+
+ // wait for stop...
+ for (StoppableThread indexThread : threads) {
+ indexThread.join();
+ }
+
+ // try and wait for any replications and what not to finish...
+
+ ChaosMonkey.wait(2000, DEFAULT_COLLECTION, zkStateReader);
+
+ // wait until there are no recoveries...
+ waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
+
+ // make sure we again have leaders for each shard
+ for (int j = 1; j < sliceCount; j++) {
+ zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 30000);
+ }
+
+ commit();
+
+ // TODO: assert we didnt kill everyone
+
+ zkStateReader.updateLiveNodes();
+ assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
+
+
+ // we expect full throttle fails, but cloud client should not easily fail
+ for (StoppableThread indexThread : threads) {
+ if (indexThread instanceof StoppableIndexingThread && !(indexThread instanceof FullThrottleStoppableIndexingThread)) {
+ int failCount = ((StoppableIndexingThread) indexThread).getFailCount();
+ assertFalse("There were too many update fails (" + failCount + " > " + FAIL_TOLERANCE
+ + ") - we expect it can happen, but shouldn't easily", failCount > FAIL_TOLERANCE);
+ }
+ }
+
+ waitForReplicationFromReplicas(DEFAULT_COLLECTION, zkStateReader, new TimeOut(30, TimeUnit.SECONDS));
+// waitForAllWarmingSearchers();
+
+ Set<String> addFails = getAddFails(indexTreads);
+ Set<String> deleteFails = getDeleteFails(indexTreads);
+ // full throttle thread can
+ // have request fails
+ checkShardConsistency(!runFullThrottle, true, addFails, deleteFails);
+
+ long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
+ // ensure we have added more than 0 docs
+ long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
+ .getResults().getNumFound();
+
+ assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
+
+ if (VERBOSE) System.out.println("control docs:"
+ + controlClient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound() + "\n\n");
+
+ // try and make a collection to make sure the overseer has survived the expiration and session loss
+
+ // sometimes we restart zookeeper as well
+ if (random().nextBoolean()) {
+ restartZk(1000 * (5 + random().nextInt(4)));
+ }
+
+ try (CloudSolrClient client = createCloudClient("collection1")) {
+ // We don't really know how many live nodes we have at this point, so "maxShardsPerNode" needs to be > 1
+ createCollection(null, "testcollection",
+ 1, 1, 10, client, null, "conf1");
+ }
+ List<Integer> numShardsNumReplicas = new ArrayList<>(2);
+ numShardsNumReplicas.add(1);
+ numShardsNumReplicas.add(1 + getPullReplicaCount());
+ checkForCollection("testcollection", numShardsNumReplicas, null);
+
+ testSuccessful = true;
+ } finally {
+ if (!testSuccessful) {
+ logReplicaTypesReplicationInfo(DEFAULT_COLLECTION, cloudClient.getZkStateReader());
+ printLayout();
+ }
+ }
+ }
+
+ private Set<String> getAddFails(List<StoppableIndexingThread> threads) {
+ Set<String> addFails = new HashSet<String>();
+ for (StoppableIndexingThread thread : threads) {
+ addFails.addAll(thread.getAddFails());
+// addFails.addAll(thread.getAddFailsMinRf());
+ }
+ return addFails;
+ }
+
+ private Set<String> getDeleteFails(List<StoppableIndexingThread> threads) {
+ Set<String> deleteFails = new HashSet<String>();
+ for (StoppableIndexingThread thread : threads) {
+ deleteFails.addAll(thread.getDeleteFails());
+// deleteFails.addAll(thread.getDeleteFailsMinRf());
+ }
+ return deleteFails;
+ }
+
+ // skip the randoms - they can deadlock...
+ @Override
+ protected void indexr(Object... fields) throws Exception {
+ SolrInputDocument doc = getDoc(fields);
+ indexDoc(doc);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
new file mode 100644
index 0000000..f2e8845
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chaos monkey test against a collection whose shards each hold a random number of PULL replicas
+ * plus a random number of either NRT or TLOG replicas (the choice is made once per instance, see
+ * {@link #useTlogReplicas()}). The monkey is started with {@code startTheMonkey(false, 500)} while
+ * indexing and commit threads run. Afterwards the test checks that the indexing threads report no
+ * failures, that the shards are consistent, and that a new collection can still be created
+ * (optionally after a ZooKeeper restart).
+ */
+@Slow
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
+public class ChaosMonkeySafeLeaderWithPullReplicasTest extends AbstractFullDistribZkTestBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Run length in ms from {@code solr.tests.cloud.cm.runlength}; -1 means "pick one at random". */
+  private static final int RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+
+  // true: non-PULL replicas are TLOG; false: they are NRT
+  private final boolean useTlogReplicas = random().nextBoolean();
+
+  private final int numPullReplicas;
+  private final int numRealtimeOrTlogReplicas;
+
+  @Override
+  protected int getPullReplicaCount() {
+    return numPullReplicas;
+  }
+
+  @Override
+  protected boolean useTlogReplicas() {
+    return useTlogReplicas;
+  }
+
+  @BeforeClass
+  public static void beforeSuperClass() {
+    schemaString = "schema15.xml"; // we need a string id
+    if (usually()) {
+      System.setProperty("solr.autoCommit.maxTime", "15000");
+    }
+    TestInjection.waitForReplicasInSync = null;
+    setErrorHook();
+  }
+
+  @AfterClass
+  public static void afterSuperClass() {
+    System.clearProperty("solr.autoCommit.maxTime");
+    clearErrorHook();
+    TestInjection.reset();
+  }
+
+  protected static final String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
+  protected static final RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
+
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  public RandVal[] getRandValues() {
+    return randVals;
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    useFactory("solr.StandardDirectoryFactory");
+    super.distribSetUp();
+  }
+
+  /**
+   * Randomizes the replica counts (1-3 of each kind nightly, 1-2 otherwise) and the shard count
+   * (unless {@code solr.tests.cloud.cm.slicecount} is set), then sizes the cluster accordingly.
+   */
+  public ChaosMonkeySafeLeaderWithPullReplicasTest() {
+    super();
+    numPullReplicas = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    numRealtimeOrTlogReplicas = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
+    if (sliceCount == -1) {
+      sliceCount = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    }
+
+    // shards * (NRT-or-TLOG replicas + PULL replicas) per shard
+    int numNodes = sliceCount * (numRealtimeOrTlogReplicas + numPullReplicas);
+    fixShardCount(numNodes);
+    log.info("Starting ChaosMonkey test with {} shards and {} nodes", sliceCount, numNodes);
+  }
+
+  @Test
+  public void test() throws Exception {
+    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION);
+    assertEquals(this.sliceCount, docCollection.getSlices().size());
+    Slice s = docCollection.getSlice("shard1");
+    assertNotNull(s);
+    assertEquals("Unexpected number of replicas. Collection: " + docCollection, numRealtimeOrTlogReplicas + numPullReplicas, s.getReplicas().size());
+    assertEquals("Unexpected number of pull replicas. Collection: " + docCollection, numPullReplicas, s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+    assertEquals(useTlogReplicas() ? 0 : numRealtimeOrTlogReplicas, s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+    assertEquals(useTlogReplicas() ? numRealtimeOrTlogReplicas : 0, s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    // randomly turn on 1 second 'soft' commit
+    randomlyEnableAutoSoftCommit();
+
+    tryDelete();
+
+    List<StoppableThread> threads = new ArrayList<>();
+    int threadCount = 2;
+    int batchSize = 1;
+    if (random().nextBoolean()) {
+      batchSize = random().nextInt(98) + 2;
+    }
+
+    // non-nightly runs always pause between updates; nightly runs decide at random
+    boolean pauseBetweenUpdates = !TEST_NIGHTLY || random().nextBoolean();
+    int maxUpdates = pauseBetweenUpdates ? 15000 : 1000 + random().nextInt(1000);
+
+    for (int i = 0; i < threadCount; i++) {
+      StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates);
+      threads.add(indexThread);
+      indexThread.start();
+    }
+
+    StoppableCommitThread commitThread = new StoppableCommitThread(cloudClient, 1000, false);
+    threads.add(commitThread);
+    commitThread.start();
+
+    chaosMonkey.startTheMonkey(false, 500);
+    try {
+      long runLength;
+      if (RUN_LENGTH != -1) {
+        runLength = RUN_LENGTH;
+      } else {
+        int[] runTimes;
+        if (TEST_NIGHTLY) {
+          runTimes = new int[] {5000, 6000, 10000, 15000, 25000, 30000,
+              30000, 45000, 90000, 120000};
+        } else {
+          runTimes = new int[] {5000, 7000, 15000};
+        }
+        // nextInt's bound is exclusive: pass the full length so every entry can be chosen
+        runLength = runTimes[random().nextInt(runTimes.length)];
+      }
+
+      ChaosMonkey.wait(runLength, DEFAULT_COLLECTION, cloudClient.getZkStateReader());
+    } finally {
+      chaosMonkey.stopTheMonkey();
+    }
+
+    for (StoppableThread thread : threads) {
+      thread.safeStop();
+    }
+
+    // wait for stop...
+    for (StoppableThread thread : threads) {
+      thread.join();
+    }
+
+    for (StoppableThread thread : threads) {
+      if (thread instanceof StoppableIndexingThread) {
+        assertEquals(0, ((StoppableIndexingThread) thread).getFailCount());
+      }
+    }
+
+    // try and wait for any replications and what not to finish...
+    Thread.sleep(2000);
+    waitForThingsToLevelOut(180000);
+
+    // even if things were leveled out, a jetty may have just been stopped or something;
+    // wait and level out again to make sure the system is not still in flux
+    Thread.sleep(3000);
+    waitForThingsToLevelOut(180000);
+
+    log.info("control docs:{}\n\n", controlClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+
+    waitForReplicationFromReplicas(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), new TimeOut(30, TimeUnit.SECONDS));
+
+    checkShardConsistency(batchSize == 1, true);
+
+    // try and make a collection to make sure the overseer has survived the expiration and session loss
+
+    // sometimes we restart zookeeper as well
+    if (random().nextBoolean()) {
+      zkServer.shutdown();
+      zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
+      zkServer.run();
+    }
+
+    try (CloudSolrClient client = createCloudClient("collection1")) {
+      createCollection(null, "testcollection", 1, 1, 100, client, null, "conf1");
+    }
+    List<Integer> numShardsNumReplicas = new ArrayList<>(2);
+    numShardsNumReplicas.add(1);
+    numShardsNumReplicas.add(1 + getPullReplicaCount());
+    checkForCollection("testcollection", numShardsNumReplicas, null);
+  }
+
+  /**
+   * Best-effort {@code del("*:*")}, retried every 100 ms for up to 10 seconds because the cluster
+   * may not be up yet. If the deadline passes it logs a warning and returns instead of failing.
+   */
+  private void tryDelete() throws Exception {
+    final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+    // compare via subtraction: nanoTime() values may overflow (see its Javadoc)
+    while (System.nanoTime() - deadline < 0) {
+      try {
+        del("*:*");
+        return;
+      } catch (SolrServerException e) {
+        // cluster may not be up yet
+        log.warn("Delete of all documents failed, will retry", e);
+      }
+      Thread.sleep(100);
+    }
+    log.warn("Could not delete all documents within 10 seconds; continuing anyway");
+  }
+
+  /**
+   * Indexes a document built from {@code fields} plus {@code rnd_b=true}. The random extra fields
+   * are skipped because they can deadlock.
+   */
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
index ed9ed41..ea8598b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
@@ -286,7 +286,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
// first we make a core with the core name the collections api
// will try and use - this will cause our mock fail
Create createCmd = new Create();
- createCmd.setCoreName("halfcollection_shard1_replica1");
+ createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.NRT, 1));
createCmd.setCollection("halfcollectionblocker");
String dataDir = createTempDir().toFile().getAbsolutePath();
createCmd.setDataDir(dataDir);
@@ -298,7 +298,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
}
createCmd = new Create();
- createCmd.setCoreName("halfcollection_shard1_replica1");
+ createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.NRT, 1));
createCmd.setCollection("halfcollectionblocker2");
dataDir = createTempDir().toFile().getAbsolutePath();
createCmd.setDataDir(dataDir);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 0142c7a..643660b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -67,7 +67,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
assertEquals(4, coresStatus.size());
for (int i=0; i<4; i++) {
- NamedList<Integer> status = coresStatus.get(collectionName + "_shard" + (i/2+1) + "_replica" + (i%2+1));
+ NamedList<Integer> status = coresStatus.get(Assign.buildCoreName(collectionName, "shard" + (i/2+1), Replica.Type.NRT, (i%2+1)));
assertEquals(0, (int)status.get("status"));
assertTrue(status.get("QTime") > 0);
}
@@ -117,17 +117,17 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
@Test
public void testCreateAndDeleteShard() throws IOException, SolrServerException {
-
// Create an implicit collection
String collectionName = "solrj_implicit";
CollectionAdminResponse response
- = CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardA,shardB", 1)
+ = CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardA,shardB", 1, 1, 1)
+ .setMaxShardsPerNode(3)
.process(cluster.getSolrClient());
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
- assertEquals(2, coresStatus.size());
+ assertEquals(6, coresStatus.size());
// Add a shard to the implicit collection
response = CollectionAdminRequest.createShard(collectionName, "shardC").process(cluster.getSolrClient());
@@ -135,8 +135,10 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
coresStatus = response.getCollectionCoresStatus();
- assertEquals(1, coresStatus.size());
- assertEquals(0, (int) coresStatus.get(collectionName + "_shardC_replica1").get("status"));
+ assertEquals(3, coresStatus.size());
+ assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.NRT, 1)).get("status"));
+ assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.TLOG, 1)).get("status"));
+ assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.PULL, 1)).get("status"));
response = CollectionAdminRequest.deleteShard(collectionName, "shardC").process(cluster.getSolrClient());
@@ -174,8 +176,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
assertEquals(0, response.getStatus());
assertTrue(response.isSuccess());
Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
- assertEquals(0, (int) coresStatus.get(collectionName + "_shard1_0_replica1").get("status"));
- assertEquals(0, (int) coresStatus.get(collectionName + "_shard1_1_replica1").get("status"));
+ assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shard1_0" , Replica.Type.NRT, 1)).get("status"));
+ assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shard1_1" , Replica.Type.NRT, 1)).get("status"));
waitForState("Expected all shards to be active and parent shard to be removed", collectionName, (n, c) -> {
if (c.getSlice("shard1").getState() == Slice.State.ACTIVE)