You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/07/04 07:05:46 UTC

svn commit: r1499656 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/test/org/apache/solr/cloud/ solr/solrj/ solr/solrj/src/java/org/apac...

Author: shalin
Date: Thu Jul  4 05:05:46 2013
New Revision: 1499656

URL: http://svn.apache.org/r1499656
Log:
SOLR-4693: A 'deleteshard' collections API that unloads all replicas of a given shard and then removes it from the cluster state. It will remove only those shards which are INACTIVE or have no range (created for custom sharding).

Added:
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
      - copied unchanged from r1499655, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1499656&r1=1499655&r2=1499656&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Thu Jul  4 05:05:46 2013
@@ -122,6 +122,11 @@ New Features
 * SOLR-4977: Add option to send IndexWriter's infostream to the logging system.
   (Ryan Ernst via Robert Muir)
 
+* SOLR-4693: A "deleteshard" collections API that unloads all replicas of a given
+  shard and then removes it from the cluster state. It will remove only those shards
+  which are INACTIVE or have no range (created for custom sharding).
+  (Anshum Gupta, shalin)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1499656&r1=1499655&r2=1499656&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Jul  4 05:05:46 2013
@@ -52,6 +52,7 @@ public class Overseer {
   public static final String QUEUE_OPERATION = "operation";
   public static final String DELETECORE = "deletecore";
   public static final String REMOVECOLLECTION = "removecollection";
+  public static final String REMOVESHARD = "removeshard";
   
   private static final int STATE_UPDATE_DELAY = 1500;  // delay between cloud state updates
 
@@ -181,6 +182,8 @@ public class Overseer {
         clusterState = removeCore(clusterState, message);
       } else if (REMOVECOLLECTION.equals(operation)) {
         clusterState = removeCollection(clusterState, message);
+      } else if (REMOVESHARD.equals(operation)) {
+        clusterState = removeShard(clusterState, message);
       } else if (ZkStateReader.LEADER_PROP.equals(operation)) {
 
         StringBuilder sb = new StringBuilder();
@@ -548,8 +551,28 @@ public class Overseer {
         ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newCollections);
         return newState;
       }
-      
-      /*
+
+    /*
+     * Remove a single slice (shard) of a collection from cloudstate. The message
+     * supplies the collection name and the shard id; a collection that is not in
+     * the state is left alone instead of triggering a NullPointerException.
+     */
+    private ClusterState removeShard(final ClusterState clusterState, ZkNodeProps message) {
+      final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+      final String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+      final Map<String, DocCollection> newCollections = new LinkedHashMap<String,DocCollection>(clusterState.getCollectionStates()); // shallow copy
+      DocCollection coll = newCollections.get(collection);
+      if (coll == null) return clusterState; // unknown collection: nothing to remove
+
+      Map<String, Slice> newSlices = new LinkedHashMap<String, Slice>(coll.getSlicesMap());
+      newSlices.remove(sliceId);
+      DocCollection newCollection = new DocCollection(coll.getName(), newSlices, coll.getProperties(), coll.getRouter());
+      newCollections.put(newCollection.getName(), newCollection);
+      return new ClusterState(clusterState.getLiveNodes(), newCollections);
+    }
+
+    /*
        * Remove core from cloudstate
        */
       private ClusterState removeCore(final ClusterState clusterState, ZkNodeProps message) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1499656&r1=1499655&r2=1499656&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu Jul  4 05:05:46 2013
@@ -75,6 +75,8 @@ public class OverseerCollectionProcessor
   
   public static final String SPLITSHARD = "splitshard";
 
+  public static final String DELETESHARD = "deleteshard";
+
   // TODO: use from Overseer?
   private static final String QUEUE_OPERATION = "operation";
   
@@ -164,31 +166,7 @@ public class OverseerCollectionProcessor
       if (CREATECOLLECTION.equals(operation)) {
         createCollection(zkStateReader.getClusterState(), message, results);
       } else if (DELETECOLLECTION.equals(operation)) {
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-        params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
-        params.set(CoreAdminParams.DELETE_DATA_DIR, true);
-        collectionCmd(zkStateReader.getClusterState(), message, params, results, null);
-        
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-            Overseer.REMOVECOLLECTION, "name", message.getStr("name"));
-          Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
-          
-        // wait for a while until we don't see the collection
-        long now = System.currentTimeMillis();
-        long timeout = now + 30000;
-        boolean removed = false;
-        while (System.currentTimeMillis() < timeout) {
-          Thread.sleep(100);
-          removed = !zkStateReader.getClusterState().getCollections().contains(message.getStr("name"));
-          if (removed) {
-            Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
-            break;
-          }
-        }
-        if (!removed) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + message.getStr("name"));
-        }
+        deleteCollection(message, results);
       } else if (RELOADCOLLECTION.equals(operation)) {
         ModifiableSolrParams params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
@@ -199,6 +177,8 @@ public class OverseerCollectionProcessor
         deleteAlias(zkStateReader.getAliases(), message);
       } else if (SPLITSHARD.equals(operation))  {
         splitShard(zkStateReader.getClusterState(), message, results);
+      } else if (DELETESHARD.equals(operation)) {
+        deleteShard(zkStateReader.getClusterState(), message, results);
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
@@ -217,6 +197,34 @@ public class OverseerCollectionProcessor
     return new OverseerSolrResponse(results);
   }
 
