Posted to commits@lucene.apache.org by yo...@apache.org on 2012/02/14 03:45:42 UTC

svn commit: r1243768 - /lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Author: yonik
Date: Tue Feb 14 02:45:41 2012
New Revision: 1243768

URL: http://svn.apache.org/viewvc?rev=1243768&view=rev
Log:
solrcloud: send deleteByQuery to all shard leaders to version and forward to replicas
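
For review, here is a standalone sketch of the three-level "dbq_level" dispatch that the
new doDeleteByQuery() implements. The names below (DbqFanoutSketch, forward, applyLocally,
leadsAnyShard) are hypothetical stand-ins, not the committed API:

    class DbqFanoutSketch {
      static final int ANY_NODE = 1, SHARD_LEADER = 2, REPLICA = 3;
      private long clock = 0;

      void handle(int level, String query, long versionFromLeader) {
        if (level == ANY_NODE) {
          forward(query, SHARD_LEADER, 0);        // fan out to every shard leader
          if (!leadsAnyShard()) return;           // nothing more to do locally
          level = SHARD_LEADER;                   // continue as leader of our own shard
        }
        if (level == SHARD_LEADER) {
          long version = -(++clock);              // deletes carry a negated version
          applyLocally(query, version);           // log + execute the local DBQ
          forward(query, REPLICA, version);       // replicas get dbq_level=3 plus _version_
        } else {                                  // REPLICA
          applyLocally(query, versionFromLeader); // already versioned by the leader
        }
      }

      boolean leadsAnyShard() { return true; }                          // stub
      void forward(String q, int lvl, long v) { /* HTTP fan-out */ }    // stub
      void applyLocally(String q, long v) { /* ulog + index delete */ } // stub
    }

A client-issued deleteByQuery carries no dbq_level parameter and so defaults to level 1,
meaning any node can accept it; versioning always happens on each shard leader, and the
real code additionally blocks updates on the leader until the replicas have responded.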

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1243768&r1=1243767&r2=1243768&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Tue Feb 14 02:45:41 2012
@@ -60,14 +60,15 @@ import org.apache.solr.update.UpdateHand
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.VersionBucket;
 import org.apache.solr.update.VersionInfo;
+import org.apache.zookeeper.KeeperException;
 
 // NOT mt-safe... create a new processor for each add thread
 // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
 public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   public static final String SEEN_LEADER = "leader";
   public static final String COMMIT_END_POINT = "commit_end_point";
-  public static final String DELQUERY_END_POINT = "delquery_end_point";
-  
+  public static final String DELETE_BY_QUERY_LEVEL = "dbq_level";
+
   private final SolrQueryRequest req;
   private final SolrQueryResponse rsp;
   private final UpdateRequestProcessor next;
@@ -91,6 +92,7 @@ public class DistributedUpdateProcessor 
 
   private boolean zkEnabled = false;
 
+  private CloudDescriptor cloudDesc;
   private String collection;
   private ZkController zkController;
   
@@ -128,7 +130,7 @@ public class DistributedUpdateProcessor 
     
     zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
     
-    CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+    cloudDesc = coreDesc.getCloudDescriptor();
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
@@ -192,7 +194,8 @@ public class DistributedUpdateProcessor 
 
     return nodes;
   }
-  
+
+
   private String getShard(int hash, String collection, CloudState cloudState) {
     // ranges should be part of the cloud state and eventually gotten from zk
 
@@ -200,6 +203,43 @@ public class DistributedUpdateProcessor 
     return cloudState.getShard(hash, collection);
   }
 
+  // used for deleteByQuery to get the list of nodes this leader should forward to
+  private List<Node> setupRequest() {
+    List<Node> nodes = null;
+    String shardId = cloudDesc.getShardId();
+
+    try {
+
+      ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
+          collection, shardId));
+
+      String leaderNodeName = leaderProps.getCoreNodeName();
+      String coreName = req.getCore().getName();
+      String coreNodeName = zkController.getNodeName() + "_" + coreName;
+      isLeader = coreNodeName.equals(leaderNodeName);
+
+      // TODO: what if we are no longer the leader?
+
+      forwardToLeader = false;
+      List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+          .getReplicaProps(collection, shardId, zkController.getNodeName(),
+              coreName);
+      if (replicaProps != null) {
+        nodes = new ArrayList<Node>(replicaProps.size());
+        for (ZkCoreNodeProps props : replicaProps) {
+          nodes.add(new StdNode(props));
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "Interrupted while finding the shard leader for deleteByQuery", e);
+    }
+
+    return nodes;
+  }
+
+
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
     // TODO: check for id field?
@@ -419,17 +459,7 @@ public class DistributedUpdateProcessor 
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
     if (!cmd.isDeleteById()) {
-      // delete by query...
-      // TODO: handle versioned and distributed deleteByQuery
-
-      // even in non zk mode, tests simulate updates from a leader
-      if(!zkEnabled) {
-        isLeader = !req.getParams().getBool(SEEN_LEADER, false);
-      } else {
-        zkCheck();
-      }
-      
-      processDeleteByQuery(cmd);
+      doDeleteByQuery(cmd);
       return;
     }
 
@@ -475,6 +505,161 @@ public class DistributedUpdateProcessor 
     }
   }
 
+  public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    // even in non zk mode, tests simulate updates from a leader
+    if(!zkEnabled) {
+      isLeader = !req.getParams().getBool(SEEN_LEADER, false);
+    } else {
+      zkCheck();
+    }
+
+    // Lev1: we are the first to receive this deleteByQuery; it must be forwarded to the leader of every shard
+    // Lev2: we are a leader receiving a forwarded deleteByQuery... we must:
+    //       - block all updates (use VersionInfo)
+    //       - flush *all* updates going to our replicas
+    //       - forward the DBQ to our replicas and wait for the response
+    //       - log + execute the local DBQ
+    // Lev3: we are a replica receiving a DBQ from our leader
+    //       - log + execute the local DBQ
+
+    int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1);
+
+    if (zkEnabled && dbqlevel == 1) {
+      boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
+
+      Map<String,Slice> slices = zkController.getCloudState().getSlices(collection);
+
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      params.set("dbqlevel", 2);
+
+      List<Node> leaders = new ArrayList<Node>(slices.size());
+      for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
+        String sliceName = sliceEntry.getKey();
+        ZkNodeProps leaderProps;
+        try {
+          leaderProps = zkController.getZkStateReader().getLeaderProps(collection, sliceName);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + sliceName, e);
+        }
+
+        // TODO: What if leaders changed in the meantime?
+        // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
+
+        // Am I the leader for this slice?
+        ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leaderProps);
+        String leaderNodeName = coreLeaderProps.getCoreNodeName();
+        String coreName = req.getCore().getName();
+        String coreNodeName = zkController.getNodeName() + "_" + coreName;
+        isLeader = coreNodeName.equals(leaderNodeName);
+
+        if (isLeader) {
+          // don't forward to ourself
+          leaderForAnyShard = true;
+        } else {
+          leaders.add(new StdNode(coreLeaderProps));
+        }
+      }
+
+      cmdDistrib.distribDelete(cmd, leaders, params);
+
+      if (!leaderForAnyShard) {
+        return;
+      }
+
+      // change the level to 2 so we look up and forward to our own replicas (if any)
+      dbqlevel = 2;
+    }
+
+    List<Node> replicas = null;
+
+    if (zkEnabled && dbqlevel == 2) {
+      // This core should be a leader
+      replicas = setupRequest();
+    }
+
+    if (vinfo == null) {
+      super.processDelete(cmd);
+      return;
+    }
+
+    // at this point, there is an update we need to try and apply.
+    // we may or may not be the leader.
+
+    // Find the version
+    long versionOnUpdate = cmd.getVersion();
+    if (versionOnUpdate == 0) {
+      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
+      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
+    }
+    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
+
+    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
+    boolean leaderLogic = isLeader && !isReplay;
+
+    if (!leaderLogic && versionOnUpdate==0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+    }
+
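+    // block concurrent updates while this DBQ is versioned and distributed, so adds cannot be reordered around it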
+    vinfo.blockUpdates();
+    try {
+
+      if (versionsStored) {
+        if (leaderLogic) {
+          long version = vinfo.getNewClock();
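+          // record the delete with a negated version to distinguish it from adds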
+          cmd.setVersion(-version);
+          // TODO update versions in all buckets
+
+          // TODO: flush any adds to these replicas so they do not get reordered w.r.t. this DBQ
+
+          doLocalDelete(cmd);
+
+          // forward to all replicas
+          if (replicas != null) {
+            ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+            params.set("dbqlevel", 3);
+            params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
+            params.set(SEEN_LEADER, "true");
+            cmdDistrib.distribDelete(cmd, replicas, params);
+
+            // wait for DBQ responses before releasing the update block to eliminate the possibility
+            // of an add being reordered.
+            // TODO: this isn't strictly necessary - we could do the same thing we do for PeerSync
+            // in DUH2 and add a clause that prevents deleting older docs.
+            cmdDistrib.finish();
+          }
+
+        } else {
+          cmd.setVersion(-versionOnUpdate);
+
+          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.deleteByQuery(cmd);
+            return;
+          }
+
+          doLocalDelete(cmd);
+        }
+      }
+
+      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
+      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
+      // (so cache misses will see up-to-date data)
+
+    } finally {
+      vinfo.unblockUpdates();
+    }
+
+    if (returnVersions && rsp != null) {
+      if (deleteByQueryResponse == null) {
+        deleteByQueryResponse = new NamedList<String>();
+        rsp.add("deleteByQuery",deleteByQueryResponse);
+      }
+      deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
+    }
+  }
+
+
   private void zkCheck() {
     int retries = 10;
     while (!zkController.isConnected()) {
@@ -571,89 +756,6 @@ public class DistributedUpdateProcessor 
     }
   }
 
-  private void processDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
-    if (vinfo == null) {
-      super.processDelete(cmd);
-      return;
-    }
-
-    // at this point, there is an update we need to try and apply.
-    // we may or may not be the leader.
-
-    // Find the version
-    long versionOnUpdate = cmd.getVersion();
-    if (versionOnUpdate == 0) {
-      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
-      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
-    }
-    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
-
-    boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
-    boolean leaderLogic = isLeader && !isReplay;
-
-    if (!leaderLogic && versionOnUpdate==0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
-    }
-
-    vinfo.blockUpdates();
-    try {
-
-      if (versionsStored) {
-        if (leaderLogic) {
-          long version = vinfo.getNewClock();
-          cmd.setVersion(-version);
-          // TODO update versions in all buckets
-        } else {
-          cmd.setVersion(-versionOnUpdate);
-
-          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-            // we're not in an active state, and this update isn't from a replay, so buffer it.
-            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-            ulog.deleteByQuery(cmd);
-            return;
-          }
-        }
-      }
-
-      doLocalDelete(cmd);
-
-      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
-      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
-      // (so cache misses will see up-to-date data)
-
-    } finally {
-      vinfo.unblockUpdates();
-    }
-
-    // TODO: we should consider this? Send delete query to everyone in the current collection
-
-    if (zkEnabled) {
-      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
-      if (!params.getBool(DELQUERY_END_POINT, false)) {
-        params.set(DELQUERY_END_POINT, true);
-
-        String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
-            .getZkController().getNodeName();
-        String shardZkNodeName = nodeName + "_" + req.getCore().getName();
-        List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
-            .getCloudDescriptor().getCollectionName(), shardZkNodeName);
-
-        if (nodes != null) {
-          cmdDistrib.distribDelete(cmd, nodes, params);
-          finish();
-        }
-      }
-    }
-
-    if (returnVersions && rsp != null) {
-      if (deleteByQueryResponse == null) {
-        deleteByQueryResponse = new NamedList<String>();
-        rsp.add("deleteByQuery",deleteByQueryResponse);
-      }
-      deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
-    }
-
-  }
 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
@@ -718,7 +820,7 @@ public class DistributedUpdateProcessor 
     }
     for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
       Slice replicas = slices.get(sliceEntry.getKey());
-      
+
       Map<String,ZkNodeProps> shardMap = replicas.getShards();
       
       for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {