You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2018/02/16 00:23:27 UTC
[1/2] lucene-solr:branch_7x: SOLR-11739: Don't accept duplicate async
IDs in collection API operations
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x b897841bf -> dfb0803bb
SOLR-11739: Don't accept duplicate async IDs in collection API operations
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f6b6f507
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f6b6f507
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f6b6f507
Branch: refs/heads/branch_7x
Commit: f6b6f5070270c93fa7d8604ed456c9df041e7454
Parents: b897841
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Thu Feb 15 15:41:48 2018 -0800
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Thu Feb 15 16:18:26 2018 -0800
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/DistributedMap.java | 30 +++-
.../java/org/apache/solr/cloud/Overseer.java | 11 +-
.../apache/solr/cloud/OverseerTaskQueue.java | 2 +-
.../solr/cloud/SizeLimitedDistributedMap.java | 21 ++-
.../org/apache/solr/cloud/ZkController.java | 42 +++++
.../OverseerCollectionMessageHandler.java | 2 +-
.../solr/handler/admin/CollectionsHandler.java | 54 +++++-
.../solr/handler/admin/RebalanceLeaders.java | 2 +
.../apache/solr/cloud/TestDistributedMap.java | 180 +++++++++++++++++++
.../cloud/TestSizeLimitedDistributedMap.java | 71 ++++----
.../CollectionsAPIAsyncDistributedZkTest.java | 86 ++++++++-
.../org/apache/solr/cloud/ZkTestServer.java | 1 -
13 files changed, 455 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0a4b52e..11ac84b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -188,6 +188,8 @@ Bug Fixes
* SOLR-11950: Allow CLUSTERSTATUS "shard" parameter to accept comma (,) delimited list (Chris Ulicny via Jason Gerlowski)
+* SOLR-11739: Fix race condition that made Solr accept duplicate async IDs in collection API operations (Tomás Fernández Löbbe)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
index 7518208..c9f12e9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
@@ -16,16 +16,18 @@
*/
package org.apache.solr.cloud;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
-import java.util.List;
-
/**
* A distributed map.
* This supports basic map functions e.g. get, put, contains for interaction with zk which
@@ -58,6 +60,19 @@ public class DistributedMap {
public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false, true);
}
+
+ /**
+ * Puts an element in the map only if there isn't one with the same trackingId already.
+ * The element is stored as a persistent node at {@code dir + "/" + PREFIX + trackingId}.
+ * Unlike {@code put}, this passes {@code true} as the fifth {@code makePath} argument
+ * (presumably "fail on exists" -- TODO confirm against SolrZkClient), so an existing
+ * node surfaces as a NodeExistsException instead of being tolerated.
+ * @param trackingId the key of the element
+ * @param data the bytes to store for the element
+ * @return True if the element was added. False if it wasn't (because the key already exists)
+ */
+ public boolean putIfAbsent(String trackingId, byte[] data) throws KeeperException, InterruptedException {
+ try {
+ zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true, true);
+ return true;
+ } catch (NodeExistsException e) {
+ return false;
+ }
+ }
public byte[] get(String trackingId) throws KeeperException, InterruptedException {
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null, true);
@@ -97,5 +112,16 @@ public class DistributedMap {
}
}
+
+ /**
+ * Lists the key of every element currently stored in the map.
+ * Each child node name found under {@code dir} is returned with the leading
+ * {@code PREFIX} stripped off.
+ */
+ public Collection<String> keys() throws KeeperException, InterruptedException {
+ final List<String> nodeNames = zookeeper.getChildren(dir, null, true);
+ final List<String> trackingIds = new ArrayList<>(nodeNames.size());
+ for (String nodeName : nodeNames) {
+ trackingIds.add(nodeName.substring(PREFIX.length()));
+ }
+ return trackingIds;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index edf3838..dbadcde 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -679,13 +679,19 @@ public class Overseer implements SolrCloseable {
/* Size-limited map for successfully completed tasks*/
static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
- return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE);
+ return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getFailureMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
- return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE);
+ return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
+ }
+
+ /* Map of async IDs currently in use.
+ * Calls createOverseerNode(zkClient) first and then returns a new DistributedMap
+ * over "/overseer/async_ids"; a fresh instance is built on every call. */
+ static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedMap(zkClient, "/overseer/async_ids");
}
/**
@@ -770,6 +776,7 @@ public class Overseer implements SolrCloseable {
createOverseerNode(zkClient);
return getCollectionQueue(zkClient, zkStats);
}
+
private static void createOverseerNode(final SolrZkClient zkClient) {
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 2767258..3df6501 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -70,7 +70,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
if (data != null) {
ZkNodeProps message = ZkNodeProps.load(data);
if (message.containsKey(requestIdKey)) {
- LOG.debug(">>>> {}", message.get(requestIdKey));
+ LOG.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
if(message.get(requestIdKey).equals(requestId)) return true;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index 6501b8c..7f7e75f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -17,7 +17,6 @@
package org.apache.solr.cloud;
import java.util.List;
-
import org.apache.lucene.util.PriorityQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.KeeperException;
@@ -34,9 +33,19 @@ public class SizeLimitedDistributedMap extends DistributedMap {
private final int maxSize;
+ /**
+ * This observer will be called when this map overflows, and deletes the excess of elements
+ */
+ private final OnOverflowObserver onOverflowObserver;
+
public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
+ this(zookeeper, dir, maxSize, null);
+ }
+
+ public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize, OnOverflowObserver onOverflowObserver) {
super(zookeeper, dir);
this.maxSize = maxSize;
+ this.onOverflowObserver = onOverflowObserver;
}
@Override
@@ -47,7 +56,7 @@ public class SizeLimitedDistributedMap extends DistributedMap {
int cleanupSize = maxSize / 10;
- final PriorityQueue priorityQueue = new PriorityQueue<Long>(cleanupSize) {
+ final PriorityQueue<Long> priorityQueue = new PriorityQueue<Long>(cleanupSize) {
@Override
protected boolean lessThan(Long a, Long b) {
return (a > b);
@@ -63,11 +72,17 @@ public class SizeLimitedDistributedMap extends DistributedMap {
for (String child : children) {
Stat stat = zookeeper.exists(dir + "/" + child, null, true);
- if (stat.getMzxid() <= topElementMzxId)
+ if (stat.getMzxid() <= topElementMzxId) {
zookeeper.delete(dir + "/" + child, -1, true);
+ if (onOverflowObserver != null) onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
+ }
}
}
super.put(trackingId, data);
}
+
+ /**
+ * Callback notified while the map trims itself on overflow.
+ * It is invoked once for each child the cleanup deletes, and is skipped when no
+ * observer was supplied to the constructor.
+ */
+ interface OnOverflowObserver {
+ /**
+ * @param child name of the deleted child with the leading {@code PREFIX} removed
+ */
+ void onChildDelete(String child) throws KeeperException, InterruptedException;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7898e96..cb1fcea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -149,6 +149,7 @@ public class ZkController {
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
private final DistributedMap overseerFailureMap;
+ private final DistributedMap asyncIdsMap;
public final static String COLLECTION_PARAM_PREFIX = "collection.";
public final static String CONFIGNAME_PROP = "configName";
@@ -436,6 +437,8 @@ public class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
+ this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
+
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
@@ -1930,6 +1933,45 @@ public class ZkController {
return overseerFailureMap;
}
+ /**
+ * When an operation needs to be performed in an asynchronous mode, the asyncId needs
+ * to be claimed by calling this method to make sure it's not duplicate (hasn't been
+ * claimed by other request). If this method returns true, the asyncId in the parameter
+ * has been reserved for the operation, meaning that no other thread/operation can claim
+ * it. If for whatever reason, the operation is not scheduled, the asyncId needs to be
+ * cleared using {@link #clearAsyncId(String)}.
+ * If this method returns false, no reservation has been made, and this asyncId can't
+ * be used, since it's being used by another operation (currently or in the past)
+ * @param asyncId A string representing the asyncId of an operation. Can't be null.
+ * @return True if the reservation succeeds.
+ * False if this ID is already in use.
+ * @throws KeeperException propagated from the putIfAbsent call on the async IDs map
+ * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
+ * during the call; the thread's interrupt flag is restored before throwing
+ */
+ public boolean claimAsyncId(String asyncId) throws KeeperException {
+ try {
+ return asyncIdsMap.putIfAbsent(asyncId, new byte[0]);
+ } catch (InterruptedException e) {
+ log.error("Could not claim asyncId=" + asyncId, e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Clears an asyncId previously claimed by calling {@link #claimAsyncId(String)}
+ * @param asyncId A string representing the asyncId of an operation. Can't be null.
+ * @return True if the asyncId existed and was cleared.
+ * False if the asyncId didn't exist before.
+ * @throws KeeperException propagated from the remove call on the async IDs map
+ * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
+ * during the call; the thread's interrupt flag is restored before throwing
+ */
+ public boolean clearAsyncId(String asyncId) throws KeeperException {
+ try {
+ return asyncIdsMap.remove(asyncId);
+ } catch (InterruptedException e) {
+ log.error("Could not release asyncId=" + asyncId, e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
public int getClientTimeout() {
return clientTimeout;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 10d08fe..8b4d0b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -810,7 +810,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
*/
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
- log.info("Executing Collection Cmd : " + params);
+ log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 63b9f16..4933559 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -279,6 +279,14 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
+ /*
+ * In SOLR-11739 we change the way the async IDs are checked to decide if one has
+ * already been used or not. For backward compatibility, we continue to check in the
+ * old way (meaning, in all the queues) for now. This extra check should be removed
+ * in Solr 9
+ */
+ private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
+
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
public SolrResponse sendToOCPQueue(ZkNodeProps m) throws KeeperException, InterruptedException {
@@ -294,21 +302,40 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
String asyncId = m.getStr(ASYNC);
- if(asyncId.equals("-1")) {
+ if (asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
NamedList<String> r = new NamedList<>();
-
- if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+
+ if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
+ coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
- overseerCollectionQueueContains(asyncId)) {
+ overseerCollectionQueueContains(asyncId))) {
+ // for back compatibility, check in the old places. This can be removed in Solr 9
r.add("error", "Task with the same requestid already exists.");
-
} else {
- coreContainer.getZkController().getOverseerCollectionQueue()
+ if (coreContainer.getZkController().claimAsyncId(asyncId)) {
+ boolean success = false;
+ try {
+ coreContainer.getZkController().getOverseerCollectionQueue()
.offer(Utils.toJSON(m));
+ success = true;
+ } finally {
+ if (!success) {
+ try {
+ coreContainer.getZkController().clearAsyncId(asyncId);
+ } catch (Exception e) {
+ // let the original exception bubble up
+ log.error("Unable to release async ID={}", asyncId, e);
+ SolrZkClient.checkInterrupted(e);
+ }
+ }
+ }
+ } else {
+ r.add("error", "Task with the same requestid already exists.");
+ }
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
@@ -708,18 +735,29 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
if (flush) {
- zkController.getOverseerCompletedMap().clear();
- zkController.getOverseerFailureMap().clear();
+ Collection<String> completed = zkController.getOverseerCompletedMap().keys();
+ Collection<String> failed = zkController.getOverseerFailureMap().keys();
+ for (String asyncId:completed) {
+ zkController.getOverseerCompletedMap().remove(asyncId);
+ zkController.clearAsyncId(asyncId);
+ }
+ for (String asyncId:failed) {
+ zkController.getOverseerFailureMap().remove(asyncId);
+ zkController.clearAsyncId(asyncId);
+ }
rsp.getValues().add("status", "successfully cleared stored collection api responses");
return null;
} else {
// Request to cleanup
if (zkController.getOverseerCompletedMap().remove(requestId)) {
+ zkController.clearAsyncId(requestId);
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else if (zkController.getOverseerFailureMap().remove(requestId)) {
+ zkController.clearAsyncId(requestId);
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else {
rsp.getValues().add("status", "[" + requestId + "] not found in stored responses");
+ // Don't call zkController.clearAsyncId for this, since it could be a running/pending task
}
}
return null;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index 53e9fde..f0819bd 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -287,6 +287,7 @@ class RebalanceLeaders {
String asyncId = pair.getKey();
if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+ coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> fails = (NamedList<Object>) results.get("failures");
if (fails == null) {
fails = new NamedList<>();
@@ -300,6 +301,7 @@ class RebalanceLeaders {
foundChange = true;
} else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+ coreContainer.getZkController().clearAsyncId(asyncId);
NamedList<Object> successes = (NamedList<Object>) results.get("successes");
if (successes == null) {
successes = new NamedList<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
new file mode 100644
index 0000000..ae05dd5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestDistributedMap.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.Locale;
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestDistributedMap extends SolrTestCaseJ4 {
+
+ private static Path zkDir;
+
+ protected static ZkTestServer zkServer;
+
+ @BeforeClass
+ public static void setUpClass() throws InterruptedException {
+ zkDir = createTempDir("TestDistributedMap");
+ zkServer = new ZkTestServer(zkDir.toFile().getAbsolutePath());
+ zkServer.run();
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException, InterruptedException {
+
+ if (zkServer != null) {
+ zkServer.shutdown();
+ zkServer = null;
+ }
+ FileUtils.deleteDirectory(zkDir.toFile());
+ zkDir = null;
+ }
+
+ public void testPut() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+ map.put("foo", new byte[0]);
+ assertTrue(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+ }
+ }
+
+ public void testGet() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ byte[] data = "data".getBytes(Charset.defaultCharset());
+ zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", data, CreateMode.PERSISTENT, null, false, true);
+ DistributedMap map = createMap(zkClient, path);
+ assertArrayEquals(data, map.get("foo"));
+ }
+ }
+
+ public void testContains() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertFalse(map.contains("foo"));
+ zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT, null, false, true);
+ assertTrue(map.contains("foo"));
+ }
+ }
+
+ public void testRemove() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertFalse(map.remove("foo"));
+ zkClient.makePath(path + "/" + DistributedMap.PREFIX + "foo", new byte[0], CreateMode.PERSISTENT, null, false, true);
+ assertTrue(map.remove("foo"));
+ assertFalse(map.contains("foo"));
+ assertFalse(zkClient.exists(path + "/" + DistributedMap.PREFIX + "foo", true));
+ }
+ }
+
+ public void testSize() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertEquals(0, map.size());
+ map.remove("bar");
+ assertEquals(0, map.size());
+ map.put("foo", new byte[0]);
+ assertEquals(1, map.size());
+ map.put("foo2", new byte[0]);
+ assertEquals(2, map.size());
+ map.remove("foo");
+ assertEquals(1, map.size());
+ }
+ }
+
+ public void testPutIfAbsent() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertEquals(0, map.size());
+ assertFalse(map.contains("foo"));
+ assertTrue(map.putIfAbsent("foo", new byte[0]));
+ assertEquals(1, map.size());
+ assertTrue(map.contains("foo"));
+ assertFalse(map.putIfAbsent("foo", new byte[0]));
+ assertTrue(map.contains("foo"));
+ assertEquals(1, map.size());
+ map.remove("foo");
+ assertFalse(map.contains("foo"));
+ assertEquals(0, map.size());
+ assertTrue(map.putIfAbsent("foo", new byte[0]));
+ assertEquals(1, map.size());
+ assertTrue(map.contains("foo"));
+ }
+
+ }
+
+ public void testKeys() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ assertEquals(0, map.keys().size());
+ map.put("foo", new byte[0]);
+ assertTrue(map.keys().contains("foo"));
+ assertEquals(1, map.keys().size());
+
+ map.put("bar", new byte[0]);
+ assertTrue(map.keys().contains("bar"));
+ assertTrue(map.keys().contains("foo"));
+ assertEquals(2, map.keys().size());
+
+ map.remove("foo");
+ assertTrue(map.keys().contains("bar"));
+ assertEquals(1, map.keys().size());
+ }
+ }
+
+ public void testClear() throws KeeperException, InterruptedException {
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = createMap(zkClient, path);
+ map.clear();
+ assertEquals(0, map.size());
+ map.put("foo", new byte[0]);
+ map.put("bar", new byte[0]);
+ assertEquals(2, map.size());
+ map.clear();
+ assertEquals(0, map.size());
+ }
+ }
+
+ /**
+ * Builds the map under test, rooted at the given ZooKeeper path.
+ * Protected so that subclasses (e.g. TestSizeLimitedDistributedMap) can run the
+ * same tests against a different DistributedMap implementation.
+ */
+ protected DistributedMap createMap(SolrZkClient zkClient, String path) {
+ return new DistributedMap(zkClient, path);
+ }
+
+ /**
+ * Creates and returns the ZooKeeper path used by the current test, built as
+ * "/" + the runtime class name + "/" + the test name, so each test (and each
+ * subclass) gets its own directory.
+ */
+ protected String getAndMakeInitialPath(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+ String path = String.format(Locale.ROOT, "/%s/%s", getClass().getName(), getTestName());
+ zkClient.makePath(path, false, true);
+ return path;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
index 801403a..879a4e6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSizeLimitedDistributedMap.java
@@ -17,42 +17,53 @@
package org.apache.solr.cloud;
-import org.apache.solr.SolrTestCaseJ4;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
import org.apache.solr.common.cloud.SolrZkClient;
-public class TestSizeLimitedDistributedMap extends SolrTestCaseJ4 {
+public class TestSizeLimitedDistributedMap extends TestDistributedMap {
public void testCleanup() throws Exception {
- String zkDir = createTempDir("TestSizeLimitedDistributedMap").toFile().getAbsolutePath();
-
- ZkTestServer server = new ZkTestServer(zkDir);
- try {
- server.run();
-
- AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
- AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
- try (SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), 10000)) {
- DistributedMap map = Overseer.getCompletedMap(zkClient);
- assertTrue(map instanceof SizeLimitedDistributedMap);
- for (int i = 0; i < Overseer.NUM_RESPONSES_TO_STORE; i++) {
- map.put("xyz_" + i, new byte[0]);
- }
+ final List<String> deletedItems = new LinkedList<>();
+ final Set<String> expectedKeys = new HashSet<>();
+ int numResponsesToStore=TEST_NIGHTLY?Overseer.NUM_RESPONSES_TO_STORE:100;
+
+ try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), 10000)) {
+ String path = getAndMakeInitialPath(zkClient);
+ DistributedMap map = new SizeLimitedDistributedMap(zkClient, path, numResponsesToStore, (element)->deletedItems.add(element));
+ for (int i = 0; i < numResponsesToStore; i++) {
+ map.put("xyz_" + i, new byte[0]);
+ expectedKeys.add("xyz_" + i);
+ }
- assertEquals("Number of items do not match", Overseer.NUM_RESPONSES_TO_STORE, map.size());
- // add another to trigger cleanup
- map.put("xyz_10000", new byte[0]);
- assertEquals("Distributed queue was not cleaned up",
- Overseer.NUM_RESPONSES_TO_STORE - (Overseer.NUM_RESPONSES_TO_STORE / 10) + 1, map.size());
- for (int i = Overseer.NUM_RESPONSES_TO_STORE; i >= Overseer.NUM_RESPONSES_TO_STORE / 10; i--) {
- assertTrue(map.contains("xyz_" + i));
- }
- for (int i = Overseer.NUM_RESPONSES_TO_STORE / 10 - 1; i >= 0; i--) {
- assertFalse(map.contains("xyz_" + i));
- }
+ assertEquals("Number of items do not match", numResponsesToStore, map.size());
+ assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
+ assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
+ // add another to trigger cleanup
+ map.put("xyz_" + numResponsesToStore, new byte[0]);
+ expectedKeys.add("xyz_" + numResponsesToStore);
+ assertEquals("Distributed queue was not cleaned up",
+ numResponsesToStore - (numResponsesToStore / 10) + 1, map.size());
+ for (int i = numResponsesToStore; i >= numResponsesToStore / 10; i--) {
+ assertTrue(map.contains("xyz_" + i));
+ }
+ for (int i = numResponsesToStore / 10 - 1; i >= 0; i--) {
+ assertFalse(map.contains("xyz_" + i));
+ assertTrue(deletedItems.contains("xyz_" + i));
+ expectedKeys.remove("xyz_" + i);
}
- } finally {
- server.shutdown();
+ assertTrue("Expected keys do not match", expectedKeys.containsAll(map.keys()));
+ assertTrue("Expected keys do not match", map.keys().containsAll(expectedKeys));
+ map.remove("xyz_" + numResponsesToStore);
+ assertFalse("map.remove shouldn't trigger the observer",
+ deletedItems.contains("xyz_" + numResponsesToStore));
}
}
+
+ protected DistributedMap createMap(SolrZkClient zkClient, String path) {
+ return new SizeLimitedDistributedMap(zkClient, path, Overseer.NUM_RESPONSES_TO_STORE, null);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index eff0d8e..8c8b11a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -16,13 +16,22 @@
*/
package org.apache.solr.cloud.api.collections;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.SolrCloudTestCase;
@@ -30,8 +39,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the Cloud Collections API.
@@ -40,6 +53,8 @@ import org.junit.Test;
public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
private static final int MAX_TIMEOUT_SECONDS = 60;
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
@@ -174,5 +189,74 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
.processAndWait(client, MAX_TIMEOUT_SECONDS);
assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
}
+
+  /**
+   * Fires {@code numThreads} concurrent RELOAD requests that all reuse the async id
+   * {@code "repeatedId"}, each against a randomly chosen node, and verifies that exactly one
+   * request is accepted while every other one is rejected with
+   * {@code "Task with the same requestid already exists."}.
+   *
+   * @throws Exception if the collection cannot be created or a client fails to close
+   */
+  public void testAsyncIdRaceCondition() throws Exception {
+    final SolrClient[] clients = new SolrClient[cluster.getJettySolrRunners().size()];
+    ExecutorService es = null;
+    try {
+      // Clients are built inside the try so that they are closed even if setup below fails.
+      int j = 0;
+      for (JettySolrRunner r : cluster.getJettySolrRunners()) {
+        clients[j++] = new HttpSolrClient.Builder(r.getBaseUrl().toString()).build();
+      }
+      RequestStatusState state = CollectionAdminRequest.createCollection("testAsyncIdRaceCondition","conf1",1,1)
+          .setRouterName("implicit")
+          .setShards("shard1")
+          .processAndWait(cluster.getSolrClient(), MAX_TIMEOUT_SECONDS);
+      assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);
+
+      int numThreads = 10;
+      final AtomicInteger numSuccess = new AtomicInteger(0);
+      final AtomicInteger numFailure = new AtomicInteger(0);
+      // Every worker counts down and then waits, so all requests are released at the same time.
+      final CountDownLatch latch = new CountDownLatch(numThreads);
+
+      es = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new DefaultSolrThreadFactory("testAsyncIdRaceCondition"));
+      for (int i = 0; i < numThreads; i++) {
+        es.submit(new Runnable() {
+
+          @Override
+          public void run() {
+            CollectionAdminRequest.Reload reloadCollectionRequest = CollectionAdminRequest.reloadCollection("testAsyncIdRaceCondition");
+            latch.countDown();
+            try {
+              latch.await();
+            } catch (InterruptedException e) {
+              // Restore the interrupt flag and keep the cause for diagnosis.
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+
+            try {
+              log.info("{} - Reloading Collection.", Thread.currentThread().getName());
+              reloadCollectionRequest.processAsync("repeatedId", clients[random().nextInt(clients.length)]);
+              numSuccess.incrementAndGet();
+            } catch (SolrServerException e) {
+              log.info(e.getMessage());
+              assertEquals("Task with the same requestid already exists.", e.getMessage());
+              numFailure.incrementAndGet();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      es.shutdown();
+      assertTrue(es.awaitTermination(10, TimeUnit.SECONDS));
+      assertEquals(1, numSuccess.get());
+      assertEquals(numThreads - 1, numFailure.get());
+    } finally {
+      if (es != null) {
+        // No-op after a clean termination; otherwise stops workers left over from a failed run.
+        es.shutdownNow();
+      }
+      for (SolrClient client : clients) {
+        if (client != null) {
+          client.close();
+        }
+      }
+    }
+  }
+
+ public void testAsyncIdBackCompat() throws Exception {
+ //remove with Solr 9
+ cluster.getZkClient().makePath("/overseer/collection-map-completed/mn-testAsyncIdBackCompat", true, true);
+ try {
+ CollectionAdminRequest.createCollection("testAsyncIdBackCompat","conf1",1,1)
+ .processAsync("testAsyncIdBackCompat", cluster.getSolrClient());
+ fail("Expecting exception");
+ } catch (SolrServerException e) {
+ assertTrue(e.getMessage().contains("Task with the same requestid already exists"));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f6b6f507/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 75418c6..e432bb0 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -520,7 +520,6 @@ public class ZkTestServer {
log.info("start zk server on port:" + port);
}
- @SuppressWarnings("deprecation")
public void shutdown() throws IOException, InterruptedException {
// TODO: this can log an exception while trying to unregister a JMX MBean
zkServer.shutdown();
[2/2] lucene-solr:branch_7x: SOLR-11739: Remove cast no longer needed
Posted by tf...@apache.org.
SOLR-11739: Remove cast no longer needed
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/dfb0803b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/dfb0803b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/dfb0803b
Branch: refs/heads/branch_7x
Commit: dfb0803bbb972d730627fbdcd4df66558d06f13a
Parents: f6b6f50
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Thu Feb 15 16:22:28 2018 -0800
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Thu Feb 15 16:23:09 2018 -0800
----------------------------------------------------------------------
.../src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/dfb0803b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
index 7f7e75f..0cb6cbe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
@@ -68,7 +68,7 @@ public class SizeLimitedDistributedMap extends DistributedMap {
priorityQueue.insertWithOverflow(stat.getMzxid());
}
- long topElementMzxId = (Long) priorityQueue.top();
+ long topElementMzxId = priorityQueue.top();
for (String child : children) {
Stat stat = zookeeper.exists(dir + "/" + child, null, true);