+  /**
+   * Sends a CoreAdmin UNLOAD (with instance-dir and data-dir deletion) through
+   * collectionCmd, offers a REMOVECOLLECTION message to the Overseer queue and polls
+   * for up to 30 seconds until the collection is gone from the cluster state.
+   */
+  private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    ModifiableSolrParams unloadParams = new ModifiableSolrParams();
+    unloadParams.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+    unloadParams.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+    unloadParams.set(CoreAdminParams.DELETE_DATA_DIR, true);
+    collectionCmd(zkStateReader.getClusterState(), message, unloadParams, results, null);
+
+    final String collectionName = message.getStr("name");
+    ZkNodeProps removeMsg = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.REMOVECOLLECTION, "name", collectionName);
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(removeMsg));
+
+    final long deadline = System.currentTimeMillis() + 30000;
+    boolean gone = false;
+    while (!gone && System.currentTimeMillis() < deadline) {
+      Thread.sleep(100);
+      gone = !zkStateReader.getClusterState().getCollections().contains(collectionName);
+    }
+    if (!gone) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully remove collection: " + collectionName);
+    }
+    Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+  }
+
   private void createAlias(Aliases aliases, ZkNodeProps message) {
     String aliasName = message.getStr("name");
     String collections = message.getStr("collections");
@@ -604,6 +612,75 @@ public class OverseerCollectionProcessor
     } while (srsp != null);
   }
 
+  
+  /**
+   * Deletes one shard of a collection: unloads the cores of its replicas on live nodes
+   * and then asks the Overseer to remove the slice from the cluster state.
+   * Only slices that are INACTIVE or have no hash range may be deleted.
+   * @param clusterState state used to look up the slice and its live replicas
+   * @param message      request carrying the collection name and the shard id
+   * @param results      handed to processResponse for every replica response
+   * @throws SolrException BAD_REQUEST for an unknown collection/shard or a slice that
+   *         may not be deleted; SERVER_ERROR if the removal fails or times out
+   */
+  private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+    log.info("Delete shard invoked");
+    String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+    Slice slice = clusterState.getSlice(collection, sliceId);
+
+    if (slice == null) {
+      if (clusterState.getCollections().contains(collection)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "No shard with the specified name exists: " + sliceId);
+      }
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "No collection with the specified name exists: " + collection);
+    }
+    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+    // TODO: Add check for range gaps on Slice deletion
+    if (!(slice.getRange() == null || slice.getState().equals(Slice.INACTIVE))) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "The slice: " + slice.getName() + " is currently "
+          + slice.getState() + ". Only INACTIVE (or custom-hashed) slices can be deleted.");
+    }
+
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INDEX, "true");
+      sliceCmd(clusterState, params, null, slice);
+
+      ShardResponse srsp;
+      while ((srsp = shardHandler.takeCompletedOrError()) != null) {
+        processResponse(results, srsp);
+      }
+
+      // the shard id must travel with the message: Overseer.removeShard reads SHARD_ID_PROP
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, Overseer.REMOVESHARD,
+          ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.SHARD_ID_PROP, sliceId);
+      Overseer.getInQueue(zkStateReader.getZkClient()).offer(ZkStateReader.toJSON(m));
+
+      // wait for a while until we don't see the shard
+      long timeout = System.currentTimeMillis() + 30000;
+      boolean removed = false;
+      while (!removed && System.currentTimeMillis() < timeout) {
+        Thread.sleep(100);
+        removed = zkStateReader.getClusterState().getSlice(collection, sliceId) == null;
+      }
+      if (!removed) {
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collection + " shard: " + sliceId);
+      }
+      Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+      log.info("Successfully deleted collection " + collection + ", shard: " + sliceId);
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error executing delete operation for collection: " + collection + " shard: " + sliceId, e);
+    }
+  }
+
   private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
     ShardRequest sreq = new ShardRequest();
     params.set("qt", adminPath);
