You are viewing a plain-text version of this content; the canonical (HTML) version is available from the original mailing-list archive page.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/07/28 11:00:14 UTC
[2/2] lucene-solr:feature/autoscaling: SOLR-10397: Remove old implementation of autoAddReplicas features
SOLR-10397: Remove old implementation of autoAddReplicas features
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0f7e3be5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0f7e3be5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0f7e3be5
Branch: refs/heads/feature/autoscaling
Commit: 0f7e3be589d7c9c663beed57cb4c6230a2a5b50e
Parents: b537361
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Jul 28 18:00:03 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Jul 28 18:00:03 2017 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/AddReplicaCmd.java | 3 +
.../java/org/apache/solr/cloud/CloudUtil.java | 7 +-
.../apache/solr/cloud/CreateCollectionCmd.java | 5 -
.../java/org/apache/solr/cloud/Overseer.java | 66 ++-
.../OverseerAutoReplicaFailoverThread.java | 531 -------------------
.../cloud/OverseerCollectionMessageHandler.java | 24 -
.../org/apache/solr/cloud/ZkController.java | 1 +
.../autoscaling/AutoAddReplicasPlanAction.java | 30 +-
.../solr/cloud/autoscaling/AutoScaling.java | 2 +-
.../solr/cloud/overseer/ReplicaMutator.java | 1 +
.../java/org/apache/solr/core/CloudConfig.java | 37 +-
.../org/apache/solr/core/CoreContainer.java | 18 +-
.../org/apache/solr/core/SolrXmlConfig.java | 8 +-
.../apache/solr/cloud/ClusterStateMockUtil.java | 48 +-
.../solr/cloud/MoveReplicaHDFSFailoverTest.java | 207 ++++++++
.../solr/cloud/MoveReplicaHDFSUlogDirTest.java | 142 -----
.../org/apache/solr/cloud/NodeMutatorTest.java | 15 +-
.../cloud/SharedFSAutoReplicaFailoverTest.java | 196 ++++---
.../SharedFSAutoReplicaFailoverUtilsTest.java | 191 -------
.../AutoAddReplicasIntegrationTest.java | 180 +++++++
.../AutoAddReplicasPlanActionTest.java | 21 +
.../HdfsAutoAddReplicasIntegrationTest.java | 58 ++
.../solrj/cloud/autoscaling/NoneSuggester.java | 32 ++
.../solr/common/cloud/ClusterStateUtil.java | 13 +-
.../solrj/cloud/autoscaling/TestPolicy.java | 129 +++++
25 files changed, 832 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index c325f07..c7889c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -191,6 +191,9 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (instanceDir != null) {
params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
}
+ if (coreNodeName != null) {
+ params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+ }
ocmh.addPropertyParams(message, params);
// For tracking async calls.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index c05072d..9167b81 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -31,7 +31,6 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrResourceLoader;
@@ -49,7 +48,8 @@ public class CloudUtil {
* + throw exception if it has been.
*/
public static void checkSharedFSFailoverReplaced(CoreContainer cc, CoreDescriptor desc) {
-
+ if (!cc.isSharedFs(desc)) return;
+
ZkController zkController = cc.getZkController();
String thisCnn = zkController.getCoreNodeName(desc);
String thisBaseUrl = zkController.getBaseUrl();
@@ -65,11 +65,10 @@ public class CloudUtil {
String cnn = replica.getName();
String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- boolean isSharedFs = replica.getStr(CoreAdminParams.DATA_DIR) != null;
log.debug("compare against coreNodeName={} baseUrl={}", cnn, baseUrl);
if (thisCnn != null && thisCnn.equals(cnn)
- && !thisBaseUrl.equals(baseUrl) && isSharedFs) {
+ && !thisBaseUrl.equals(baseUrl)) {
if (cc.getLoadedCoreNames().contains(desc.getName())) {
cc.unload(desc.getName());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 8b15144..e35369b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.common.cloud.ReplicaPosition;
@@ -110,7 +109,6 @@ public class CreateCollectionCmd implements Cmd {
int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
- boolean autoAddReplicas = message.getBool(AUTO_ADD_REPLICAS, false);
Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
String policy = message.getStr(Policy.POLICY);
boolean usePolicyFramework = autoScalingJson.get(Policy.CLUSTER_POLICY) != null || policy != null;
@@ -318,9 +316,6 @@ public class CreateCollectionCmd implements Cmd {
ocmh.cleanupCollection(collectionName, new NamedList());
log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
} else {
- if (autoAddReplicas) {
- ocmh.forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
- }
log.debug("Finished create command on all shards for collection: {}", collectionName);
// Emit a warning about production use of data driven functionality
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d4f914d..ca7a935 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
@@ -39,17 +41,22 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -470,8 +477,6 @@ public class Overseer implements Closeable {
private OverseerThread ccThread;
private OverseerThread updaterThread;
-
- private OverseerThread arfoThread;
private OverseerThread triggerThread;
@@ -524,12 +529,11 @@ public class Overseer implements Closeable {
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
-
- ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
- OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler);
- arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id);
- arfoThread.setDaemon(true);
+ //TODO nocommit, autoscaling framework should start autoAddReplicas trigger automatically (implicitly)
+ Thread autoscalingTriggerCreator = new Thread(createAutoscalingTriggerIfNotExist(), "AutoscalingTriggerCreator");
+ autoscalingTriggerCreator.setDaemon(true);
+ autoscalingTriggerCreator.start();
ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
@@ -537,7 +541,6 @@ public class Overseer implements Closeable {
updaterThread.start();
ccThread.start();
- arfoThread.start();
triggerThread.start();
assert ObjectReleaseTracker.track(this);
}
@@ -569,6 +572,43 @@ public class Overseer implements Closeable {
assert ObjectReleaseTracker.release(this);
}
+ private Runnable createAutoscalingTriggerIfNotExist() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean triggerExist = getZkStateReader().getAutoScalingConfig()
+ .getTriggerConfigs().get(".auto_add_replicas") != null;
+ if (triggerExist) return;
+ } catch (InterruptedException | KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Failed when creating .auto_add_replicas trigger");
+ }
+ while (getZkController().getCoreContainer()
+ .getRequestHandler(AutoScalingHandler.HANDLER_PATH) == null) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // expected
+ }
+ }
+
+ String dsl = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL.replace("{{waitFor}}",
+ String.valueOf(config.getAutoReplicaFailoverWaitAfterExpiration()/1000));
+ LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
+ request.getContext().put("httpMethod", "POST");
+ request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(dsl)));
+ SolrQueryResponse response = new SolrQueryResponse();
+ getZkController().getCoreContainer()
+ .getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
+ if (!"success".equals(response.getValues().get("result"))) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Failed when creating .auto_add_replicas trigger, return " + response);
+ }
+ }
+ };
+ }
+
private void doClose() {
if (updaterThread != null) {
@@ -579,10 +619,6 @@ public class Overseer implements Closeable {
IOUtils.closeQuietly(ccThread);
ccThread.interrupt();
}
- if (arfoThread != null) {
- IOUtils.closeQuietly(arfoThread);
- arfoThread.interrupt();
- }
if (triggerThread != null) {
IOUtils.closeQuietly(triggerThread);
triggerThread.interrupt();
@@ -598,11 +634,6 @@ public class Overseer implements Closeable {
ccThread.join();
} catch (InterruptedException e) {}
}
- if (arfoThread != null) {
- try {
- arfoThread.join();
- } catch (InterruptedException e) {}
- }
if (triggerThread != null) {
try {
triggerThread.join();
@@ -611,7 +642,6 @@ public class Overseer implements Closeable {
updaterThread = null;
ccThread = null;
- arfoThread = null;
triggerThread = null;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java b/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
deleted file mode 100644
index 2eccef8..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CloudConfig;
-import org.apache.solr.update.UpdateShardHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-
-// TODO: how to tmp exclude nodes?
-
-// TODO: more fine grained failover rules?
-
-// TODO: test with lots of collections
-
-// TODO: add config for only failover if replicas is < N
-
-// TODO: general support for non shared filesystems
-// this is specialized for a shared file system, but it should
-// not be much work to generalize
-
-// NOTE: using replication can slow down failover if a whole
-// shard is lost.
-
-/**
- *
- * In this simple initial implementation we are limited in how quickly we detect
- * a failure by a worst case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS + WORK_LOOP_DELAY_MS
- * and best case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS. Also, consider the time to
- * create the SolrCore, do any recovery necessary, and warm up the readers.
- *
- * NOTE: this will only work with collections created via the collections api because they will have defined
- * replicationFactor and maxShardsPerNode.
- *
- * @lucene.experimental
- */
-public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private Integer lastClusterStateVersion;
-
- private final ExecutorService updateExecutor;
- private volatile boolean isClosed;
- private ZkStateReader zkStateReader;
- private final Cache<String,Long> baseUrlForBadNodes;
- private Set<String> liveNodes = Collections.EMPTY_SET;
-
- private final int workLoopDelay;
- private final int waitAfterExpiration;
-
- private volatile Thread thread;
-
- public OverseerAutoReplicaFailoverThread(CloudConfig config, ZkStateReader zkStateReader,
- UpdateShardHandler updateShardHandler) {
- this.zkStateReader = zkStateReader;
-
- this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
- this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
- int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
-
- log.debug(
- "Starting "
- + this.getClass().getSimpleName()
- + " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
- workLoopDelay, waitAfterExpiration, badNodeExpiration);
-
- baseUrlForBadNodes = CacheBuilder.newBuilder()
- .concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
-
- // TODO: Speed up our work loop when live_nodes changes??
-
- updateExecutor = updateShardHandler.getUpdateExecutor();
-
-
- // TODO: perhaps do a health ping periodically to each node (scaryish)
- // And/OR work on JIRA issue around self health checks (SOLR-5805)
- }
-
- @Override
- public void run() {
- this.thread = Thread.currentThread();
- while (!this.isClosed) {
- // work loop
- log.debug("do " + this.getClass().getSimpleName() + " work loop");
-
- // every n, look at state and make add / remove calls
-
- try {
- doWork();
- } catch (Exception e) {
- SolrException.log(log, this.getClass().getSimpleName()
- + " had an error in its thread work loop.", e);
- }
-
- if (!this.isClosed) {
- try {
- Thread.sleep(workLoopDelay);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
- }
-
- private void doWork() {
-
- // TODO: extract to configurable strategy class ??
- ClusterState clusterState = zkStateReader.getClusterState();
- //check if we have disabled autoAddReplicas cluster wide
- String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
- if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
- return;
- }
- if (clusterState != null) {
- if (clusterState.getZkClusterStateVersion() != null &&
- clusterState.getZkClusterStateVersion().equals(lastClusterStateVersion) && baseUrlForBadNodes.size() == 0 &&
- liveNodes.equals(clusterState.getLiveNodes())) {
- // nothing has changed, no work to do
- return;
- }
-
- liveNodes = clusterState.getLiveNodes();
- lastClusterStateVersion = clusterState.getZkClusterStateVersion();
- Map<String, DocCollection> collections = clusterState.getCollectionsMap();
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- log.debug("look at collection={}", entry.getKey());
- DocCollection docCollection = entry.getValue();
- if (!docCollection.getAutoAddReplicas()) {
- log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
- continue;
- }
- if (docCollection.getReplicationFactor() == null) {
- log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
- continue;
- }
- log.debug("Found collection, name={} replicationFactor={}", entry.getKey(), docCollection.getReplicationFactor());
-
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- if (slice.getState() == Slice.State.ACTIVE) {
-
- final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
-
- int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
-
- log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
-
- if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
- // badReplicaMap.put(collection, badReplicas);
- processBadReplicas(entry.getKey(), downReplicas);
- } else if (goodReplicas > docCollection.getReplicationFactor()) {
- log.debug("There are too many replicas");
- }
- }
- }
- }
-
- }
- }
-
- private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
- for (DownReplica badReplica : badReplicas) {
- log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
- String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
- Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
- if (wentBadAtNS == null) {
- log.warn("Replica {} may need to failover.",
- badReplica.replica.getName());
- baseUrlForBadNodes.put(baseUrl, System.nanoTime());
-
- } else {
-
- long elasped = System.nanoTime() - wentBadAtNS;
- if (elasped < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
- // protect against ZK 'flapping', startup and shutdown
- log.debug("Looks troublesome...continue. Elapsed={}", elasped + "ns");
- } else {
- log.debug("We need to add a replica. Elapsed={}", elasped + "ns");
-
- if (addReplica(collection, badReplica)) {
- baseUrlForBadNodes.invalidate(baseUrl);
- }
- }
- }
- }
- }
-
- private boolean addReplica(final String collection, DownReplica badReplica) {
- // first find best home - first strategy, sort by number of cores
- // hosted where maxCoresPerNode is not violated
- final Integer maxCoreCount = zkStateReader.getClusterProperty(ZkStateReader.MAX_CORES_PER_NODE, (Integer) null);
- final String createUrl = getBestCreateUrl(zkStateReader, badReplica, maxCoreCount);
- if (createUrl == null) {
- log.warn("Could not find a node to create new replica on.");
- return false;
- }
-
- // NOTE: we send the absolute path, which will slightly change
- // behavior of these cores as they won't respond to changes
- // in the solr.hdfs.home sys prop as they would have.
- final String dataDir = badReplica.replica.getStr("dataDir");
- final String ulogDir = badReplica.replica.getStr("ulogDir");
- final String coreNodeName = badReplica.replica.getName();
- final String shardId = badReplica.slice.getName();
- if (dataDir != null) {
- // need an async request - full shard goes down leader election
- final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
- log.debug("submit call to {}", createUrl);
- MDC.put("OverseerAutoReplicaFailoverThread.createUrl", createUrl);
- try {
- updateExecutor.submit(() -> createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName, shardId));
- } finally {
- MDC.remove("OverseerAutoReplicaFailoverThread.createUrl");
- }
-
- // wait to see state for core we just created
- boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
- if (!success) {
- log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
- return false;
- }
- return true;
- }
-
- log.warn("Could not find dataDir or ulogDir in cluster state.");
-
- return false;
- }
-
- private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
- int goodReplicas = 0;
- Collection<Replica> replicas = slice.getReplicas();
- if (replicas != null) {
- for (Replica replica : replicas) {
- // on a live node?
- boolean live = clusterState.liveNodesContain(replica.getNodeName());
- final Replica.State state = replica.getState();
-
- final boolean okayState = state == Replica.State.DOWN
- || state == Replica.State.RECOVERING
- || state == Replica.State.ACTIVE;
-
- log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
-
- if (live && okayState) {
- goodReplicas++;
- } else {
- DownReplica badReplica = new DownReplica();
- badReplica.replica = replica;
- badReplica.slice = slice;
- badReplica.collection = collection;
- badReplicas.add(badReplica);
- }
- }
- }
- log.debug("bad replicas for slice {}", badReplicas);
- return goodReplicas;
- }
-
- /**
- *
- * @return the best node to replace the badReplica on or null if there is no
- * such node
- */
- static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica, Integer maxCoreCount) {
- assert badReplica != null;
- assert badReplica.collection != null;
- assert badReplica.slice != null;
- log.debug("getBestCreateUrl for " + badReplica.replica);
- Map<String,Counts> counts = new HashMap<>();
- Set<String> unsuitableHosts = new HashSet<>();
-
- Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
- Map<String, Integer> coresPerNode = new HashMap<>();
-
- ClusterState clusterState = zkStateReader.getClusterState();
- if (clusterState != null) {
- Map<String, DocCollection> collections = clusterState.getCollectionsMap();
- for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- String collection = entry.getKey();
- log.debug("look at collection {} as possible create candidate", collection);
- DocCollection docCollection = entry.getValue();
- // TODO - only operate on collections with sharedfs failover = true ??
- Collection<Slice> slices = docCollection.getSlices();
- for (Slice slice : slices) {
- // only look at active shards
- if (slice.getState() == Slice.State.ACTIVE) {
- log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection);
- Collection<Replica> replicas = slice.getReplicas();
-
- for (Replica replica : replicas) {
- liveNodes.remove(replica.getNodeName());
- String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- if (coresPerNode.containsKey(baseUrl)) {
- Integer nodeCount = coresPerNode.get(baseUrl);
- coresPerNode.put(baseUrl, nodeCount++);
- } else {
- coresPerNode.put(baseUrl, 1);
- }
- if (baseUrl.equals(badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
- continue;
- }
- // on a live node?
- log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
- boolean live = clusterState.liveNodesContain(replica.getNodeName());
- log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live);
- if (live) {
- Counts cnt = counts.get(baseUrl);
- if (cnt == null) {
- cnt = new Counts();
- }
- if (badReplica.collection.getName().equals(collection)) {
- cnt.negRankingWeight += 3;
- cnt.collectionShardsOnNode += 1;
- } else {
- cnt.negRankingWeight += 1;
- }
- if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
- cnt.ourReplicas++;
- }
-
- Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
- if (maxShardsPerNode == null) {
- log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
- maxShardsPerNode = Integer.MAX_VALUE;
- }
- log.debug("collection={} node={} maxShardsPerNode={} maxCoresPerNode={} potential hosts={}",
- collection, baseUrl, maxShardsPerNode, maxCoreCount, cnt);
-
- Collection<Replica> badSliceReplicas = null;
- DocCollection c = clusterState.getCollection(badReplica.collection.getName());
- if (c != null) {
- Slice s = c.getSlice(badReplica.slice.getName());
- if (s != null) {
- badSliceReplicas = s.getReplicas();
- }
- }
- boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
- if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode
- || (maxCoreCount != null && coresPerNode.get(baseUrl) >= maxCoreCount) ) {
- counts.remove(baseUrl);
- unsuitableHosts.add(baseUrl);
- log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
- } else {
- counts.put(baseUrl, cnt);
- log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
- }
- }
- }
- }
- }
- }
- }
-
- for (String node : liveNodes) {
- counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
- }
-
- if (counts.size() == 0) {
- log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
- return null;
- }
-
- ValueComparator vc = new ValueComparator(counts);
- Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
- sortedCounts.putAll(counts);
-
- log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
- log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
- log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
-
- return sortedCounts.keySet().iterator().next();
- }
-
- private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
- if (replicas != null) {
- log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
- for (Replica replica : replicas) {
- final Replica.State state = replica.getState();
- if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
- && clusterState.liveNodesContain(replica.getNodeName())
- && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
- log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
- return true;
- }
- }
- }
- log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl);
- return false;
- }
-
- private static Object getNames(Collection<Replica> replicas) {
- Set<String> names = new HashSet<>(replicas.size());
- for (Replica replica : replicas) {
- names.add(replica.getName());
- }
- return names;
- }
-
- private boolean createSolrCore(final String collection,
- final String createUrl, final String dataDir, final String ulogDir,
- final String coreNodeName, final String coreName, final String shardId) {
-
- try (HttpSolrClient client = new HttpSolrClient.Builder(createUrl)
- .withConnectionTimeout(30000)
- .withSocketTimeout(60000)
- .build()) {
- log.debug("create url={}", createUrl);
- Create createCmd = new Create();
- createCmd.setCollection(collection);
- createCmd.setCoreNodeName(coreNodeName);
- // TODO: how do we ensure unique coreName
- // for now, the collections API will use unique names
- createCmd.setShardId(shardId);
- createCmd.setCoreName(coreName);
- createCmd.setDataDir(dataDir);
- createCmd.setUlogDir(ulogDir.substring(0, ulogDir.length() - "/tlog".length()));
- client.request(createCmd);
- } catch (Exception e) {
- SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
- return false;
- }
- return true;
- }
-
- private static class ValueComparator implements Comparator<String> {
- Map<String,Counts> map;
-
- public ValueComparator(Map<String,Counts> map) {
- this.map = map;
- }
-
- public int compare(String a, String b) {
- if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
- return 1;
- } else {
- return -1;
- }
- }
- }
-
- @Override
- public void close() {
- isClosed = true;
- Thread lThread = thread;
- if (lThread != null) {
- lThread.interrupt();
- }
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
-
- private static class Counts {
- int collectionShardsOnNode = 0;
- int negRankingWeight = 0;
- int ourReplicas = 0;
-
- private Counts() {
-
- }
-
- private Counts(int totalReplicas, int ourReplicas) {
- this.negRankingWeight = totalReplicas;
- this.ourReplicas = ourReplicas;
- }
-
- @Override
- public String toString() {
- return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
- + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
- }
- }
-
- static class DownReplica {
- Replica replica;
- Slice slice;
- DocCollection collection;
-
- @Override
- public String toString() {
- return "DownReplica [replica=" + replica.getName() + ", slice="
- + slice.getName() + ", collection=" + collection.getName() + "]";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index b0791e2..5c29fa4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -41,9 +41,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
-import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -62,8 +59,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -74,10 +69,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
@@ -678,10 +669,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
Thread.sleep(100);
}
- if (message.getBool(ZkStateReader.AUTO_ADD_REPLICAS, false)) {
- forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
- }
-
if (!areChangesVisible)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
}
@@ -881,17 +868,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
}
- void forwardToAutoScaling(String command) {
- LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
- request.getContext().put("httpMethod", "POST");
- request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(command)));
- SolrQueryResponse response = new SolrQueryResponse();
- overseer.getZkController().getCoreContainer().getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
- if (!"success".equals(response.getValues().get("result"))) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Failed when execute command on autoScalingHandler, return " + response);
- }
- }
-
private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
ModifiableSolrParams params = new ModifiableSolrParams();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 512acbe..57cda45 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1482,6 +1482,7 @@ public class ZkController {
if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
descriptor.getCloudDescriptor()
.setCoreNodeName(replica.getName());
+ getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
return;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
index fac69f4..ff09469 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
@@ -17,35 +17,35 @@
package org.apache.solr.cloud.autoscaling;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction {
- Set<String> autoAddReplicasCollections;
@Override
protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) {
- Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
- if (autoAddReplicasCollections == null) {
- autoAddReplicasCollections = new HashSet<>();
-
- ClusterState clusterState = zkStateReader.getClusterState();
- for (DocCollection collection: clusterState.getCollectionsMap().values()) {
- if (collection.getAutoAddReplicas()) {
- autoAddReplicasCollections.add(collection.getName());
- }
- }
+ // for backward compatibility
+ String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
+ if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
+ return new NoneSuggester();
}
- for (String collection : autoAddReplicasCollections) {
- suggester.hint(Policy.Suggester.Hint.COLL, collection);
+ Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
+ ClusterState clusterState = zkStateReader.getClusterState();
+
+ boolean anyCollections = false;
+ for (DocCollection collection: clusterState.getCollectionsMap().values()) {
+ if (collection.getAutoAddReplicas()) {
+ anyCollections = true;
+ suggester.hint(Policy.Suggester.Hint.COLL, collection.getName());
+ }
}
+ if (!anyCollections) return new NoneSuggester();
return suggester;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 0d192d3..ed24bf7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -148,7 +148,7 @@ public class AutoScaling {
" 'set-trigger' : {" +
" 'name' : '.auto_add_replicas'," +
" 'event' : 'nodeLost'," +
- " 'waitFor' : '5s'," +
+ " 'waitFor' : '{{waitFor}}s'," +
" 'enabled' : true," +
" 'actions' : [" +
" {" +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 82d106d..fad5d96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/CloudConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index 447dd22..b971a0b 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -38,10 +38,6 @@ public class CloudConfig {
private final int autoReplicaFailoverWaitAfterExpiration;
- private final int autoReplicaFailoverWorkLoopDelay;
-
- private final int autoReplicaFailoverBadNodeExpiration;
-
private final String zkCredentialsProviderClass;
private final String zkACLProviderClass;
@@ -51,9 +47,9 @@ public class CloudConfig {
private final boolean createCollectionCheckLeaderActive;
CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames,
- int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
- int autoReplicaFailoverWorkLoopDelay, int autoReplicaFailoverBadNodeExpiration, String zkCredentialsProviderClass,
- String zkACLProviderClass, int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive) {
+ int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
+ String zkCredentialsProviderClass, String zkACLProviderClass, int createCollectionWaitTimeTillActive,
+ boolean createCollectionCheckLeaderActive) {
this.zkHost = zkHost;
this.zkClientTimeout = zkClientTimeout;
this.hostPort = hostPort;
@@ -63,8 +59,6 @@ public class CloudConfig {
this.leaderVoteWait = leaderVoteWait;
this.leaderConflictResolveWait = leaderConflictResolveWait;
this.autoReplicaFailoverWaitAfterExpiration = autoReplicaFailoverWaitAfterExpiration;
- this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
- this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
this.zkCredentialsProviderClass = zkCredentialsProviderClass;
this.zkACLProviderClass = zkACLProviderClass;
this.createCollectionWaitTimeTillActive = createCollectionWaitTimeTillActive;
@@ -116,14 +110,6 @@ public class CloudConfig {
return autoReplicaFailoverWaitAfterExpiration;
}
- public int getAutoReplicaFailoverWorkLoopDelay() {
- return autoReplicaFailoverWorkLoopDelay;
- }
-
- public int getAutoReplicaFailoverBadNodeExpiration() {
- return autoReplicaFailoverBadNodeExpiration;
- }
-
public boolean getGenericCoreNodeNames() {
return useGenericCoreNames;
}
@@ -146,8 +132,6 @@ public class CloudConfig {
// TODO: tune defaults
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 30000;
- private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY = 10000;
- private static final int DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION = 60000;
private String zkHost = System.getProperty("zkHost");
private int zkClientTimeout = Integer.getInteger("zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
@@ -158,8 +142,6 @@ public class CloudConfig {
private int leaderVoteWait = DEFAULT_LEADER_VOTE_WAIT;
private int leaderConflictResolveWait = DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT;
private int autoReplicaFailoverWaitAfterExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION;
- private int autoReplicaFailoverWorkLoopDelay = DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY;
- private int autoReplicaFailoverBadNodeExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION;
private String zkCredentialsProviderClass;
private String zkACLProviderClass;
private int createCollectionWaitTimeTillActive = DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT;
@@ -205,16 +187,6 @@ public class CloudConfig {
return this;
}
- public CloudConfigBuilder setAutoReplicaFailoverWorkLoopDelay(int autoReplicaFailoverWorkLoopDelay) {
- this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
- return this;
- }
-
- public CloudConfigBuilder setAutoReplicaFailoverBadNodeExpiration(int autoReplicaFailoverBadNodeExpiration) {
- this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
- return this;
- }
-
public CloudConfigBuilder setZkCredentialsProviderClass(String zkCredentialsProviderClass) {
this.zkCredentialsProviderClass = zkCredentialsProviderClass;
return this;
@@ -237,8 +209,7 @@ public class CloudConfig {
public CloudConfig build() {
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
- leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, autoReplicaFailoverWorkLoopDelay,
- autoReplicaFailoverBadNodeExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
+ leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
createCollectionCheckLeaderActive);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a43af96..4efd369 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -101,6 +101,7 @@ import org.apache.solr.security.HttpClientBuilderPlugin;
import org.apache.solr.security.PKIAuthenticationPlugin;
import org.apache.solr.security.SecurityPluginHolder;
import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.stats.MetricUtils;
@@ -965,7 +966,7 @@ public class CoreContainer {
zkSys.getZkController().preRegister(dcore);
}
- ConfigSet coreConfig = coreConfigService.getConfig(dcore);
+ ConfigSet coreConfig = getConfigSet(dcore);
dcore.setConfigSetTrusted(coreConfig.isTrusted());
log.info("Creating SolrCore '{}' using configuration from {}, trusted={}", dcore.getName(), coreConfig.getName(), dcore.isConfigSetTrusted());
try {
@@ -1000,6 +1001,21 @@ public class CoreContainer {
MDCLoggingContext.clear();
}
}
+
+ public boolean isSharedFs(CoreDescriptor cd) {
+ try (SolrCore core = this.getCore(cd.getName())) {
+ if (core != null) {
+ return core.getDirectoryFactory().isSharedStorage();
+ } else {
+ ConfigSet configSet = getConfigSet(cd);
+ return DirectoryFactory.loadDirectoryFactory(configSet.getSolrConfig(), this, null).isSharedStorage();
+ }
+ }
+ }
+
+ private ConfigSet getConfigSet(CoreDescriptor cd) {
+ return coreConfigService.getConfig(cd);
+ }
/**
* Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index 8cdf947..c15127b 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -376,15 +376,13 @@ public class SolrXmlConfig {
case "zkClientTimeout":
builder.setZkClientTimeout(parseInt(name, value));
break;
- case "autoReplicaFailoverBadNodeExpiration":
- builder.setAutoReplicaFailoverBadNodeExpiration(parseInt(name, value));
+ case "autoReplicaFailoverBadNodeExpiration": case "autoReplicaFailoverWorkLoopDelay":
+ //TODO remove this in Solr 8.0
+ log.info("Configuration parameter " + name + " is ignored");
break;
case "autoReplicaFailoverWaitAfterExpiration":
builder.setAutoReplicaFailoverWaitAfterExpiration(parseInt(name, value));
break;
- case "autoReplicaFailoverWorkLoopDelay":
- builder.setAutoReplicaFailoverWorkLoopDelay(parseInt(name, value));
- break;
case "zkHost":
builder.setZkHost(value);
break;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
index e0cf3f7..3482596 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
@@ -18,13 +18,10 @@
package org.apache.solr.cloud;
-import java.io.Closeable;
-import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -40,22 +37,12 @@ public class ClusterStateMockUtil {
private final static Pattern BLUEPRINT = Pattern.compile("([a-z])(\\d+)?(?:(['A','R','D','F']))?(\\*)?");
- protected static class Result implements Closeable {
- OverseerAutoReplicaFailoverThread.DownReplica badReplica;
- ZkStateReader reader;
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
- }
-
- protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, String ... liveNodes) {
- return buildClusterState(results, string, 1, liveNodes);
+ protected static ZkStateReader buildClusterState(String string, String ... liveNodes) {
+ return buildClusterState(string, 1, liveNodes);
}
- protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, int replicationFactor, String ... liveNodes) {
- return buildClusterState(results, string, replicationFactor, 10, liveNodes);
+ protected static ZkStateReader buildClusterState(String string, int replicationFactor, String ... liveNodes) {
+ return buildClusterState(string, replicationFactor, 10, liveNodes);
}
/**
@@ -118,9 +105,7 @@ public class ClusterStateMockUtil {
*
*/
@SuppressWarnings("resource")
- protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
- ClusterStateMockUtil.Result result = new ClusterStateMockUtil.Result();
-
+ protected static ZkStateReader buildClusterState(String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
Map<String,Slice> slices = null;
Map<String,Replica> replicas = null;
Map<String,Object> collectionProps = new HashMap<>();
@@ -181,23 +166,12 @@ public class ClusterStateMockUtil {
String nodeName = "baseUrl" + node + "_";
String replicaName = "replica" + replicaCount++;
- if ("*".equals(m.group(4))) {
- replicaName += " (bad)";
- }
-
replicaPropMap.put(ZkStateReader.NODE_NAME_PROP, nodeName);
replicaPropMap.put(ZkStateReader.BASE_URL_PROP, "http://baseUrl" + node);
replicaPropMap.put(ZkStateReader.STATE_PROP, state.toString());
replica = new Replica(replicaName, replicaPropMap);
- if ("*".equals(m.group(4))) {
- result.badReplica = new OverseerAutoReplicaFailoverThread.DownReplica();
- result.badReplica.replica = replica;
- result.badReplica.slice = slice;
- result.badReplica.collection = docCollection;
- }
-
replicas.put(replica.getName(), replica);
break;
default:
@@ -216,17 +190,7 @@ public class ClusterStateMockUtil {
}
System.err.println(json);
- // todo remove the limitation of always having a bad replica
- assert result.badReplica != null : "Is there no bad replica?";
- assert result.badReplica.slice != null : "Is there no bad replica?";
-
- result.reader = reader;
-
- if (results != null) {
- results.add(result);
- }
-
- return result;
+ return reader;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
new file mode 100644
index 0000000..5edae7c
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterStateUtil;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
+ BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+ MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class MoveReplicaHDFSFailoverTest extends SolrCloudTestCase {
+ private static MiniDFSCluster dfsCluster;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ configureCluster(2)
+ .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+ .configure();
+
+ System.setProperty("solr.hdfs.blockcache.enabled", "false");
+ dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+ ZkConfigManager configManager = new ZkConfigManager(zkClient());
+ configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+ System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ cluster.shutdown(); // need to close before the MiniDFSCluster
+ HdfsTestUtil.teardownClass(dfsCluster);
+ dfsCluster = null;
+ }
+
+ @Test
+ public void testDataDirAndUlogAreMaintained() throws Exception {
+ String coll = "movereplicatest_coll2";
+ CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+ .setCreateNodeSet("")
+ .process(cluster.getSolrClient());
+ String hdfsUri = HdfsTestUtil.getURI(dfsCluster);
+ String dataDir = hdfsUri + "/dummyFolder/dataDir";
+ String ulogDir = hdfsUri + "/dummyFolder2/ulogDir";
+ CollectionAdminResponse res = CollectionAdminRequest
+ .addReplicaToShard(coll, "shard1")
+ .setDataDir(dataDir)
+ .setUlogDir(ulogDir)
+ .setNode(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+
+ ulogDir += "/tlog";
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+
+ DocCollection docCollection = zkStateReader.getClusterState().getCollection(coll);
+ Replica replica = docCollection.getReplicas().iterator().next();
+ assertTrue(replica.getStr("ulogDir"), replica.getStr("ulogDir").equals(ulogDir) || replica.getStr("ulogDir").equals(ulogDir+'/'));
+ assertTrue(replica.getStr("dataDir"),replica.getStr("dataDir").equals(dataDir) || replica.getStr("dataDir").equals(dataDir+'/'));
+
+ new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+ docCollection = zkStateReader.getClusterState().getCollection(coll);
+ assertEquals(1, docCollection.getSlice("shard1").getReplicas().size());
+ Replica newReplica = docCollection.getReplicas().iterator().next();
+ assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(1).getNodeName());
+ assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
+ assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
+
+ assertEquals(replica.getName(), newReplica.getName());
+ assertEquals(replica.getCoreName(), newReplica.getCoreName());
+ assertFalse(replica.getNodeName().equals(newReplica.getNodeName()));
+ final int numDocs = 100;
+ addDocs(coll, numDocs); // indexed but not committed
+
+ cluster.getJettySolrRunner(1).stop();
+ Thread.sleep(5000);
+ new CollectionAdminRequest.MoveReplica(coll, newReplica.getName(), cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+
+ // assert that the old core will be removed on startup
+ cluster.getJettySolrRunner(1).start();
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+ docCollection = zkStateReader.getClusterState().getCollection(coll);
+ assertEquals(1, docCollection.getReplicas().size());
+ newReplica = docCollection.getReplicas().iterator().next();
+ assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(0).getNodeName());
+ assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
+ assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
+
+ assertEquals(0, cluster.getJettySolrRunner(1).getCoreContainer().getCores().size());
+
+ cluster.getSolrClient().commit(coll);
+ assertEquals(numDocs, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+ CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+ }
+
+ @Test
+ public void testOldReplicaIsDeleted() throws Exception {
+ String coll = "movereplicatest_coll3";
+ CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+ .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ addDocs(coll, 2);
+ Replica replica = getCollectionState(coll).getReplicas().iterator().next();
+
+ cluster.getJettySolrRunners().get(0).stop();
+ assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ // move replica from node0 -> node1
+ new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ cluster.getJettySolrRunners().get(1).stop();
+ assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ // node0 will delete it replica because of CloudUtil.checkSharedFSFailoverReplaced()
+ cluster.getJettySolrRunners().get(0).start();
+ Thread.sleep(5000);
+ assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ cluster.getJettySolrRunners().get(1).start();
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ assertEquals(1, getCollectionState(coll).getReplicas().size());
+ assertEquals(2, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+ CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+ }
+
+ @Test
+ public void testOldReplicaIsDeletedInRaceCondition() throws Exception {
+ String coll = "movereplicatest_coll4";
+ CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+ .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ addDocs(coll, 100);
+ Replica replica = getCollectionState(coll).getReplicas().iterator().next();
+
+ cluster.getJettySolrRunners().get(0).stop();
+ assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ // move replica from node0 -> node1
+ new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ cluster.getJettySolrRunners().get(1).stop();
+ assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ cluster.getJettySolrRunners().get(1).start();
+ // node0 will delete it replica because of CloudUtil.checkSharedFSFailoverReplaced()
+ cluster.getJettySolrRunners().get(0).start();
+ Thread.sleep(5000);
+ assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+ assertEquals(1, getCollectionState(coll).getReplicas().size());
+ assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+ CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+ }
+
+ private void addDocs(String collection, int numDocs) throws SolrServerException, IOException {
+ SolrClient solrClient = cluster.getSolrClient();
+ for (int docId = 1; docId <= numDocs; docId++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", docId);
+ solrClient.add(collection, doc);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
deleted file mode 100644
index a27a39d..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.cloud.hdfs.HdfsTestUtil;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.util.BadHdfsThreadsFilter;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-@ThreadLeakFilters(defaultFilters = true, filters = {
- BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
- MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
-})
-public class MoveReplicaHDFSUlogDirTest extends SolrCloudTestCase {
- private static MiniDFSCluster dfsCluster;
-
- @BeforeClass
- public static void setupClass() throws Exception {
- configureCluster(2)
- .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
- .configure();
-
- System.setProperty("solr.hdfs.blockcache.enabled", "false");
- dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
-
- ZkConfigManager configManager = new ZkConfigManager(zkClient());
- configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
-
- System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
- }
-
- @AfterClass
- public static void teardownClass() throws Exception {
- cluster.shutdown(); // need to close before the MiniDFSCluster
- HdfsTestUtil.teardownClass(dfsCluster);
- dfsCluster = null;
- }
-
- @Test
- public void testDataDirAndUlogAreMaintained() throws Exception {
- String coll = "movereplicatest_coll2";
- CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
- .setCreateNodeSet("")
- .process(cluster.getSolrClient());
- String hdfsUri = HdfsTestUtil.getURI(dfsCluster);
- String dataDir = hdfsUri + "/dummyFolder/dataDir";
- String ulogDir = hdfsUri + "/dummyFolder2/ulogDir";
- CollectionAdminResponse res = CollectionAdminRequest
- .addReplicaToShard(coll, "shard1")
- .setDataDir(dataDir)
- .setUlogDir(ulogDir)
- .setNode(cluster.getJettySolrRunner(0).getNodeName())
- .process(cluster.getSolrClient());
-
- ulogDir += "/tlog";
- ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
- assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-
- DocCollection docCollection = zkStateReader.getClusterState().getCollection(coll);
- Replica replica = docCollection.getReplicas().iterator().next();
- assertTrue(replica.getStr("ulogDir"), replica.getStr("ulogDir").equals(ulogDir) || replica.getStr("ulogDir").equals(ulogDir+'/'));
- assertTrue(replica.getStr("dataDir"),replica.getStr("dataDir").equals(dataDir) || replica.getStr("dataDir").equals(dataDir+'/'));
-
- new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
- .process(cluster.getSolrClient());
- assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
- docCollection = zkStateReader.getClusterState().getCollection(coll);
- assertEquals(1, docCollection.getSlice("shard1").getReplicas().size());
- Replica newReplica = docCollection.getReplicas().iterator().next();
- assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(1).getNodeName());
- assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
- assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
-
- assertEquals(replica.getName(), newReplica.getName());
- assertEquals(replica.getCoreName(), newReplica.getCoreName());
- assertFalse(replica.getNodeName().equals(newReplica.getNodeName()));
- final int numDocs = 100;
- addDocs(coll, numDocs); // indexed but not committed
-
- cluster.getJettySolrRunner(1).stop();
- Thread.sleep(5000);
- new CollectionAdminRequest.MoveReplica(coll, newReplica.getName(), cluster.getJettySolrRunner(0).getNodeName())
- .process(cluster.getSolrClient());
- assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-
- // assert that the old core will be removed on startup
- cluster.getJettySolrRunner(1).start();
- assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
- docCollection = zkStateReader.getClusterState().getCollection(coll);
- assertEquals(1, docCollection.getReplicas().size());
- newReplica = docCollection.getReplicas().iterator().next();
- assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(0).getNodeName());
- assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
- assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
-
- assertEquals(0, cluster.getJettySolrRunner(1).getCoreContainer().getCores().size());
-
- cluster.getSolrClient().commit(coll);
- assertEquals(numDocs, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
- }
-
- private void addDocs(String collection, int numDocs) throws SolrServerException, IOException {
- SolrClient solrClient = cluster.getSolrClient();
- for (int docId = 1; docId <= numDocs; docId++) {
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", docId);
- solrClient.add(collection, doc);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index ffa6ba2..a446f29 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -42,31 +42,30 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
@Test
public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws IOException {
NodeMutator nm = new NodeMutator();
- ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
//We use 2 nodes with maxShardsPerNode as 1
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
//Collection2: 1 shard X 1 replica = replica1 on node2
- ClusterStateMockUtil.Result result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2", 1, 1, NODE1, NODE2);
- ClusterState clusterState = result.reader.getClusterState();
+ ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2", 1, 1, NODE1, NODE2);
+ ClusterState clusterState = reader.getClusterState();
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
- props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
+ ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
List<ZkWriteCommand> writes = nm.downNode(clusterState, props);
assertEquals(writes.size(), 1);
assertEquals(writes.get(0).name, "collection1");
assertEquals(writes.get(0).collection.getReplica("replica1").getState(), Replica.State.DOWN);
assertEquals(writes.get(0).collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
- result.close();
+ reader.close();
//We use 3 nodes with maxShardsPerNode as 1
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
//Collection2: 1 shard X 1 replica = replica1 on node2
//Collection3: 1 shard X 3 replica = replica1 on node1 , replica2 on node2, replica3 on node3
- result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
- clusterState = result.reader.getClusterState();
+ reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
+ clusterState = reader.getClusterState();
assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
@@ -90,6 +89,6 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
fail("No other collection needs to be changed");
}
}
- result.close();
+ reader.close();
}
}