You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/04/06 08:58:57 UTC
[1/2] lucene-solr:branch_6x: SOLR-10239: MOVEREPLICA API
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x e606d901e -> 78b84e65b
SOLR-10239: MOVEREPLICA API
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c3bb6e20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c3bb6e20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c3bb6e20
Branch: refs/heads/branch_6x
Commit: c3bb6e20591d69e0f5039a56780a32d69c2543ec
Parents: e606d90
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Apr 6 15:48:38 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Apr 6 15:58:25 2017 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/MoveReplicaCmd.java | 193 +++++++++++++++++++
.../cloud/OverseerCollectionMessageHandler.java | 1 +
.../solr/handler/admin/CollectionsHandler.java | 12 +-
.../CollectionsAPIAsyncDistributedZkTest.java | 16 +-
.../org/apache/solr/cloud/MoveReplicaTest.java | 125 ++++++++++++
.../HdfsCollectionsAPIDistributedZkTest.java | 114 +++++++++++
.../solrj/request/CollectionAdminRequest.java | 38 ++++
.../solr/common/params/CollectionParams.java | 1 +
8 files changed, 497 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
new file mode 100644
index 0000000..09d3b79
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class MoveReplicaCmd implements Cmd{
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) { // ocmh: the handler this command uses for message validation and for the add/delete replica calls
+ this.ocmh = ocmh;
+ }
+
+ @Override
+ public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception { // entry point of the command; delegates to moveReplica
+ moveReplica(ocmh.zkStateReader.getClusterState(), message, results); // the 'state' argument is not consulted; the cluster state is read from the handler's ZkStateReader instead
+ }
+
+ /**
+  * Validates a MOVEREPLICA message, resolves the replica to move and hands it to the
+  * HDFS-specific or the regular move strategy.
+  *
+  * <p>The message must carry the collection and {@code targetNode}. The replica is named
+  * either directly through the replica property, or indirectly through a shard plus
+  * {@code fromNode}; in the latter case a randomly chosen replica of that shard hosted on
+  * {@code fromNode} is moved.
+  *
+  * @param clusterState cluster state used to look up the collection, shard and replica
+  * @param message      the MOVEREPLICA request properties
+  * @param results      receives the "success" or "failure" entry written by the chosen strategy
+  * @throws SolrException with BAD_REQUEST when the collection, shard or replica cannot be resolved
+  */
+ private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+   log.info("moveReplica() : {}", Utils.toJSONString(message));
+   ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
+   String collection = message.getStr(COLLECTION_PROP);
+   String targetNode = message.getStr("targetNode");
+
+   String async = message.getStr(ASYNC);
+
+   DocCollection coll = clusterState.getCollection(collection);
+   if (coll == null) {
+     throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+   }
+   Replica replica = null;
+   if (message.containsKey(REPLICA_PROP)) {
+     String replicaName = message.getStr(REPLICA_PROP);
+     replica = coll.getReplica(replicaName);
+     if (replica == null) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+           "Collection: " + collection + " replica: " + replicaName + " does not exist");
+     }
+   } else {
+     ocmh.checkRequired(message, SHARD_ID_PROP, "fromNode");
+     String fromNode = message.getStr("fromNode");
+     String shardId = message.getStr(SHARD_ID_PROP);
+     Slice shard = coll.getSlice(shardId);
+     // Guard against an unknown shard name instead of failing with a NullPointerException below.
+     if (shard == null) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+           "Collection: " + collection + " shard: " + shardId + " does not exist");
+     }
+     // Iterate the shuffled copy (not the slice's own collection) and stop at the first
+     // match, so that the pick among several replicas on fromNode is actually random.
+     List<Replica> sliceReplicas = new ArrayList<>(shard.getReplicas());
+     Collections.shuffle(sliceReplicas, RANDOM);
+     for (Replica r : sliceReplicas) {
+       if (r.getNodeName().equals(fromNode)) {
+         replica = r;
+         break;
+       }
+     }
+     if (replica == null) {
+       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+           "Collection: " + collection + " node: " + fromNode + " do not have any replica belong to shard: " + shardId);
+     }
+   }
+
+   log.info("Replica will be moved {}", replica);
+   // Find the slice that owns the replica; needed for both ways of naming the replica.
+   Slice slice = null;
+   for (Slice s : coll.getSlices()) {
+     if (s.getReplicas().contains(replica)) {
+       slice = s;
+     }
+   }
+   assert slice != null;
+   // A data directory on HDFS can be re-attached by the new core, so it gets its own strategy.
+   Object dataDir = replica.get("dataDir");
+   if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
+     moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice);
+   } else {
+     moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice);
+   }
+ }
+
+ /**
+ * Moves a replica whose data directory lives on HDFS: the old replica is deleted first with
+ * both DELETE_DATA_DIR and DELETE_INDEX set to false, then a new replica is created on
+ * {@code targetNode} with DATA_DIR pointing at the same {@code dataDir}.
+ *
+ * <p>If the delete step reports a failure, a "failure" entry is added to {@code results} and
+ * the new replica is not created. Both failure entries include the underlying cause.
+ *
+ * @param clusterState cluster state passed through to the delete and add operations
+ * @param results receives a "success" or "failure" entry describing the outcome
+ * @param dataDir data directory of the replica being moved, reused by the new replica
+ * @param targetNode node on which the new replica is created
+ * @param async async request id, or null; forwarded to both operations when present
+ * @param coll collection that owns the replica
+ * @param replica the replica to move
+ * @param slice the shard that owns the replica
+ */
+ private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
+ DocCollection coll, Replica replica, Slice slice) throws Exception {
+ String newCoreName = Assign.buildCoreName(coll, slice.getName());
+
+ // Step 1: remove the old replica while keeping its data directory and index.
+ ZkNodeProps removeReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ REPLICA_PROP, replica.getName()
+ );
+ removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
+ removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
+ if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+ NamedList deleteResult = new NamedList();
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+ if (deleteResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
+ coll.getName(), slice.getName(), replica.getName());
+ log.warn(errorString);
+ results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
+ return;
+ }
+
+ // Step 2: create the replacement replica on the target node, re-attaching the kept data directory.
+ ZkNodeProps addReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ CoreAdminParams.NODE, targetNode,
+ CoreAdminParams.NAME, newCoreName,
+ CoreAdminParams.DATA_DIR, dataDir);
+ if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
+ NamedList addResult = new NamedList();
+ ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+ if (addResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s", coll.getName(), slice.getName(), targetNode);
+ log.warn(errorString);
+ results.add("failure", errorString + ", because of : " + addResult.get("failure"));
+ return;
+ } else {
+ String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+ "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
+ results.add("success", successString);
+ }
+ }
+
+ /**
+ * Moves a replica by first creating a new replica of the same shard on {@code targetNode}
+ * and only then deleting the original one.
+ *
+ * <p>If the add step reports a failure, a "failure" entry is added to {@code results} and the
+ * original replica is left untouched. Both failure entries include the underlying cause.
+ *
+ * @param clusterState cluster state passed through to the add and delete operations
+ * @param results receives a "success" or "failure" entry describing the outcome
+ * @param targetNode node on which the new replica is created
+ * @param async async request id, or null; forwarded to both operations when present
+ * @param coll collection that owns the replica
+ * @param replica the replica to move
+ * @param slice the shard that owns the replica
+ */
+ private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
+ DocCollection coll, Replica replica, Slice slice) throws Exception {
+ String newCoreName = Assign.buildCoreName(coll, slice.getName());
+ // Step 1: create the replacement replica on the target node.
+ ZkNodeProps addReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ CoreAdminParams.NODE, targetNode,
+ CoreAdminParams.NAME, newCoreName);
+ if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
+ NamedList addResult = new NamedList();
+ ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+ if (addResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s", coll.getName(), slice.getName(), targetNode);
+ log.warn(errorString);
+ results.add("failure", errorString + ", because of : " + addResult.get("failure"));
+ return;
+ }
+
+ // Step 2: the new replica exists, so the original one can be removed.
+ ZkNodeProps removeReplicasProps = new ZkNodeProps(
+ COLLECTION_PROP, coll.getName(),
+ SHARD_ID_PROP, slice.getName(),
+ REPLICA_PROP, replica.getName());
+ if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+ NamedList deleteResult = new NamedList();
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+ if (deleteResult.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
+ coll.getName(), slice.getName(), replica.getName());
+ log.warn(errorString);
+ results.add("failure", errorString + ", because of : " + deleteResult.get("failure"));
+ } else {
+ String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+ "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
+ results.add("success", successString);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 00eb12d..a13323d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -207,6 +207,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
.put(DELETESHARD, new DeleteShardCmd(this))
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
+ .put(MOVEREPLICA, new MoveReplicaCmd(this))
.build()
;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d7759ca..c50e2b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -857,6 +857,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
return null;
}),
REPLACENODE_OP(REPLACENODE, (req, rsp, h) -> req.getParams().required().getAll(req.getParams().getAll(null, "parallel"), "source", "target")),
+ MOVEREPLICA_OP(MOVEREPLICA, (req, rsp, h) -> { // builds the property map for a MOVEREPLICA request
+ Map<String, Object> map = req.getParams().required().getAll(null, // only the collection name is fetched through required() here
+ COLLECTION_PROP);
+
+ return req.getParams().getAll(map, // the remaining parameters are read without required(); NOTE(review): presumably absent ones are simply left out of the map — confirm
+ "fromNode",
+ "targetNode",
+ "replica",
+ "shard");
+ }),
DELETENODE_OP(DELETENODE, (req, rsp, h) -> req.getParams().required().getAll(null, "node"));
public final CollectionOp fun;
CollectionAction action;
@@ -879,7 +889,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
for (CollectionOperation op : values()) {
if (op.action == action) return op;
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No such action " + action);
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
index dcb115a..30c3c9e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -178,11 +179,22 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
//expected
}
- String replica = shard1.getReplicas().iterator().next().getName();
+ Replica replica = shard1.getReplicas().iterator().next();
+ for (String liveNode : client.getZkStateReader().getClusterState().getLiveNodes()) {
+ if (!replica.getNodeName().equals(liveNode)) {
+ state = new CollectionAdminRequest.MoveReplica(collection, replica.getName(), liveNode)
+ .processAndWait(client, MAX_TIMEOUT_SECONDS);
+ assertSame("MoveReplica did not complete", RequestStatusState.COMPLETED, state);
+ break;
+ }
+ }
+
+ shard1 = client.getZkStateReader().getClusterState().getSlice(collection, "shard1");
+ String replicaName = shard1.getReplicas().iterator().next().getName();
state = new CollectionAdminRequest.DeleteReplica()
.setCollectionName(collection)
.setShardName("shard1")
- .setReplica(replica)
+ .setReplica(replicaName)
.processAndWait(client, MAX_TIMEOUT_SECONDS);
assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
new file mode 100644
index 0000000..4368fea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MoveReplicaTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(4) // 4 is presumably the number of nodes to start — confirm against SolrCloudTestCase
+ .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf")) // registers the cloud-dynamic configset under the name "conf1" used by the test collection
+ .configure();
+ }
+
+ protected String getSolrXml() { // NOTE(review): no @Override and no caller visible in this file — confirm the base class still consults this hook
+ return "solr.xml";
+ }
+
+ /**
+  * Moves a randomly chosen replica to a randomly chosen other live node by replica name
+  * (asynchronously), then moves a replica of the same shard back by shard and source node
+  * (synchronously), checking the per-node core counts after each step.
+  */
+ @Test
+ public void test() throws Exception {
+   cluster.waitForAllNodes(5000);
+   String coll = "movereplicatest_coll";
+   log.info("total_jettys: " + cluster.getJettySolrRunners().size());
+
+   CloudSolrClient cloudClient = cluster.getSolrClient();
+
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2);
+   create.setMaxShardsPerNode(2);
+   cloudClient.request(create);
+
+   Replica replica = getRandomReplica(coll, cloudClient);
+   Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
+   // Walk the shuffled copy (not the live-node set itself) so the target really is random.
+   List<String> shuffledNodes = new ArrayList<>(liveNodes);
+   Collections.shuffle(shuffledNodes, random());
+   String targetNode = null;
+   for (String node : shuffledNodes) {
+     if (!replica.getNodeName().equals(node)) {
+       targetNode = node;
+       break;
+     }
+   }
+   assertNotNull(targetNode);
+   String shardId = null;
+   for (Slice slice : cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices()) {
+     if (slice.getReplicas().contains(replica)) {
+       shardId = slice.getName();
+     }
+   }
+   // The second move is addressed by shard, so fail early if the shard was not found.
+   assertNotNull(shardId);
+
+   // First move: address the replica by name and run the request asynchronously.
+   CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
+   moveReplica.processAsync("000", cloudClient);
+   CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
+   // wait for async request success
+   boolean success = false;
+   for (int i = 0; i < 200; i++) {
+     CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
+     if (rsp.getRequestStatus() == RequestStatusState.COMPLETED) {
+       success = true;
+       break;
+     }
+     assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+     Thread.sleep(50);
+   }
+   assertTrue(success);
+   checkNumOfCores(cloudClient, replica.getNodeName(), 0);
+   checkNumOfCores(cloudClient, targetNode, 2);
+
+   // Second move: address by shard + source node; the constructor order is
+   // (collection, shard, fromNode, targetNode), so this moves back to the original node.
+   moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
+   moveReplica.process(cloudClient);
+   checkNumOfCores(cloudClient, replica.getNodeName(), 1);
+   checkNumOfCores(cloudClient, targetNode, 1);
+ }
+
+ private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) { // picks one replica of the collection using the test's random()
+ List<Replica> replicas = cloudClient.getZkStateReader().getClusterState().getCollection(coll).getReplicas(); // replicas as reported by the client's current cluster state
+ Collections.shuffle(replicas, random()); // shuffles the returned list in place — presumably a private copy; confirm against DocCollection.getReplicas()
+ return replicas.get(0); // first element after the shuffle
+ }
+
+ /** Asserts that the core count reported for {@code nodeName} equals {@code expectedCores}. */
+ private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
+   int actualCores = getNumOfCores(cloudClient, nodeName);
+   String failureMessage = nodeName + " does not have expected number of cores";
+   assertEquals(failureMessage, expectedCores, actualCores);
+ }
+
+ /**
+  * Sends a core-admin status request (with no core name) to the given node and returns the
+  * number of entries in the reported core status.
+  */
+ private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
+   String nodeBaseUrl = cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName);
+   try (HttpSolrClient nodeClient = getHttpSolrClient(nodeBaseUrl)) {
+     return CoreAdminRequest.getStatus(null, nodeClient).getCoreStatus().size();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
index 1b830ad..58d499b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsCollectionsAPIDistributedZkTest.java
@@ -16,15 +16,37 @@
*/
package org.apache.solr.cloud.hdfs;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import com.carrotsearch.randomizedtesting.annotations.Nightly;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Metric;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.cloud.CollectionsAPIDistributedZkTest;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Test;
@Slow
@Nightly
@@ -59,4 +81,96 @@ public class HdfsCollectionsAPIDistributedZkTest extends CollectionsAPIDistribut
System.clearProperty("solr.hdfs.home");
}
+ /**
+  * Moves a non-leader replica of an HDFS-backed collection to a randomly chosen other live
+  * node and verifies that the core counts changed, that a replica of the same shard now
+  * reports the original data directory, and that no replication requests were counted.
+  */
+ @Test
+ public void moveReplicaTest() throws Exception {
+   cluster.waitForAllNodes(5000);
+   String coll = "movereplicatest_coll";
+
+   CloudSolrClient cloudClient = cluster.getSolrClient();
+
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf", 2, 2);
+   create.setMaxShardsPerNode(2);
+   cloudClient.request(create);
+
+   for (int i = 0; i < 10; i++) {
+     cloudClient.add(coll, sdoc("id",String.valueOf(i)));
+     cloudClient.commit(coll);
+   }
+
+   List<Slice> slices = new ArrayList<>(cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlices());
+   Collections.shuffle(slices, random());
+   // Take the first non-leader replica of the first shuffled slice that has one, recording
+   // the slice together with the replica so the two always belong to each other.
+   Slice slice = null;
+   Replica replica = null;
+   for (Slice s : slices) {
+     for (Replica r : s.getReplicas()) {
+       if (s.getLeader() != r) {
+         slice = s;
+         replica = r;
+         break;
+       }
+     }
+     if (replica != null) {
+       break;
+     }
+   }
+   assertNotNull("no non-leader replica found", replica);
+   String dataDir = getDataDir(replica);
+
+   Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
+   // Walk the shuffled copy (not the live-node set itself) so the target really is random.
+   List<String> shuffledNodes = new ArrayList<>(liveNodes);
+   Collections.shuffle(shuffledNodes, random());
+   String targetNode = null;
+   for (String node : shuffledNodes) {
+     if (!replica.getNodeName().equals(node)) {
+       targetNode = node;
+       break;
+     }
+   }
+   assertNotNull(targetNode);
+
+   CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(coll, replica.getName(), targetNode);
+   moveReplica.process(cloudClient);
+
+   checkNumOfCores(cloudClient, replica.getNodeName(), 0);
+   checkNumOfCores(cloudClient, targetNode, 2);
+
+   waitForState("Wait for recovery finish failed",coll, clusterShape(2,2));
+   // Re-read the slice: the moved replica must report the data directory of the old one.
+   slice = cloudClient.getZkStateReader().getClusterState().getCollection(coll).getSlice(slice.getName());
+   boolean found = false;
+   for (Replica newReplica : slice.getReplicas()) {
+     if (getDataDir(newReplica).equals(dataDir)) {
+       found = true;
+     }
+   }
+   assertTrue(found);
+
+
+   // data dir is reused so replication will be skipped
+   for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+     SolrMetricManager manager = jetty.getCoreContainer().getMetricManager();
+     List<String> registryNames = manager.registryNames().stream()
+         .filter(s -> s.startsWith("solr.core.")).collect(Collectors.toList());
+     for (String registry : registryNames) {
+       Map<String, Metric> metrics = manager.registry(registry).getMetrics();
+       Counter counter = (Counter) metrics.get("REPLICATION./replication.requests");
+       if (counter != null) {
+         assertEquals(0, counter.getCount());
+       }
+     }
+   }
+ }
+
+
+ /** Asserts that the core count reported for {@code nodeName} equals {@code expectedCores}. */
+ private void checkNumOfCores(CloudSolrClient cloudClient, String nodeName, int expectedCores) throws IOException, SolrServerException {
+   int actualCores = getNumOfCores(cloudClient, nodeName);
+   String failureMessage = nodeName + " does not have expected number of cores";
+   assertEquals(failureMessage, expectedCores, actualCores);
+ }
+
+ /**
+  * Sends a core-admin status request (with no core name) to the given node and returns the
+  * number of entries in the reported core status.
+  */
+ private int getNumOfCores(CloudSolrClient cloudClient, String nodeName) throws IOException, SolrServerException {
+   String nodeBaseUrl = cloudClient.getZkStateReader().getBaseUrlForNodeName(nodeName);
+   try (HttpSolrClient nodeClient = getHttpSolrClient(nodeBaseUrl)) {
+     return CoreAdminRequest.getStatus(null, nodeClient).getCoreStatus().size();
+   }
+ }
+
+ /**
+  * Asks the node hosting {@code replica} for the status of the replica's core and returns
+  * the data directory that status reports.
+  */
+ private String getDataDir(Replica replica) throws IOException, SolrServerException {
+   String hostBaseUrl = replica.getBaseUrl();
+   try (HttpSolrClient hostClient = getHttpSolrClient(hostBaseUrl)) {
+     return CoreAdminRequest.getCoreStatus(replica.getCoreName(), hostClient).getDataDirectory();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 94750c0a..0b1609d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -607,6 +607,44 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
+ /**
+  * Collection admin request for the MOVEREPLICA action.
+  *
+  * <p>The replica to move is identified in one of two ways, fixed by the constructor used:
+  * directly by replica name, or by shard plus the node it is currently on.
+  */
+ public static class MoveReplica extends AsyncCollectionAdminRequest {
+   String collection;
+   String replica;
+   String targetNode;
+   String shard;
+   String fromNode;
+   // true when the request was built from shard + fromNode rather than from a replica name
+   boolean randomlyMoveReplica;
+
+   /** Builds a request that moves the named replica of {@code collection} to {@code targetNode}. */
+   public MoveReplica(String collection, String replica, String targetNode) {
+     super(CollectionAction.MOVEREPLICA);
+     this.randomlyMoveReplica = false;
+     this.collection = collection;
+     this.targetNode = targetNode;
+     this.replica = replica;
+   }
+
+   /** Builds a request that moves a replica of {@code shard} from {@code fromNode} to {@code targetNode}. */
+   public MoveReplica(String collection, String shard, String fromNode, String targetNode) {
+     super(CollectionAction.MOVEREPLICA);
+     this.randomlyMoveReplica = true;
+     this.collection = collection;
+     this.targetNode = targetNode;
+     this.shard = shard;
+     this.fromNode = fromNode;
+   }
+
+   /** Adds the collection, the target node and the identifying parameters to the base parameters. */
+   @Override
+   public SolrParams getParams() {
+     ModifiableSolrParams moveParams = (ModifiableSolrParams) super.getParams();
+     moveParams.set("collection", collection);
+     moveParams.set("targetNode", targetNode);
+     if (!randomlyMoveReplica) {
+       moveParams.set("replica", replica);
+       return moveParams;
+     }
+     moveParams.set("shard", shard);
+     moveParams.set("fromNode", fromNode);
+     return moveParams;
+   }
+ }
+
+
/*
* Returns a RebalanceLeaders object to rebalance leaders for a collection
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c3bb6e20/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index f1e5a52..51db039 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -80,6 +80,7 @@ public interface CollectionParams {
REQUESTSTATUS(false, LockLevel.NONE),
DELETESTATUS(false, LockLevel.NONE),
ADDREPLICA(true, LockLevel.SHARD),
+ MOVEREPLICA(true, LockLevel.SHARD),
OVERSEERSTATUS(false, LockLevel.NONE),
LIST(false, LockLevel.NONE),
CLUSTERSTATUS(false, LockLevel.NONE),
[2/2] lucene-solr:branch_6x: SOLR-10239: change empty lambda to null
Posted by da...@apache.org.
SOLR-10239: change empty lambda to null
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/78b84e65
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/78b84e65
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/78b84e65
Branch: refs/heads/branch_6x
Commit: 78b84e65bd5262386917225cd94a90c078847ae6
Parents: c3bb6e2
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Apr 6 15:57:43 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Apr 6 15:58:45 2017 +0700
----------------------------------------------------------------------
solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/78b84e65/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 09d3b79..545989e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -122,7 +122,7 @@ public class MoveReplicaCmd implements Cmd{
removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
- ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());
@@ -139,7 +139,7 @@ public class MoveReplicaCmd implements Cmd{
CoreAdminParams.DATA_DIR, dataDir);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
- ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+ ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
@@ -163,7 +163,7 @@ public class MoveReplicaCmd implements Cmd{
CoreAdminParams.NAME, newCoreName);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
- ocmh.addReplica(clusterState, addReplicasProps, addResult, ()->{});
+ ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
@@ -178,7 +178,7 @@ public class MoveReplicaCmd implements Cmd{
REPLICA_PROP, replica.getName());
if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
- ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, ()->{});
+ ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s",
coll.getName(), slice.getName(), replica.getName());