@@ -756,33 +833,7 @@ public class OverseerCollectionProcessor
     
     for (Map.Entry<String,Slice> entry : coll.getSlicesMap().entrySet()) {
       Slice slice = entry.getValue();
-      Map<String,Replica> shards = slice.getReplicasMap();
-      Set<Map.Entry<String,Replica>> shardEntries = shards.entrySet();
-      for (Map.Entry<String,Replica> shardEntry : shardEntries) {
-        final ZkNodeProps node = shardEntry.getValue();
-        if (clusterState.liveNodesContain(node.getStr(ZkStateReader.NODE_NAME_PROP)) && (stateMatcher != null ? node.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher) : true)) {
-          // For thread safety, only simple clone the ModifiableSolrParams
-          ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-          cloneParams.add(params);
-          cloneParams.set(CoreAdminParams.CORE,
-              node.getStr(ZkStateReader.CORE_NAME_PROP));
-          
-          String replica = node.getStr(ZkStateReader.BASE_URL_PROP);
-          ShardRequest sreq = new ShardRequest();
-          sreq.nodeName = node.getStr(ZkStateReader.NODE_NAME_PROP);
-          // yes, they must use same admin handler path everywhere...
-          cloneParams.set("qt", adminPath);
-          sreq.purpose = 1;
-          // TODO: this sucks
-          if (replica.startsWith("http://")) replica = replica.substring(7);
-          sreq.shards = new String[] {replica};
-          sreq.actualShards = sreq.shards;
-          sreq.params = cloneParams;
-          log.info("Collection Admin sending CoreAdmin cmd to " + replica
-              + " params:" + sreq.params);
-          shardHandler.submit(sreq, replica, sreq.params);
-        }
-      }
+      sliceCmd(clusterState, params, stateMatcher, slice);
     }
     
     ShardResponse srsp;
@@ -795,6 +846,36 @@ public class OverseerCollectionProcessor
 
   }
 
+  /**
+   * Submits the given CoreAdmin params, one request per replica, to every replica of the
+   * slice that is on a live node and, when stateMatcher is non-null, whose state equals it.
+   * Only submits; the caller is expected to take the responses from shardHandler.
+   */
+  private void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, String stateMatcher, Slice slice) {
+    for (Replica replicaProps : slice.getReplicasMap().values()) {
+      final String nodeName = replicaProps.getStr(ZkStateReader.NODE_NAME_PROP);
+      if (!clusterState.liveNodesContain(nodeName)) continue;
+      if (stateMatcher != null && !replicaProps.getStr(ZkStateReader.STATE_PROP).equals(stateMatcher)) continue;
+      // For thread safety, only simple clone the ModifiableSolrParams
+      ModifiableSolrParams cloneParams = new ModifiableSolrParams();
+      cloneParams.add(params);
+      cloneParams.set(CoreAdminParams.CORE, replicaProps.getStr(ZkStateReader.CORE_NAME_PROP));
+      // yes, they must use same admin handler path everywhere...
+      cloneParams.set("qt", adminPath);
+      // TODO: this sucks
+      String baseUrl = replicaProps.getStr(ZkStateReader.BASE_URL_PROP);
+      String target = baseUrl.startsWith("http://") ? baseUrl.substring(7) : baseUrl;
+      ShardRequest sreq = new ShardRequest();
+      sreq.nodeName = nodeName;
+      sreq.purpose = 1;
+      sreq.shards = new String[] {target};
+      sreq.actualShards = sreq.shards;
+      sreq.params = cloneParams;
+      log.info("Collection Admin sending CoreAdmin cmd to " + target + " params:" + sreq.params);
+      shardHandler.submit(sreq, target, sreq.params);
+    }
+  }
+
   private void processResponse(NamedList results, ShardResponse srsp) {
     Throwable e = srsp.getException();
     if (e != null) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1499656&r1=1499655&r2=1499656&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Jul  4 05:05:46 2013
@@ -132,10 +132,14 @@ public class CollectionsHandler extends 
         this.handleDeleteAliasAction(req, rsp);
         break;
       }
