You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:12:09 UTC
[21/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f
Handle duplicate docs in bulk update
Previous to this commit, fabric assumed that all docs in a bulk update
request were unique. In the case that they were not unique, fabric would
error and return a stack trace to the user. This commit makes fabric
identify docs in a bulk update by a ref rather than their id.
This commit is based on work done by Bob Dionne.
BugzID: 12540
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f38bc21a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f38bc21a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f38bc21a
Branch: refs/heads/windsor-merge-121
Commit: f38bc21ac2ef05fb12d807f5cd67218851c0c04a
Parents: 2f1db98
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 23 11:41:48 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:14:14 2014 +0100
----------------------------------------------------------------------
src/fabric_doc_update.erl | 23 ++++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f38bc21a/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index ddae65d..9e2ce50 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -20,18 +20,20 @@
go(_, [], _) ->
{ok, []};
-go(DbName, AllDocs, Opts) ->
+go(DbName, AllDocs0, Opts) ->
+ AllDocs = tag_docs(AllDocs0),
validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
Options = lists:delete(all_or_nothing, Opts),
GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ Docs1 = untag_docs(Docs),
+ Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}),
{Shard#shard{ref=Ref}, Docs}
end, group_docs_by_shard(DbName, AllDocs)),
{Workers, _} = lists:unzip(GroupedDocs),
RexiMon = fabric_util:create_monitors(Workers),
W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+ dict:new()},
Timeout = fabric_util:request_timeout(),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
{ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
@@ -80,7 +82,9 @@ handle_message({ok, Replies}, Worker, Acc0) ->
{ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
{stop, W, FinalReplies} ->
{stop, {ok, FinalReplies}}
- end
+ end;
+ _ ->
+ {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
end;
handle_message({missing_stub, Stub}, _, _) ->
throw({missing_stub, Stub});
@@ -91,6 +95,16 @@ handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
handle_message({bad_request, Msg}, _, _) ->
throw({bad_request, Msg}).
+tag_docs([]) ->
+ [];
+tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+ [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
+
+untag_docs([]) ->
+ [];
+untag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+ [Doc#doc{meta=lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+
force_reply(Doc, [], {_, W, Acc}) ->
{error, W, [{Doc, {error, internal_server_error}} | Acc]};
force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) ->
@@ -157,7 +171,6 @@ append_update_replies([Doc|Rest], [], Dict0) ->
% icky, if replicated_changes only errors show up in result
append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
- % TODO what if the same document shows up twice in one update_docs call?
append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
skip_message({0, _, W, _, DocReplyDict}) ->