You are viewing a plain text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:12:09 UTC

[21/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Handle duplicate docs in bulk update

Prior to this commit, fabric assumed that all docs in a bulk update
request were unique. When they were not, fabric would error and
return a stack trace to the user. This commit makes fabric identify
docs in a bulk update by a unique ref (created with make_ref/0 and
stored in each doc's meta field) rather than by their id; the ref is
stripped again before the docs are sent to the shards.

This commit is based on work done by Bob Dionne.

BugzID: 12540


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f38bc21a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f38bc21a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f38bc21a

Branch: refs/heads/windsor-merge-121
Commit: f38bc21ac2ef05fb12d807f5cd67218851c0c04a
Parents: 2f1db98
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 23 11:41:48 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:14:14 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_update.erl | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f38bc21a/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index ddae65d..9e2ce50 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -20,18 +20,20 @@
 
 go(_, [], _) ->
     {ok, []};
-go(DbName, AllDocs, Opts) ->
+go(DbName, AllDocs0, Opts) ->
+    AllDocs = tag_docs(AllDocs0),
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
-        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+        Docs1 = untag_docs(Docs),
+        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}),
         {Shard#shard{ref=Ref}, Docs}
     end, group_docs_by_shard(DbName, AllDocs)),
     {Workers, _} = lists:unzip(GroupedDocs),
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
-        dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+        dict:new()},
     Timeout = fabric_util:request_timeout(),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
     {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
@@ -80,7 +82,9 @@ handle_message({ok, Replies}, Worker, Acc0) ->
             {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
         {stop, W, FinalReplies} ->
             {stop, {ok, FinalReplies}}
-        end
+        end;
+    _ ->
+        {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
     end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
@@ -91,6 +95,16 @@ handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
 handle_message({bad_request, Msg}, _, _) ->
     throw({bad_request, Msg}).
 
+tag_docs([]) ->
+    [];
+tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
+
+untag_docs([]) ->
+    [];
+untag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};
 force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) ->
@@ -157,7 +171,6 @@ append_update_replies([Doc|Rest], [], Dict0) ->
     % icky, if replicated_changes only errors show up in result
     append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
 append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
-    % TODO what if the same document shows up twice in one update_docs call?
     append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
 
 skip_message({0, _, W, _, DocReplyDict}) ->