-        case SPLITSHARD:  {
-          this.handleSplitShardAction(req, rsp);
-          break;
-        }
+      case SPLITSHARD:  {
+        this.handleSplitShardAction(req, rsp);
+        break;
+      }
+      case DELETESHARD: {
+        this.handleDeleteShardAction(req, rsp);
+        break;
+      }
 
       default: {
           throw new RuntimeException("Unknown action: " + action);
@@ -146,13 +150,18 @@ public class CollectionsHandler extends 
   }
   
   public static long DEFAULT_ZK_TIMEOUT = 60*1000;
+
+  /** Convenience overload: same as the four-argument form with DEFAULT_ZK_TIMEOUT as the timeout. */
+  private void handleResponse(String operation, ZkNodeProps m, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
+  }
   
   private void handleResponse(String operation, ZkNodeProps m,
-      SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+      SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
     long time = System.currentTimeMillis();
     QueueEvent event = coreContainer.getZkController()
         .getOverseerCollectionQueue()
-        .offer(ZkStateReader.toJSON(m), DEFAULT_ZK_TIMEOUT);
+        .offer(ZkStateReader.toJSON(m), timeout);
     if (event.getBytes() != null) {
       SolrResponse response = SolrResponse.deserialize(event.getBytes());
       rsp.getValues().addAll(response.getResponse());
@@ -162,9 +171,9 @@ public class CollectionsHandler extends 
         rsp.setException(new SolrException(code != null && code != -1 ? ErrorCode.getErrorCode(code) : ErrorCode.SERVER_ERROR, (String)exp.get("msg")));
       }
     } else {
-      if (System.currentTimeMillis() - time >= DEFAULT_ZK_TIMEOUT) {
+      if (System.currentTimeMillis() - time >= timeout) {
         throw new SolrException(ErrorCode.SERVER_ERROR, operation
-            + " the collection time out:" + DEFAULT_ZK_TIMEOUT / 1000 + "s");
+            + " the collection time out:" + timeout / 1000 + "s");
       } else if (event.getWatchedEvent() != null) {
         throw new SolrException(ErrorCode.SERVER_ERROR, operation
             + " the collection error [Watcher fired on path: "
@@ -280,6 +289,21 @@ public class CollectionsHandler extends 
 
     handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
   }
+  
+  /**
+   * Handles DELETESHARD: reads the required "collection" and "shard" request params and
+   * passes a deleteshard message for the overseer to handleResponse.
+   */
+  private void handleDeleteShardAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws InterruptedException, KeeperException {
+    log.info("Deleting Shard : " + req.getParamString());
+    Map<String,Object> props = new HashMap<String,Object>();
+    // keys must be the ZkStateReader constants that OverseerCollectionProcessor.deleteShard reads
+    props.put(ZkStateReader.COLLECTION_PROP, req.getParams().required().get("collection"));
+    props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.DELETESHARD);
+    props.put(ZkStateReader.SHARD_ID_PROP, req.getParams().required().get("shard"));
+    handleResponse(OverseerCollectionProcessor.DELETESHARD, new ZkNodeProps(props), rsp);
+  }
 
   private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     log.info("Splitting shard : " + req.getParamString());
@@ -295,10 +319,7 @@ public class CollectionsHandler extends 
 
     ZkNodeProps m = new ZkNodeProps(props);
 
-    // todo remove this hack
-    DEFAULT_ZK_TIMEOUT *= 5;
-    handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp);
-    DEFAULT_ZK_TIMEOUT /= 5;
+    handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp, DEFAULT_ZK_TIMEOUT * 5);
   }
 
   public static ModifiableSolrParams params(String... params) {

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1499656&r1=1499655&r2=1499656&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Thu Jul  4 05:05:46 2013
@@ -28,7 +28,7 @@ public interface CollectionParams 
 
 
   public enum CollectionAction {
-    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD;
+    CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD, DELETESHARD;
     
     public static CollectionAction get( String p )
     {