Posted to commits@lucene.apache.org by yo...@apache.org on 2012/02/14 03:45:42 UTC
svn commit: r1243768 -
/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Author: yonik
Date: Tue Feb 14 02:45:41 2012
New Revision: 1243768
URL: http://svn.apache.org/viewvc?rev=1243768&view=rev
Log:
solrcloud: send deleteByQuery to all shard leaders to version and forward to replicas
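In outline, the change below threads a deleteByQuery (DBQ) through three stages, tracked by the new dbq_level request parameter: 1 = as received from the client, 2 = forwarded to a shard leader, 3 = forwarded to a replica. The following is only an editorial sketch of that dispatch; handleDeleteByQuery(), forwardToShardLeaders(), forwardToReplicas(), applyLocally(), isLeaderForAnyShard() and newVersionFromClock() are hypothetical helpers standing in for the cmdDistrib.distribDelete() and doLocalDelete() calls in the actual diff.

    // Sketch only: the three dbq_level stages (all helpers are hypothetical).
    void handleDeleteByQuery(DeleteUpdateCommand cmd, SolrParams params) {
        int level = params.getInt("dbq_level", 1);
        if (level == 1) {
            // First node to see the DBQ: forward it to the leader of every shard.
            forwardToShardLeaders(cmd);          // forwarded requests carry dbq_level=2
            if (!isLeaderForAnyShard()) return;  // nothing to apply locally
            level = 2;                           // this node also leads one of the shards
        }
        if (level == 2) {
            // Shard leader: assign a new (negative) version, apply the delete
            // locally while updates are blocked, then fan out to the replicas.
            cmd.setVersion(-newVersionFromClock());
            applyLocally(cmd);
            forwardToReplicas(cmd);              // carries dbq_level=3, _version_, leader=true
        } else {
            // Replica (dbq_level=3): re-apply the delete with the leader's version.
            applyLocally(cmd);
        }
    }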
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1243768&r1=1243767&r2=1243768&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Tue Feb 14 02:45:41 2012
@@ -60,14 +60,15 @@ import org.apache.solr.update.UpdateHand
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
+import org.apache.zookeeper.KeeperException;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String SEEN_LEADER = "leader";
public static final String COMMIT_END_POINT = "commit_end_point";
- public static final String DELQUERY_END_POINT = "delquery_end_point";
-
+ public static final String DELETE_BY_QUERY_LEVEL = "dbq_level";
+
private final SolrQueryRequest req;
private final SolrQueryResponse rsp;
private final UpdateRequestProcessor next;
@@ -91,6 +92,7 @@ public class DistributedUpdateProcessor
private boolean zkEnabled = false;
+ private CloudDescriptor cloudDesc;
private String collection;
private ZkController zkController;
@@ -128,7 +130,7 @@ public class DistributedUpdateProcessor
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
- CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+ cloudDesc = coreDesc.getCloudDescriptor();
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
@@ -192,7 +194,8 @@ public class DistributedUpdateProcessor
return nodes;
}
-
+
+
private String getShard(int hash, String collection, CloudState cloudState) {
// ranges should be part of the cloud state and eventually gotten from zk
@@ -200,6 +203,43 @@ public class DistributedUpdateProcessor
return cloudState.getShard(hash, collection);
}
+ // used for deleteByQuery to get the list of nodes this leader should forward to
+ private List<Node> setupRequest() {
+ List<Node> nodes = null;
+ String shardId = cloudDesc.getShardId();
+
+ try {
+
+ ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
+ collection, shardId));
+
+ String leaderNodeName = leaderProps.getCoreNodeName();
+ String coreName = req.getCore().getName();
+ String coreNodeName = zkController.getNodeName() + "_" + coreName;
+ isLeader = coreNodeName.equals(leaderNodeName);
+
+ // TODO: what if we are no longer the leader?
+
+ forwardToLeader = false;
+ List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+ .getReplicaProps(collection, shardId, zkController.getNodeName(),
+ coreName);
+ if (replicaProps != null) {
+ nodes = new ArrayList<Node>(replicaProps.size());
+ for (ZkCoreNodeProps props : replicaProps) {
+ nodes.add(new StdNode(props));
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted while finding leader for shard " + shardId, e);
+ }
+
+ return nodes;
+ }
+
+
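For illustration, the core-node-name comparison in setupRequest() above operates on names of the following shape (host and core names here are hypothetical):

    String nodeName = "host1:8983_solr";              // e.g. zkController.getNodeName()
    String coreName = "collection1";                  // e.g. req.getCore().getName()
    String coreNodeName = nodeName + "_" + coreName;  // "host1:8983_solr_collection1"
    String leaderNodeName = "host1:8983_solr_collection1"; // from the leader's ZkCoreNodeProps
    boolean isLeader = coreNodeName.equals(leaderNodeName); // true: this core leads the shard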
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
// TODO: check for id field?
@@ -419,17 +459,7 @@ public class DistributedUpdateProcessor
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if (!cmd.isDeleteById()) {
- // delete by query...
- // TODO: handle versioned and distributed deleteByQuery
-
- // even in non zk mode, tests simulate updates from a leader
- if(!zkEnabled) {
- isLeader = !req.getParams().getBool(SEEN_LEADER, false);
- } else {
- zkCheck();
- }
-
- processDeleteByQuery(cmd);
+ doDeleteByQuery(cmd);
return;
}
@@ -475,6 +505,161 @@ public class DistributedUpdateProcessor
}
}
+ public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+ // even in non zk mode, tests simulate updates from a leader
+ if (!zkEnabled) {
+ isLeader = !req.getParams().getBool(SEEN_LEADER, false);
+ } else {
+ zkCheck();
+ }
+
+ // Lev1: we are the first to receive this deleteByQuery; it must be forwarded to the leader of every shard
+ // Lev2: we are a leader receiving a forwarded deleteByQuery... we must:
+ // - block all updates (use VersionInfo)
+ // - flush *all* updates going to our replicas
+ // - forward the DBQ to our replicas and wait for the response
+ // - log + execute the local DBQ
+ // Lev3: we are a replica receiving a DBQ from our leader
+ // - log + execute the local DBQ
+
+ int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1);
+
+ if (zkEnabled && dbqlevel == 1) {
+ boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
+
+ Map<String,Slice> slices = zkController.getCloudState().getSlices(collection);
+
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ params.set("dbqlevel", 2);
+
+ List<Node> leaders = new ArrayList<Node>(slices.size());
+ for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
+ String sliceName = sliceEntry.getKey();
+ ZkNodeProps leaderProps;
+ try {
+ leaderProps = zkController.getZkStateReader().getLeaderProps(collection, sliceName);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + sliceName, e);
+ }
+
+ // TODO: What if leaders changed in the meantime?
+ // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
+
+ // Am I the leader for this slice?
+ ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leaderProps);
+ String leaderNodeName = coreLeaderProps.getCoreNodeName();
+ String coreName = req.getCore().getName();
+ String coreNodeName = zkController.getNodeName() + "_" + coreName;
+ isLeader = coreNodeName.equals(leaderNodeName);
+
+ if (isLeader) {
+ // don't forward to ourself
+ leaderForAnyShard = true;
+ } else {
+ leaders.add(new StdNode(coreLeaderProps));
+ }
+ }
+
+ cmdDistrib.distribDelete(cmd, leaders, params);
+
+ if (!leaderForAnyShard) {
+ return;
+ }
+
+ // change the level to 2 so we look up and forward to our own replicas (if any)
+ dbqlevel = 2;
+ }
+
+ List<Node> replicas = null;
+
+ if (zkEnabled && dbqlevel == 2) {
+ // This core should be a leader
+ replicas = setupRequest();
+ }
+
+ if (vinfo == null) {
+ super.processDelete(cmd);
+ return;
+ }
+
+ // at this point, there is an update we need to try and apply.
+ // we may or may not be the leader.
+
+ // Find the version
+ long versionOnUpdate = cmd.getVersion();
+ if (versionOnUpdate == 0) {
+ String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
+ versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
+ }
+ versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
+
+ boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
+ boolean leaderLogic = isLeader && !isReplay;
+
+ if (!leaderLogic && versionOnUpdate == 0) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+ }
+
+ vinfo.blockUpdates();
+ try {
+
+ if (versionsStored) {
+ if (leaderLogic) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(-version);
+ // TODO update versions in all buckets
+
+ // TODO: flush any adds to these replicas so they do not get reordered w.r.t. this DBQ
+
+ doLocalDelete(cmd);
+
+ // forward to all replicas
+ if (replicas != null) {
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ params.set("dbqlevel", 3);
+ params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
+ params.set(SEEN_LEADER, "true");
+ cmdDistrib.distribDelete(cmd, replicas, params);
+
+ // wait for DBQ responses before releasing the update block to eliminate the possibility
+ // of an add being reordered.
+ // TODO: this isn't strictly necessary - we could do the same thing we do for PeerSync
+ // in DUH2 and add a clause that prevents deleting older docs.
+ cmdDistrib.finish();
+ }
+
+ } else {
+ cmd.setVersion(-versionOnUpdate);
+
+ if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.deleteByQuery(cmd);
+ return;
+ }
+
+ doLocalDelete(cmd);
+ }
+ }
+
+ // since we don't know which documents were deleted, the easiest thing to do is to invalidate
+ // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
+ // (so cache misses will see up-to-date data)
+
+ } finally {
+ vinfo.unblockUpdates();
+ }
+
+ if (returnVersions && rsp != null) {
+ if (deleteByQueryResponse == null) {
+ deleteByQueryResponse = new NamedList<String>();
+ rsp.add("deleteByQuery",deleteByQueryResponse);
+ }
+ deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
+ }
+ }
+
+
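One subtlety in the version handling above: deletes are recorded with negative versions, so the _version_ value a replica receives from its leader is negative, and Math.abs() normalizes it before it is re-negated for the local delete. A small worked example (the version value is made up):

    long versionOnUpdate = Long.parseLong("-1358916161871345664"); // _version_ param from the leader
    versionOnUpdate = Math.abs(versionOnUpdate);                   // 1358916161871345664
    // leaderLogic is false on a replica, so the delete is re-applied
    // with the leader's negative (delete-marker) version:
    cmd.setVersion(-versionOnUpdate);                              // -1358916161871345664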
private void zkCheck() {
int retries = 10;
while (!zkController.isConnected()) {
@@ -571,89 +756,6 @@ public class DistributedUpdateProcessor
}
}
- private void processDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
- if (vinfo == null) {
- super.processDelete(cmd);
- return;
- }
-
- // at this point, there is an update we need to try and apply.
- // we may or may not be the leader.
-
- // Find the version
- long versionOnUpdate = cmd.getVersion();
- if (versionOnUpdate == 0) {
- String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
- versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
- }
- versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
-
- boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
- boolean leaderLogic = isLeader && !isReplay;
-
- if (!leaderLogic && versionOnUpdate==0) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
- }
-
- vinfo.blockUpdates();
- try {
-
- if (versionsStored) {
- if (leaderLogic) {
- long version = vinfo.getNewClock();
- cmd.setVersion(-version);
- // TODO update versions in all buckets
- } else {
- cmd.setVersion(-versionOnUpdate);
-
- if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.deleteByQuery(cmd);
- return;
- }
- }
- }
-
- doLocalDelete(cmd);
-
- // since we don't know which documents were deleted, the easiest thing to do is to invalidate
- // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
- // (so cache misses will see up-to-date data)
-
- } finally {
- vinfo.unblockUpdates();
- }
-
- // TODO: we should consider this? Send delete query to everyone in the current collection
-
- if (zkEnabled) {
- ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
- if (!params.getBool(DELQUERY_END_POINT, false)) {
- params.set(DELQUERY_END_POINT, true);
-
- String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
- .getZkController().getNodeName();
- String shardZkNodeName = nodeName + "_" + req.getCore().getName();
- List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
- .getCloudDescriptor().getCollectionName(), shardZkNodeName);
-
- if (nodes != null) {
- cmdDistrib.distribDelete(cmd, nodes, params);
- finish();
- }
- }
- }
-
- if (returnVersions && rsp != null) {
- if (deleteByQueryResponse == null) {
- deleteByQueryResponse = new NamedList<String>();
- rsp.add("deleteByQuery",deleteByQueryResponse);
- }
- deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
- }
-
- }
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
@@ -718,7 +820,7 @@ public class DistributedUpdateProcessor
}
for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
Slice replicas = slices.get(sliceEntry.getKey());
-
+
Map<String,ZkNodeProps> shardMap = replicas.getShards();
for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {