Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/30 04:30:45 UTC

lucene-solr:master: SOLR-11661: A new HDFS collection could reuse unremoved data from a deleted HDFS collection with the same name, causing an inconsistent view of documents

Repository: lucene-solr
Updated Branches:
  refs/heads/master e6928d857 -> c56d774eb


SOLR-11661: A new HDFS collection could reuse unremoved data from a deleted HDFS collection with the same name, causing an inconsistent view of documents


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c56d774e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c56d774e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c56d774e

Branch: refs/heads/master
Commit: c56d774eb6555baa099fec22f290a9b5640a366d
Parents: e6928d8
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Jan 30 11:30:24 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Jan 30 11:30:24 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  3 +
 .../solr/cloud/api/collections/Assign.java      |  7 +-
 .../api/collections/DeleteCollectionCmd.java    | 25 ++++--
 .../OverseerCollectionMessageHandler.java       | 47 ++++++----
 .../solr/cloud/overseer/ZkStateWriter.java      |  6 +-
 .../solr/cloud/hdfs/HDFSCollectionsAPITest.java | 95 ++++++++++++++++++++
 .../apache/solr/common/cloud/SolrZkClient.java  |  5 ++
 .../solr/common/cloud/ZkMaintenanceUtils.java   | 30 +++++++
 8 files changed, 191 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ddd176e..27ccc4c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -180,6 +180,9 @@ Bug Fixes
 
 * SOLR-11873: Use time based expiration cache in all necessary places in HdfsDirectoryFactory. (Mihaly Toth via Mark Miller)
 
+* SOLR-11661: A new HDFS collection could reuse unremoved data from a deleted HDFS collection with the same
+  name, causing an inconsistent view of documents (Cao Manh Dat, shalin)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index e7ce583..f732d31 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -47,6 +47,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.NumberUtils;
@@ -63,8 +64,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 public class Assign {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static String getCounterNodePath(String collection) {
+    return ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/counter";
+  }
+
   public static int incAndGetId(DistribStateManager stateManager, String collection, int defaultValue) {
-    String path = "/collections/"+collection;
+    String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     try {
       if (!stateManager.hasData(path)) {
         try {
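
For context: incAndGetId keeps its counter in the znode this helper names, handing out increasing ids that Solr uses when generating names for new cores. A minimal, self-contained sketch of the path involved, assuming ZkStateReader.COLLECTIONS_ZKNODE is "/collections" (its conventional value); the class name is hypothetical:

    public class CounterPathSketch {
      public static void main(String[] args) {
        String collection = "test";
        String collectionRoot = "/collections/" + collection; // removed when the collection is deleted
        String counterNode = collectionRoot + "/counter";     // incremented by Assign.incAndGetId
        System.out.println(counterNode);                      // prints /collections/test/counter
      }
    }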

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index bdae8b9..1560363 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -71,6 +72,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
     }
 
+    boolean removeCounterNode = true;
     try {
       // Remove the snapshots meta-data for this collection in ZK. Deleting actual index files
       // should be taken care of as part of collection delete operation.
@@ -99,7 +101,16 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       Set<String> okayExceptions = new HashSet<>(1);
       okayExceptions.add(NonExistentCoreException.class.getName());
 
-      ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+      List<Replica> failedReplicas = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+      for (Replica failedReplica : failedReplicas) {
+        boolean isSharedFS = failedReplica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && failedReplica.get("dataDir") != null;
+        if (isSharedFS) {
+          // if the replica uses a shared FS and did not receive the UNLOAD message, the counter node must not be removed,
+          // because replicas of a new collection created with the same name may reuse the old dataDir
+          removeCounterNode = false;
+          break;
+        }
+      }
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
@@ -124,10 +135,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     } finally {
 
       try {
-        if (zkStateReader.getZkClient().exists(
-            ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
-          zkStateReader.getZkClient().clean(
-              ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+        String collectionPath = ZkStateReader.getCollectionPathRoot(collection);
+        if (zkStateReader.getZkClient().exists(collectionPath, true)) {
+          if (removeCounterNode) {
+            zkStateReader.getZkClient().clean(collectionPath);
+          } else {
+            final String counterNodePath = Assign.getCounterNodePath(collection);
+            zkStateReader.getZkClient().clean(collectionPath, s -> !s.equals(counterNodePath));
+          }
         }
       } catch (InterruptedException e) {
         SolrException.log(log, "Cleaning up collection in zk was interrupted:"
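
To make the retention rule concrete: the counter node survives a delete exactly when some replica that missed the UNLOAD request sits on shared storage and has a dataDir. A hedged, self-contained sketch of just that predicate; FailedReplica is a hypothetical stand-in for org.apache.solr.common.cloud.Replica, not patch code:

    import java.util.Arrays;
    import java.util.List;

    public class CounterRetentionSketch {
      // Hypothetical stand-in carrying the two properties the patch inspects.
      static class FailedReplica {
        final boolean sharedStorage; // ZkStateReader.SHARED_STORAGE_PROP
        final String dataDir;        // the "dataDir" replica property
        FailedReplica(boolean sharedStorage, String dataDir) {
          this.sharedStorage = sharedStorage;
          this.dataDir = dataDir;
        }
      }

      // Keep the counter if any unreachable replica may still own data on shared
      // storage, so a re-created collection cannot be handed the old dataDir again.
      static boolean shouldRemoveCounterNode(List<FailedReplica> failedReplicas) {
        return failedReplicas.stream().noneMatch(r -> r.sharedStorage && r.dataDir != null);
      }

      public static void main(String[] args) {
        FailedReplica onHdfs = new FailedReplica(true, "hdfs://nn/solr/test/core_node1/data");
        FailedReplica local = new FailedReplica(false, null);
        System.out.println(shouldRemoveCounterNode(Arrays.asList(onHdfs))); // false: keep the counter
        System.out.println(shouldRemoveCounterNode(Arrays.asList(local)));  // true: safe to remove
      }
    }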

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index ba27fe4..056f108 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.api.collections;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -798,13 +799,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }
   
-  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+  private List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
                              NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
-    collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
+    return collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
   }
 
-
-  void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+  /**
+   * Send a request to all replicas of a collection
+   * @return replicas that were not live and so never received the request
+   */
+  List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
                      NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
     log.info("Executing Collection Cmd : " + params);
     String collectionName = message.getStr(NAME);
@@ -812,30 +816,37 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
     ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollection(collectionName);
-    
+    List<Replica> notLiveReplicas = new ArrayList<>();
     for (Slice slice : coll.getSlices()) {
-      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
+      notLiveReplicas.addAll(sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap));
     }
 
     processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions);
-
+    return notLiveReplicas;
   }
 
-  void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+  /**
+   * Send a request to all replicas of a slice
+   * @return replicas that were not live and so never received the request
+   */
+  List<Replica> sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
                 Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
-
+    List<Replica> notLiveReplicas = new ArrayList<>();
     for (Replica replica : slice.getReplicas()) {
-      if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
-          && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
-
-        // For thread safety, only simple clone the ModifiableSolrParams
-        ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-        cloneParams.add(params);
-        cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-        sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+      if (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher) {
+        if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))) {
+          // For thread safety, only simple clone the ModifiableSolrParams
+          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+          cloneParams.add(params);
+          cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+
+          sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
+        } else {
+          notLiveReplicas.add(replica);
+        }
       }
     }
+    return notLiveReplicas;
   }
   
   private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
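
Callers can now react to the replicas that never saw the command. A minimal hedged sketch of the consuming pattern; ocmh, message, params and the other arguments are assumed to exist as in DeleteCollectionCmd above:

    List<Replica> notLive = ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
    for (Replica replica : notLive) {
      // These replicas live on nodes absent from live_nodes, so the command (e.g. UNLOAD)
      // was never sent to them and any on-disk state they own may still exist.
      log.warn("Replica {} did not receive the request; its data may be left behind", replica.getName());
    }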

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index d988441..0a5b2c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -189,9 +189,9 @@ public class ZkStateWriter {
           DocCollection c = entry.getValue();
 
           if (c == null) {
-            // let's clean up the collections path for this collection
-            log.debug("going to delete_collection {}", path);
-            reader.getZkClient().clean("/collections/" + name);
+            // let's clean up only this collection's state.json; the rest is cleaned up by the delete collection cmd
+            log.debug("going to delete state.json {}", path);
+            reader.getZkClient().clean(path);
           } else if (c.getStateFormat() > 1) {
             byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
             if (reader.getZkClient().exists(path, true)) {
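
After this change the cleanup work is split between two places; a hedged comment sketch of the resulting scopes, assuming the stateFormat=2 layout used elsewhere in this patch:

    public class CleanupScopeSketch {
      public static void main(String[] args) {
        String name = "test"; // hypothetical collection name
        // ZkStateWriter now removes only the per-collection state file:
        System.out.println("/collections/" + name + "/state.json");
        // DeleteCollectionCmd cleans the rest of the subtree, optionally sparing
        // the counter znode (see DeleteCollectionCmd above):
        System.out.println("/collections/" + name + "/counter");
      }
    }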

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
new file mode 100644
index 0000000..48417a2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HDFSCollectionsAPITest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.hdfs;
+
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MoveReplicaHDFSTest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+    MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class HDFSCollectionsAPITest extends SolrCloudTestCase {
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    configureCluster(2)
+        .configure();
+
+    System.setProperty("solr.hdfs.blockcache.enabled", "false");
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+    ZkConfigManager configManager = new ZkConfigManager(zkClient());
+    configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+    System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+  }
+
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    cluster.shutdown(); // need to close before the MiniDFSCluster
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+  public void testDataDirIsNotReused() throws Exception {
+    JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(0);
+    String collection = "test";
+    cluster.getSolrClient().setDefaultCollection(collection);
+    CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+        .setCreateNodeSet(jettySolrRunner.getNodeName()).process(cluster.getSolrClient());
+    waitForState("", collection, clusterShape(1, 1));
+    cluster.getSolrClient().setDefaultCollection(collection);
+    cluster.getSolrClient().add(new SolrInputDocument("id", "1"));
+    cluster.getSolrClient().add(new SolrInputDocument("id", "2"));
+    cluster.getSolrClient().commit();
+    cluster.getSolrClient().add(new SolrInputDocument("id", "3"));
+
+    jettySolrRunner.stop();
+    waitForState("", collection, (liveNodes, collectionState) -> {
+      Replica replica = collectionState.getSlice("shard1").getReplicas().iterator().next();
+      return replica.getState() == Replica.State.DOWN;
+    });
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+
+    jettySolrRunner.start();
+
+    CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
+        .setCreateNodeSet(cluster.getJettySolrRunner(1).getNodeName()).process(cluster.getSolrClient());
+    waitForState("", collection, clusterShape(1, 1));
+    QueryResponse response = cluster.getSolrClient().query(collection, new SolrQuery("*:*"));
+    assertEquals(0L, response.getResults().getNumFound());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index a5f303e..bc12c44 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
@@ -761,6 +762,10 @@ public class SolrZkClient implements Closeable {
     ZkMaintenanceUtils.clean(this, path);
   }
 
+  public void clean(String path, Predicate<String> nodeFilter) throws InterruptedException, KeeperException {
+    ZkMaintenanceUtils.clean(this, path, nodeFilter);
+  }
+
   public void upConfig(Path confPath, String confName) throws IOException {
     ZkMaintenanceUtils.upConfig(this, confPath, confName);
   }
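
A hedged usage sketch of the new overload; the ZooKeeper address and paths are hypothetical, and the client is closed via try-with-resources since SolrZkClient implements Closeable:

    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.zookeeper.KeeperException;

    public class CleanWithFilterSketch {
      public static void main(String[] args) throws KeeperException, InterruptedException {
        try (SolrZkClient zkClient = new SolrZkClient("localhost:2181", 30000)) {
          String keep = "/collections/test/counter";
          // Delete the subtree but keep the counter znode; its parent also survives,
          // because a znode with a remaining child cannot be deleted.
          zkClient.clean("/collections/test", znode -> !znode.equals(keep));
        }
      }
    }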

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c56d774e/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
index 6843480..b7e5072 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkMaintenanceUtils.java
@@ -26,8 +26,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.TreeSet;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 import org.apache.solr.client.solrj.SolrServerException;
@@ -244,6 +247,33 @@ public class ZkMaintenanceUtils {
       }
     });
   }
+
+  /**
+   * Delete a path and all of its sub nodes, keeping any node the filter rejects
+   * @param filter a znode is deleted only if the filter returns true for its path
+   */
+  public static void clean(SolrZkClient zkClient, String path, Predicate<String> filter) throws InterruptedException, KeeperException {
+    if (filter == null) {
+      clean(zkClient, path);
+      return;
+    }
+
+    TreeSet<String> paths = new TreeSet<>(Comparator.comparingInt(String::length).reversed().thenComparing(Comparator.naturalOrder()));
+
+    traverseZkTree(zkClient, path, VISIT_ORDER.VISIT_POST, znode -> {
+      if (!znode.equals("/") && filter.test(znode)) paths.add(znode);
+    });
+
+    for (String subpath : paths) {
+      if (!subpath.equals("/")) {
+        try {
+          zkClient.delete(subpath, -1, true);
+        } catch (KeeperException.NotEmptyException | KeeperException.NoNodeException e) {
+          // expected: a retained child keeps its parent non-empty, or the node is already gone
+        }
+      }
+    }
+  }
   
   public static void uploadToZK(SolrZkClient zkClient, final Path fromPath, final String zkPath,
                                 final Pattern filenameExclusions) throws IOException {
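
The deepest-first ordering above is what lets the filtered clean remove children before their parents. A standalone sketch of just that trick; the natural-order tie-breaker matters because a TreeSet whose comparator returns 0 for distinct equal-length paths would silently drop one of them:

    import java.util.Comparator;
    import java.util.TreeSet;

    public class DeleteOrderDemo {
      public static void main(String[] args) {
        TreeSet<String> paths = new TreeSet<>(
            Comparator.comparingInt(String::length).reversed()
                .thenComparing(Comparator.naturalOrder()));
        paths.add("/collections/test");
        paths.add("/collections/test/counter");
        paths.add("/collections/test/leaders");
        // Prints the two equal-length children first, then the parent, so a
        // bottom-up delete never hits a still-populated parent unexpectedly.
        paths.forEach(System.out::println);
      }
    }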