You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/02/27 16:50:06 UTC
lucene-solr:jira/solr-11670: Refactor and implement automatic refresh
of tasks when props are updated. Add unit test.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11670 760f7a395 -> 7265a5453
Refactor and implement automatic refresh of tasks when props are updated.
Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7265a545
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7265a545
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7265a545
Branch: refs/heads/jira/solr-11670
Commit: 7265a54534cf9c3b9b937b50f1340517ccc2a52d
Parents: 760f7a3
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Feb 27 17:49:01 2018 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Feb 27 17:49:01 2018 +0100
----------------------------------------------------------------------
.../org/apache/solr/cloud/MaintenanceTask.java | 16 ++
.../org/apache/solr/cloud/MaintenanceTasks.java | 161 +++++++++++++++++++
.../apache/solr/cloud/MaintenanceThread.java | 104 ------------
.../java/org/apache/solr/cloud/Overseer.java | 13 +-
.../cloud/api/collections/SplitShardCmd.java | 69 ++++++--
.../cloud/api/collections/ShardSplitTest.java | 43 +++++
.../solr/common/cloud/ClusterProperties.java | 2 +-
7 files changed, 288 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/java/org/apache/solr/cloud/MaintenanceTask.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MaintenanceTask.java b/solr/core/src/java/org/apache/solr/cloud/MaintenanceTask.java
index 5525c42..9f0879a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MaintenanceTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MaintenanceTask.java
@@ -19,12 +19,28 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
/**
*
*/
public interface MaintenanceTask extends Runnable, Closeable {
+ /**
+ * Initialize this task. This method may be called multiple times when configuration changes.
+ * @param properties configuration properties (may contain unrelated key / values)
+ */
+ void init(Map<String, Object> properties);
+
+ /**
+ * Schedule period in seconds. The task will be executed at most that frequently.
+ */
+ int getSchedulePeriod();
+
+ /**
+ * Name of this task (must be unique). Default value is the simple class name.
+ * @return task name
+ */
default String getName() {
return getClass().getSimpleName();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/java/org/apache/solr/cloud/MaintenanceTasks.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MaintenanceTasks.java b/solr/core/src/java/org/apache/solr/cloud/MaintenanceTasks.java
new file mode 100644
index 0000000..f040d37
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/MaintenanceTasks.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for running periodically some maintenance tasks.
+ */
+public class MaintenanceTasks implements Runnable, SolrCloseable {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private class ScheduledTask {
+ MaintenanceTask maintenanceTask;
+ ScheduledFuture<?> scheduledFuture;
+ }
+
+ private final SolrCloudManager cloudManager;
+ private final ScheduledThreadPoolExecutor taskExecutor;
+ private final Map<String, ScheduledTask> tasks = new ConcurrentHashMap<>();
+ private Object updated = new Object();
+
+ private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
+ private volatile boolean isClosed = false;
+
+ public MaintenanceTasks(SolrCloudManager cloudManager) {
+ this.cloudManager = cloudManager;
+ this.taskExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(3, new DefaultSolrThreadFactory("MaintenanceTask"));
+ }
+
+ @Override
+ public void run() {
+ refreshClusterProperties();
+ while (!isClosed && !Thread.interrupted()) {
+ try {
+ synchronized (updated) {
+ updated.wait();
+ }
+ // re-init and reschedule tasks
+ for (ScheduledTask st : tasks.values()) {
+ if (st.scheduledFuture != null) {
+ st.scheduledFuture.cancel(false);
+ }
+ st.maintenanceTask.init(clusterProperties);
+ st.scheduledFuture = taskExecutor.scheduleWithFixedDelay(st.maintenanceTask, 0,
+ cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, st.maintenanceTask.getSchedulePeriod(), TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+
+ private void refreshClusterProperties() {
+ try {
+ VersionedData data = cloudManager.getDistribStateManager().getData(ZkStateReader.CLUSTER_PROPS, ev -> {
+ // session events are not change events, and do not remove the watcher
+ if (ev.getType().equals(Watcher.Event.EventType.None)) {
+ return;
+ } else {
+ refreshClusterProperties();
+ }
+ });
+ Map<String, Object> newProperties = (Map<String, Object>)Utils.fromJSON(data.getData());
+ clusterProperties = newProperties;
+ synchronized (updated) {
+ updated.notifyAll();
+ }
+ } catch (Exception e) {
+ log.warn("Exception retrieving cluster properties", e);
+ }
+ }
+
+ public void addTask(MaintenanceTask task, boolean replace) throws AlreadyExistsException {
+ if (tasks.containsKey(task.getName())) {
+ if (replace) {
+ ScheduledTask oldScheduledTask = tasks.get(task.getName());
+ IOUtils.closeQuietly(oldScheduledTask.maintenanceTask);
+ if (oldScheduledTask.scheduledFuture != null) {
+ oldScheduledTask.scheduledFuture.cancel(false);
+ }
+ } else {
+ throw new AlreadyExistsException(task.getName());
+ }
+ }
+ ScheduledTask st = new ScheduledTask();
+ st.maintenanceTask = task;
+ task.init(clusterProperties);
+ tasks.put(task.getName(), st);
+ st.scheduledFuture = taskExecutor.scheduleWithFixedDelay(task, 0,
+ cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, task.getSchedulePeriod(), TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void removeTask(String name) {
+ ScheduledTask st = tasks.get(name);
+ if (st == null) {
+ // ignore
+ return;
+ }
+ if (st.scheduledFuture != null) {
+ st.scheduledFuture.cancel(false);
+ }
+ IOUtils.closeQuietly(st.maintenanceTask);
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed = true;
+ for (ScheduledTask scheduledTask : tasks.values()) {
+ if (scheduledTask.scheduledFuture != null) {
+ scheduledTask.scheduledFuture.cancel(false);
+ }
+ IOUtils.closeQuietly(scheduledTask.maintenanceTask);
+ }
+ tasks.clear();
+ taskExecutor.shutdown();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/java/org/apache/solr/cloud/MaintenanceThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MaintenanceThread.java b/solr/core/src/java/org/apache/solr/cloud/MaintenanceThread.java
deleted file mode 100644
index 216beca..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/MaintenanceThread.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
-import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-
-/**
- * Responsible for running periodically some maintenance tasks.
- */
-public class MaintenanceThread implements SolrCloseable {
-
- private class Task {
- MaintenanceTask maintenanceTask;
- ScheduledFuture<?> scheduledFuture;
- }
-
- private final SolrCloudManager cloudManager;
- private final ScheduledThreadPoolExecutor taskExecutor;
- private final Map<String, Task> tasks = new ConcurrentHashMap<>();
-
- private transient boolean isClosed = false;
-
- public MaintenanceThread(SolrCloudManager cloudManager) {
- this.cloudManager = cloudManager;
- this.taskExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(3, new DefaultSolrThreadFactory("MaintenanceTask"));
- }
-
- public void addTask(MaintenanceTask task, int period, TimeUnit timeUnit, boolean replace) throws AlreadyExistsException {
- if (tasks.containsKey(task.getName())) {
- if (replace) {
- Task oldTask = tasks.get(task.getName());
- IOUtils.closeQuietly(oldTask.maintenanceTask);
- if (oldTask.scheduledFuture != null) {
- oldTask.scheduledFuture.cancel(false);
- }
- } else {
- throw new AlreadyExistsException(task.getName());
- }
- }
- Task t = new Task();
- t.maintenanceTask = task;
- tasks.put(task.getName(), t);
- t.scheduledFuture = taskExecutor.scheduleWithFixedDelay(task, 0,
- cloudManager.getTimeSource().convertDelay(timeUnit, period, TimeUnit.MILLISECONDS),
- TimeUnit.MILLISECONDS);
- }
-
- public void removeTask(String name) {
- Task t = tasks.get(name);
- if (t == null) {
- // ignore
- return;
- }
- IOUtils.closeQuietly(t.maintenanceTask);
- if (t.scheduledFuture != null) {
- t.scheduledFuture.cancel(false);
- }
- }
-
- @Override
- public void close() throws IOException {
- isClosed = true;
- for (Task task : tasks.values()) {
- IOUtils.closeQuietly(task.maintenanceTask);
- if (task.scheduledFuture != null) {
- task.scheduledFuture.cancel(false);
- }
- }
- tasks.clear();
- taskExecutor.shutdown();
- }
-
- @Override
- public boolean isClosed() {
- return isClosed;
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index dd03a99..e96bfce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -486,6 +486,8 @@ public class Overseer implements SolrCloseable {
private OverseerThread maintenanceThread;
+ private MaintenanceTasks maintenanceTasks;
+
private final ZkStateReader reader;
private final ShardHandler shardHandler;
@@ -531,8 +533,8 @@ public class Overseer implements SolrCloseable {
// create this early so that collection commands may register their tasks
ThreadGroup maintenanceThreadGroup = new ThreadGroup("Overseer maintenance process");
- MaintenanceThread maintenance = new MaintenanceThread(zkController.getSolrCloudManager());
- maintenanceThread = new OverseerThread(maintenanceThreadGroup, maintenance, "OverseerMaintenanceThread-" + id);
+ maintenanceTasks = new MaintenanceTasks(zkController.getSolrCloudManager());
+ maintenanceThread = new OverseerThread(maintenanceThreadGroup, maintenanceTasks, "OverseerMaintenanceTasks-" + id);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
@@ -597,6 +599,13 @@ public class Overseer implements SolrCloseable {
return maintenanceThread;
}
+ /**
+ * Access the task runner to register / unregister maintenance tasks.
+ */
+ public synchronized MaintenanceTasks getMaintenanceTasks() {
+ return maintenanceTasks;
+ }
+
public synchronized void close() {
if (closed) return;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 3c3ae4f..374ed65 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -34,9 +34,9 @@ import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.MaintenanceTask;
-import org.apache.solr.cloud.MaintenanceThread;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
@@ -74,20 +74,18 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
private final OverseerCollectionMessageHandler ocmh;
- public static final String CLEANUP_PERIOD_PROP = "split.shard.cleanup.period";
- public static final String CLEANUP_TTL_PROP = "split.shard.cleanup.ttl";
+ private static final String PROP_PREFIX = ZkStateReader.PROPERTY_PROP_PREFIX + InactiveSliceCleanupTask.class.getSimpleName() + ".";
+ public static final String CLEANUP_PERIOD_PROP = PROP_PREFIX + "period";
+ public static final String CLEANUP_TTL_PROP = PROP_PREFIX + "ttl";
public static final int DEFAULT_CLEANUP_PERIOD_SECONDS = 3600;
public static final int DEFAULT_CLEANUP_TTL_SECONDS = 3600 * 24 * 2;
public SplitShardCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
- int cleanupPeriod = ocmh.cloudManager.getClusterStateProvider().getClusterProperty(CLEANUP_PERIOD_PROP, DEFAULT_CLEANUP_PERIOD_SECONDS);
- int cleanupTTL = ocmh.cloudManager.getClusterStateProvider().getClusterProperty(CLEANUP_TTL_PROP, DEFAULT_CLEANUP_TTL_SECONDS);
- InactiveSliceCleanupTask task = new InactiveSliceCleanupTask(ocmh, cleanupTTL);
+ InactiveSliceCleanupTask task = new InactiveSliceCleanupTask(ocmh);
try {
- ((MaintenanceThread) ocmh.overseer.getMaintenanceThread().getThread())
- .addTask(task, cleanupPeriod, TimeUnit.SECONDS, false);
+ ocmh.overseer.getMaintenanceTasks().addTask(task, false);
} catch (AlreadyExistsException e) { // should not happen
log.error("Already registered cleanup task? ", e);
throw new RuntimeException(e);
@@ -565,24 +563,62 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
* their cleanupTTL period elapsed.
*/
public static class InactiveSliceCleanupTask implements MaintenanceTask {
+
+ // a unit test shunt
+ static List<NamedList> testResultsList = null;
+
private OverseerCollectionMessageHandler ocmh;
- private int cleanupTTL;
- private DeleteShardCmd cmd;
+ private int cleanupTTL = DEFAULT_CLEANUP_TTL_SECONDS;
+ private int period = DEFAULT_CLEANUP_PERIOD_SECONDS;
- public InactiveSliceCleanupTask(OverseerCollectionMessageHandler ocmh, int cleanupTTL) {
+ public InactiveSliceCleanupTask(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
- this.cleanupTTL = cleanupTTL;
- cmd = (DeleteShardCmd)ocmh.commandMap.get(DELETESHARD);
+ }
+
+ @Override
+ public void init(Map<String, Object> properties) {
+ String periodStr = String.valueOf(properties.getOrDefault(CLEANUP_PERIOD_PROP, DEFAULT_CLEANUP_PERIOD_SECONDS));
+ String ttlStr = String.valueOf(properties.getOrDefault(CLEANUP_TTL_PROP, DEFAULT_CLEANUP_TTL_SECONDS));
+ try {
+ period = Integer.parseInt(periodStr);
+ } catch (Exception e) {
+ period = -1;
+ }
+ if (period <= 0) {
+ log.warn("Invalid cleanup period " + periodStr + ", using default " + DEFAULT_CLEANUP_PERIOD_SECONDS);
+ period = DEFAULT_CLEANUP_PERIOD_SECONDS;
+ }
+ try {
+ cleanupTTL = Integer.parseInt(ttlStr);
+ } catch (Exception e) {
+ cleanupTTL = -1;
+ }
+ if (cleanupTTL < 0) {
+ log.warn("Invalid cleanup TTL " + ttlStr + ", using default " + DEFAULT_CLEANUP_TTL_SECONDS);
+ cleanupTTL = DEFAULT_CLEANUP_TTL_SECONDS;
+ }
+ }
+
+ @Override
+ public int getSchedulePeriod() {
+ return period;
}
@Override
public synchronized void run() {
+ DeleteShardCmd cmd = (DeleteShardCmd)ocmh.commandMap.get(DELETESHARD);
+ log.debug("-- running, cmd={}", cmd);
+ if (cmd == null) {
+ return; // not initialized yet
+ }
try {
ClusterState state = ocmh.cloudManager.getClusterStateProvider().getClusterState();
state.forEachCollection(coll -> {
coll.getSlices().forEach(s -> {
if (Slice.State.INACTIVE.equals(s.getState())) {
String tstampStr = s.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
+ log.debug("-- found inactive {} / {}, now={}, tstamp={}",
+ coll.getName(), s.getName(), ocmh.cloudManager.getTimeSource().getTime(), tstampStr);
if (tstampStr == null || tstampStr.isEmpty()) {
return;
}
@@ -592,13 +628,20 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps props = new ZkNodeProps(ZkStateReader.COLLECTION_PROP, coll.getName(),
ZkStateReader.SHARD_ID_PROP, s.getName());
NamedList results = new NamedList();
+ results.add("collection", coll.getName());
+ results.add("slice", s.getName());
try {
+ log.debug("calling DELETESHARD with {}", props);
cmd.call(state, props, results);
if (results.get("failure") != null) {
throw new Exception("Failed to delete inactive shard: " + results);
}
} catch (Exception e) {
log.warn("Exception deleting inactive shard " + coll.getName() + "/" + s.getName(), e);
+ results.add("error", e.toString());
+ }
+ if (testResultsList != null) {
+ testResultsList.add(results);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 6d3ce4e..1ca82ba 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -49,6 +49,7 @@ import org.apache.solr.cloud.BasicDistributedZkTest;
import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter;
@@ -61,8 +62,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
+import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +92,29 @@ public class ShardSplitTest extends BasicDistributedZkTest {
useFactory(null);
}
+ @After
+ public void resetClusterProperties() throws Exception {
+ SplitShardCmd.InactiveSliceCleanupTask.testResultsList = null;
+ setClusterProperties(null, null);
+ }
+
+ private void setClusterProperties(String cleanupPeriod, String cleanupTTL) throws Exception {
+ ClusterProperties clusterProperties = new ClusterProperties(cloudClient.getZkStateReader().getZkClient());
+ Map<String, Object> props = clusterProperties.getClusterProperties();
+ boolean changed = false;
+ if (props.containsKey(SplitShardCmd.CLEANUP_PERIOD_PROP) ||
+ props.containsKey(SplitShardCmd.CLEANUP_TTL_PROP)) {
+ changed = true;
+ } else if (cleanupPeriod != null || cleanupTTL != null) {
+ changed = true;
+ }
+ clusterProperties.setClusterProperty(SplitShardCmd.CLEANUP_PERIOD_PROP, cleanupPeriod);
+ clusterProperties.setClusterProperty(SplitShardCmd.CLEANUP_TTL_PROP, cleanupTTL);
+ if (changed) {
+ Thread.sleep(5000);
+ }
+ }
+
@Test
public void test() throws Exception {
@@ -508,6 +534,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
public void testSplitShardWithRule() throws Exception {
+ SplitShardCmd.InactiveSliceCleanupTask.testResultsList = new ArrayList<>();
+ setClusterProperties("2", "2");
waitForThingsToLevelOut(15);
if (usually()) {
@@ -527,6 +555,19 @@ public class ShardSplitTest extends BasicDistributedZkTest {
.setShardName("shard1");
response = splitShardRequest.process(cloudClient);
assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
+
+ waitForThingsToLevelOut(15);
+
+ Thread.sleep(10000);
+ assertTrue(SplitShardCmd.InactiveSliceCleanupTask.testResultsList.size() > 0);
+ NamedList nl = SplitShardCmd.InactiveSliceCleanupTask.testResultsList.get(0);
+ assertEquals(collectionName, nl.get("collection"));
+ assertEquals("shard1", nl.get("slice"));
+ assertNull(nl.get("failure"));
+ assertNull(nl.get("error"));
+ assertNotNull(nl.get("success"));
+ // at this point there should be no inactive shards
+
}
private void incompleteOrOverlappingCustomRangeTest() throws Exception {
@@ -574,6 +615,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
private void splitByUniqueKeyTest() throws Exception {
+
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
Slice shard1 = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getSlice(SHARD1);
@@ -657,6 +699,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
waitForRecoveriesToFinish(true);
checkDocCountsAndShardStates(docCounts, numReplicas);
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7265a545/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
index e6df845..979e968 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterProperties.java
@@ -87,7 +87,7 @@ public class ClusterProperties {
@SuppressWarnings("unchecked")
public void setClusterProperty(String propertyName, String propertyValue) throws IOException {
- if (!ZkStateReader.KNOWN_CLUSTER_PROPS.contains(propertyName)) {
+ if (!ZkStateReader.KNOWN_CLUSTER_PROPS.contains(propertyName) && !propertyName.startsWith(ZkStateReader.PROPERTY_PROP_PREFIX)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Not a known cluster property " + propertyName);
}