Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/30 04:30:45 UTC
lucene-solr:master: SOLR-11661: A new HDFS collection reusing unremoved
data from a deleted HDFS collection with the same name causes an
inconsistent view of documents
Repository: lucene-solr
Updated Branches:
refs/heads/master e6928d857 -> c56d774eb
SOLR-11661: A new HDFS collection reusing unremoved data from a deleted HDFS collection with the same name causes an inconsistent view of documents
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c56d774e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c56d774e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c56d774e
Branch: refs/heads/master
Commit: c56d774eb6555baa099fec22f290a9b5640a366d
Parents: e6928d8
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Jan 30 11:30:24 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Jan 30 11:30:24 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../solr/cloud/api/collections/Assign.java | 7 +-
.../api/collections/DeleteCollectionCmd.java | 25 ++++--
.../OverseerCollectionMessageHandler.java | 47 ++++++----
.../solr/cloud/overseer/ZkStateWriter.java | 6 +-
.../solr/cloud/hdfs/HDFSCollectionsAPITest.java | 95 ++++++++++++++++++++
.../apache/solr/common/cloud/SolrZkClient.java | 5 ++
.../solr/common/cloud/ZkMaintenanceUtils.java | 30 +++++++
8 files changed, 191 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ddd176e..27ccc4c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -180,6 +180,9 @@ Bug Fixes
* SOLR-11873: Use time based expiration cache in all necessary places in HdfsDirectoryFactory. (Mihaly Toth via Mark Miller)
+* SOLR-11661: A new HDFS collection reusing unremoved data from a deleted HDFS collection with the same name causes
+ an inconsistent view of documents (Cao Manh Dat, shalin)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index e7ce583..f732d31 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -47,6 +47,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.NumberUtils;
@@ -63,8 +64,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
public class Assign {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static String getCounterNodePath(String collection) {
+ return ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/counter";
+ }
+
public static int incAndGetId(DistribStateManager stateManager, String collection, int defaultValue) {
- String path = "/collections/"+collection;
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
try {
if (!stateManager.hasData(path)) {
try {
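
For context, a minimal sketch of the ZooKeeper layout this hunk relies on, assuming ZkStateReader.COLLECTIONS_ZKNODE resolves to "/collections" and using an illustrative collection named "test":

    // Hypothetical illustration, not part of the patch.
    String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/test"; // "/collections/test"
    String counterPath = Assign.getCounterNodePath("test");             // "/collections/test/counter"
    // incAndGetId() atomically bumps the integer stored under the collection's counter znode;
    // Assign derives unique names such as core_node1, core_node2, ... from it, so replicas of a
    // re-created collection never collide with the names (and hence dataDirs) of the deleted one.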
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index bdae8b9..1560363 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -71,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
+ boolean removeCounterNode = true;
try {
// Remove the snapshots meta-data for this collection in ZK. Deleting actual index files
// should be taken care of as part of collection delete operation.
@@ -99,7 +101,16 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
Set<String> okayExceptions = new HashSet<>(1);
okayExceptions.add(NonExistentCoreException.class.getName());
- ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+ List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+ for (Replica failedReplica : failedReplicas) {
+ boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null;
+ if (isSharedFS) {
+ // if the replica uses a shared FS and did not receive the unload message, the counter node should not be removed,
+ // because when a new collection with the same name is created, new replicas may reuse the old dataDir
+ removeCounterNode = false;
+ break;
+ }
+ }
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
@@ -124,10 +135,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
} finally {
try {
- if (zkStateReader.getZkClient().exists(
- ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
- zkStateReader.getZkClient().clean(
- ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+ String collectionPath = ZkStateReader.getCollectionPathRoot(collection);
+ if (zkStateReader.getZkClient().exists(collectionPath, true)) {
+ if (removeCounterNode) {
+ zkStateReader.getZkClient().clean(collectionPath);
+ } else {
+ final String counterNodePath = Assign.getCounterNodePath(collection);
+ zkStateReader.getZkClient().clean(collectionPath, s -> !s.equals(counterNodePath));
+ }
}
} catch (InterruptedException e) {
SolrException.log(log, "Cleaning up collection in zk was interrupted:"
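
In effect, the finally block now has two cleanup modes. A condensed, hypothetical sketch (assuming "/collections" as the root and a collection named "test"):

    // Every replica confirmed the unload: remove the whole subtree.
    zkClient.clean("/collections/test");
    // Some shared-FS replica never received the unload: spare the counter znode, so a
    // re-created "test" collection keeps numbering its core_nodes where the old one left
    // off and can never hand a new replica the old HDFS dataDir.
    zkClient.clean("/collections/test", znode -> !znode.equals("/collections/test/counter"));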
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index ba27fe4..056f108 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.api.collections;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -798,13 +799,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
}
- private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+ private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
- collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
+ return collectionCmd(message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
}
-
- void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+ /**
+ * Send a request to all replicas of a collection
+ * @return List of replicas that were not live to receive the request
+ */
+ List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
log.info("Executing Collection Cmd : " + params);
String collectionName = message.getStr(NAME);
@@ -812,30 +816,37 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(collectionName);
-
+ List<Replica> notLiveReplicas = new ArrayList<>();
for (Slice slice : coll.getSlices()) {
- sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
+ notLiveReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap));
}
processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions);
-
+ return notLiveReplicas;
}
- void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+ /**
+ * Send a request to all replicas of a slice
+ * @return List of replicas that were not live to receive the request
+ */
+ List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
-
+ List<Replica> notLiveReplicas = new ArrayList<>();
for (Replica replica : slice.getReplicas()) {
- if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
- && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
-
- // For thread safety, only simple clone the ModifiableSolrParams
- ModifiableSolrParams cloneParams = new ModifiableSolrParams();
- cloneParams.add(params);
- cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
- sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+ if (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher) {
+ if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
+ // For thread safety, only make a shallow clone of the ModifiableSolrParams
+ ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+ cloneParams.add(params);
+ cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+ sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+ } else {
+ notLiveReplicas.add(replica);
+ }
}
}
+ return notLiveReplicas;
}
private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
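
A hedged, caller-side sketch of the new contract, as a caller such as DeleteCollectionCmd might use it (the warning message is illustrative, not from the patch):

    List<Replica> notLive = ocmh.collectionCmd(message, params, results, null,
        asyncId, requestMap, Collections.emptySet());
    for (Replica replica : notLive) {
      // these replicas never received the request because their nodes were not live
      log.warn("Replica {} missed the {} request", replica.getName(),
          params.get(CoreAdminParams.ACTION));
    }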
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index d988441..0a5b2c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -189,9 +189,9 @@ public class ZkStateWriter {
DocCollection c = entry.getValue();
if (c == null) {
- // let's clean up the collections path for this collection
- log.debug("going to delete_collection {}", path);
- reader.getZkClient().clean("/collections/" + name);
+ // let's clean up only the state.json of this collection; the rest should be cleaned by the delete collection cmd
+ log.debug("going to delete state.json {}", path);
+ reader.getZkClient().clean(path);
} else if (c.getStateFormat() > 1) {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
if (reader.getZkClient().exists(path, true)) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
new file mode 100644
index 0000000..48417a2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.hdfs;
+
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MoveReplicaHDFSTest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
+ BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+ MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class HDFSCollectionsAPITest extends SolrCloudTestCase {
+
+ private static MiniDFSCluster dfsCluster;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ configureCluster(2)
+ .configure();
+
+ System.setProperty("solr.hdfs.blockcache.enabled", "false");
+ dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+ ZkConfigManager configManager = new ZkConfigManager(zkClient());
+ configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+ System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+ }
+
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ cluster.shutdown(); // need to close before the MiniDFSCluster
+ HdfsTestUtil.teardownClass(dfsCluster);
+ dfsCluster = null;
+ }
+
+ public void testDataDirIsNotReused() throws Exception {
+ JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(0);
+ String collection = "test";
+ cluster.getSolrClient().setDefaultCollection(collection);
+ CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+ .setCreateNodeSet(jettySolrRunner.getNodeName()).process(cluster.getSolrClient());
+ waitForState("", collection, clusterShape(1, 1));
+ cluster.getSolrClient().add(new SolrInputDocument("id", "1"));
+ cluster.getSolrClient().add(new SolrInputDocument("id", "2"));
+ cluster.getSolrClient().commit();
+ cluster.getSolrClient().add(new SolrInputDocument("id", "3"));
+
+ jettySolrRunner.stop();
+ waitForState("", collection, (liveNodes, collectionState) -> {
+ Replica replica = collectionState.getSlice("shard1").getReplicas().iterator().next();
+ return replica.getState() == Replica.State.DOWN;
+ });
+ CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+
+ jettySolrRunner.start();
+
+ CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+ .setCreateNodeSet(cluster.getJettySolrRunner(1).getNodeName()).process(cluster.getSolrClient());
+ waitForState("", collection, clusterShape(1, 1));
+ QueryResponse response = cluster.getSolrClient().query(collection, new SolrQuery("*:*"));
+ assertEquals(0L, response.getResults().getNumFound());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index a5f303e..bc12c44 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
@@ -761,6 +762,10 @@ public class SolrZkClient implements Closeable {
ZkMaintenanceUtils.clean(this, path);
}
+ public void clean(String path, Predicate<String> nodeFilter) throws InterruptedException, KeeperException {
+ ZkMaintenanceUtils.clean(this, path, nodeFilter);
+ }
+
public void upConfig(Path confPath, String confName) throws IOException {
ZkMaintenanceUtils.upConfig(this, confPath, confName);
}
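
A brief usage sketch for the new overload (paths illustrative):

    // Remove a collection's subtree but spare any znode named "counter"; parents of spared
    // znodes also survive, because ZkMaintenanceUtils swallows the resulting NotEmptyException.
    zkClient.clean("/collections/test", znode -> !znode.endsWith("/counter"));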
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
index 6843480..b7e5072 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
@@ -26,8 +26,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.TreeSet;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.solr.client.solrj.SolrServerException;
@@ -244,6 +247,33 @@ public class ZkMaintenanceUtils {
}
});
}
+
+ /**
+ * Delete a path and all of its sub nodes that match the filter
+ * @param filter znodes for which this predicate returns true are deleted
+ */
+ public static void clean(SolrZkClient zkClient, String path, Predicate<String> filter) throws InterruptedException, KeeperException {
+ if (filter == null) {
+ clean(zkClient, path);
+ return;
+ }
+
+ // sort deepest paths first: a child's path is always longer than its parent's, so children are deleted before parents
+ TreeSet<String> paths = new TreeSet<>(Comparator.comparingInt(String::length).reversed());
+
+ traverseZkTree(zkClient, path, VISIT_ORDER.VISIT_POST, znode -> {
+ if (!znode.equals("/") && filter.test(znode)) paths.add(znode);
+ });
+
+ for (String subpath : paths) {
+ if (!subpath.equals("/")) {
+ try {
+ zkClient.delete(subpath, -1, true);
+ } catch (KeeperException.NotEmptyException | KeeperException.NoNodeException e) {
+ // expected: the node may already be gone, or a spared (filtered-out) child keeps its parent non-empty
+ }
+ }
+ }
+ }
public static void uploadToZK(SolrZkClient zkClient, final Path fromPath, final String zkPath,
final Pattern filenameExclusions) throws IOException {
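
Why the length-descending TreeSet in the new clean() is safe: a child's path is always strictly longer than its parent's, so children sort, and are deleted, before their parents. A worked illustration for "/collections/test" with only the counter spared (znode names illustrative):

    // Deletion order produced by the reversed length comparator:
    //   /collections/test/leader_elect/shard1  -- deepest, deleted first
    //   /collections/test/state.json
    //   /collections/test                      -- deleted last; throws NotEmptyException because
    //                                             /collections/test/counter survived, which the
    //                                             catch block in the diff above treats as expected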