You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/07 17:37:49 UTC

[27/50] couch commit: updated refs/heads/windsor-merge to 6e60cbe

Handle duplicate doc updates in single commit

Before this commit, bugs in couch_db and couch_db_updater caused couch
to incorrectly report that every duplicate doc in a group commit had
failed with a conflict. With this commit, couch correctly reports that
one of the duplicate updates in the group commit succeeded. This commit
also changes behavior slightly: duplicate doc updates are now applied
in the order in which they were supplied. This change is for
consistency with CouchDB.

This work is based on the patch by Bob Dionne.

BugzID: 12540


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/46607e03
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/46607e03
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/46607e03

Branch: refs/heads/windsor-merge
Commit: 46607e03d2a06a95ff38d6256e037450e886a837
Parents: fcce4b7
Author: Benjamin Bastian <be...@gmail.com>
Authored: Tue Aug 27 11:26:58 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Aug 6 10:34:17 2014 +0100

----------------------------------------------------------------------
 src/couch_db.erl         | 95 ++++++++++++++++++++++++++-----------------
 src/couch_db_updater.erl | 52 +++++++++++++++++------
 src/couch_doc.erl        | 12 +++---
 3 files changed, 103 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index 0b9e490..b0c7894 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -500,11 +500,17 @@ update_docs(Db, Docs) ->
 % group_alike_docs groups the sorted documents into sublist buckets, by id.
 % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 group_alike_docs(Docs) ->
-    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
-    group_alike_docs(Sorted, []).
+    % Here we're just ensuring that our doc sort is stable so that
+    % if we have duplicate docids we don't have to worry about the
+    % behavior of lists:sort/2 which isn't documented anywhere as
+    % being stable.
+    WithPos = lists:zip(Docs, lists:seq(1, length(Docs))),
+    SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end,
+    SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)],
+    group_alike_docs(SortedDocs, []).
 
 group_alike_docs([], Buckets) ->
-    lists:reverse(Buckets);
+    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
 group_alike_docs([Doc|Rest], []) ->
     group_alike_docs(Rest, [[Doc]]);
 group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
@@ -627,10 +633,10 @@ prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
 
 prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
         AccFatalErrors) ->
-   {AccPrepped, AccFatalErrors};
+    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
+    {AccPrepped2, AccFatalErrors};
 prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
-    [#doc{id=Id}|_]=DocBucket,
     % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
         fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
@@ -645,11 +651,11 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
                 ok ->
                     {[Doc | AccBucket], AccErrors2};
                 Error ->
-                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+                    {AccBucket, [{doc_tag(Doc), Error} | AccErrors2]}
                 end;
             _ ->
                 % old revs specified but none exist, a conflict
-                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+                {AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -670,9 +676,9 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
                     LeafRevsDict, AllowConflict) of
             {ok, Doc2} ->
                 {[Doc2 | Docs2Acc], AccErrors2};
-            {Error, #doc{id=Id,revs=Revs}} ->
+            {Error, _} ->
                 % Record the error
-                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+                {Docs2Acc, [{doc_tag(Doc), Error} |AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -687,7 +693,8 @@ update_docs(Db, Docs, Options) ->
 prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
     Errors2 = [{{Id, {Pos, Rev}}, Error} ||
             {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
-    {lists:reverse(AccPrepped), lists:reverse(Errors2)};
+    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
+    {AccPrepped2, lists:reverse(Errors2)};
 prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
     case OldInfo of
     not_found ->
@@ -782,10 +789,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
     {lists:reverse(OutBuckets), IdRevsAcc};
 new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
     {NewBucket, IdRevsAcc3} = lists:mapfoldl(
-        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+        fun(#doc{revs={Start, RevIds}}=Doc, IdRevsAcc2)->
         NewRevId = new_revid(Doc),
         {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
-            [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+            [{doc_tag(Doc), {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
     end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
 
@@ -802,9 +809,23 @@ check_dup_atts2(_) ->
     ok.
 
 
-update_docs(Db, Docs, Options, replicated_changes) ->
+tag_docs([]) ->
+    [];
+tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
+
+doc_tag(#doc{meta=Meta}) ->
+    case lists:keyfind(ref, 1, Meta) of
+        {ref, Ref} when is_reference(Ref) -> Ref;
+        false -> throw(doc_not_tagged);
+        Else -> throw({invalid_doc_tag, Else})
+    end.
+
+update_docs(Db, Docs0, Options, replicated_changes) ->
     increment_stat(Db, {couchdb, database_writes}),
+    Docs = tag_docs(Docs0),
     DocBuckets = before_docs_update(Db, group_alike_docs(Docs)),
+
     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
             fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
@@ -827,21 +848,17 @@ update_docs(Db, Docs, Options, replicated_changes) ->
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
 
-update_docs(Db, Docs, Options, interactive_edit) ->
+update_docs(Db, Docs0, Options, interactive_edit) ->
     increment_stat(Db, {couchdb, database_writes}),
     AllOrNothing = lists:member(all_or_nothing, Options),
-    % go ahead and generate the new revision ids for the documents.
-    % separate out the NonRep documents from the rest of the documents
-
-    {Docs2, NonRepDocs} = lists:foldl(
-         fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
-            case Id of
-            <<?LOCAL_DOC_PREFIX, _/binary>> ->
-                {DocsAcc, [Doc | NonRepDocsAcc]};
-            Id->
-                {[Doc | DocsAcc], NonRepDocsAcc}
-            end
-        end, {[], []}, Docs),
+    Docs = tag_docs(Docs0),
+
+    % Separate _local docs from normal docs
+    IsLocal = fun
+        (#doc{id= <<?LOCAL_DOC_PREFIX, _/binary>>}) -> true;
+        (_) -> false
+    end,
+    {NonRepDocs, Docs2} = lists:partition(IsLocal, Docs),
 
     DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
 
@@ -868,12 +885,14 @@ update_docs(Db, Docs, Options, interactive_edit) ->
     end,
 
     if (AllOrNothing) and (PreCommitFailures /= []) ->
-        {aborted, lists:map(
-            fun({{Id,{Pos, [RevId|_]}}, Error}) ->
-                {{Id, {Pos, RevId}}, Error};
-            ({{Id,{0, []}}, Error}) ->
-                {{Id, {0, <<>>}}, Error}
-            end, PreCommitFailures)};
+        RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]),
+        {aborted, lists:map(fun({Ref, Error}) ->
+            #doc{id=Id,revs={Start,RevIds}} = dict:fetch(Ref, RefErrorDict),
+            case {Start, RevIds} of
+                {Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error};
+                {0, []} -> {{Id, {0, <<>>}}, Error}
+            end
+        end, PreCommitFailures)};
     true ->
         Options2 = if AllOrNothing -> [merge_conflicts];
                 true -> [] end ++ Options,
@@ -885,12 +904,12 @@ update_docs(Db, Docs, Options, interactive_edit) ->
 
         {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
 
-        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
-        {ok, lists:map(
-            fun(#doc{id=Id,revs={Pos, RevIds}}) ->
-                {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
-                Result
-            end, Docs)}
+        ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) ->
+            dict:store(Key, Resp, ResultsAcc)
+        end, dict:from_list(IdRevs), CommitResults ++ PreCommitFailures),
+        {ok, lists:map(fun(Doc) ->
+            dict:fetch(doc_tag(Doc), ResultsDict)
+        end, Docs)}
     end.
 
 % Returns the first available document on disk. Input list is a full rev path

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_updater.erl b/src/couch_db_updater.erl
index 09028bd..7c2a43a 100644
--- a/src/couch_db_updater.erl
+++ b/src/couch_db_updater.erl
@@ -264,7 +264,7 @@ handle_cast(Msg, #db{name = Name} = Db) ->
 
 handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
         FullCommit}, Db) ->
-    GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+    GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs),
     if NonRepDocs == [] ->
         {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
                 [Client], MergeConflicts, FullCommit);
@@ -319,6 +319,20 @@ handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+maybe_tag_grouped_docs(Client, GroupedDocs) ->
+    lists:map(fun(DocGroup) ->
+        [{Client, maybe_tag_doc(D)} || D <- DocGroup]
+    end, GroupedDocs).
+
+maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) ->
+    case lists:keymember(ref, 1, Meta0) of
+        true ->
+            Doc;
+        false ->
+            Key = {Id, {Pos-1, PrevRevs}},
+            Doc#doc{meta=[{ref, Key} | Meta0]}
+    end.
+
 merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) ->
     [A++B | merge_updates(RestA, RestB)];
 merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y ->
@@ -337,8 +351,7 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
         % updaters than deal with their possible conflicts, and local docs
         % writes are relatively rare. Can be optmized later if really needed.
         {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
-            GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
-                    || DocGroup <- GroupedDocs],
+            GroupedDocs2 = maybe_tag_grouped_docs(Client, GroupedDocs),
             GroupedDocsAcc2 =
                 merge_updates(GroupedDocsAcc, GroupedDocs2),
             collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
@@ -597,9 +610,16 @@ flush_trees(#db{fd = Fd} = Db,
     flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
 
 
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Doc, NewResult) ->
     % used to send a result to the client
-    catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+    catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
+
+doc_tag(#doc{meta=Meta}) ->
+    case lists:keyfind(ref, 1, Meta) of
+        {ref, Ref} -> Ref;
+        false -> throw(no_doc_tag);
+        Else -> throw({invalid_doc_tag, Else})
+    end.
 
 merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
     {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -613,7 +633,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                 case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
                     Limit) of
                 {_NewTree, conflicts} when (not OldDeleted) ->
-                    send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                    send_result(Client, NewDoc, conflict),
                     {AccTree, OldDeleted};
                 {NewTree, conflicts} when PrevRevs /= [] ->
                     % Check to be sure if prev revision was specified, it's
@@ -625,7 +645,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                     if IsPrevLeaf ->
                         {NewTree, OldDeleted};
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, NewDoc, conflict),
                         {AccTree, OldDeleted}
                     end;
                 {NewTree, no_conflicts} when  AccTree == NewTree ->
@@ -644,11 +664,11 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
                         {NewTree2, _} = couch_key_tree:merge(AccTree,
                                 couch_doc:to_path(NewDoc2), Limit),
                         % we changed the rev id, this tells the caller we did
-                        send_result(Client, Id, {Pos-1,PrevRevs},
-                            {ok, {OldPos + 1, NewRevId}}),
+                        send_result(Client, NewDoc,
+                                {ok, {OldPos + 1, NewRevId}}),
                         {NewTree2, OldDeleted};
                     true ->
-                        send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+                        send_result(Client, NewDoc, conflict),
                         {AccTree, OldDeleted}
                     end;
                 {NewTree, _} ->
@@ -754,7 +774,13 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
     Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
     OldDocLookups = couch_btree:lookup(Btree, Ids),
     BtreeEntries = lists:zipwith(
-        fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, _OldDocLookup) ->
+        fun({Client, NewDoc}, _OldDocLookup) ->
+            #doc{
+                id = Id,
+                deleted = Delete,
+                revs = {0, PrevRevs},
+                body = Body
+            } = NewDoc,
             case PrevRevs of
             [RevStr|_] ->
                 PrevRev = list_to_integer(?b2l(RevStr));
@@ -771,11 +797,11 @@ update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
             % true ->
                 case Delete of
                     false ->
-                        send_result(Client, Id, {0, PrevRevs}, {ok,
+                        send_result(Client, NewDoc, {ok,
                                 {0, ?l2b(integer_to_list(PrevRev + 1))}}),
                         {update, {Id, {PrevRev + 1, Body}}};
                     true  ->
-                        send_result(Client, Id, {0, PrevRevs},
+                        send_result(Client, NewDoc,
                                 {ok, {0, <<"0">>}}),
                         {remove, Id}
                 end%;

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/46607e03/src/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couch_doc.erl b/src/couch_doc.erl
index 05202f4..14f6a4f 100644
--- a/src/couch_doc.erl
+++ b/src/couch_doc.erl
@@ -70,7 +70,7 @@ revs_to_strs([{Pos, RevId}| Rest]) ->
     [rev_to_str({Pos, RevId}) | revs_to_strs(Rest)].
 
 to_json_meta(Meta) ->
-    lists:map(
+    lists:flatmap(
         fun({revs_info, Start, RevsInfo}) ->
             {JsonRevsInfo, _Pos}  = lists:mapfoldl(
                 fun({RevId, Status}, PosAcc) ->
@@ -78,13 +78,15 @@ to_json_meta(Meta) ->
                         {<<"status">>, ?l2b(atom_to_list(Status))}]},
                     {JsonObj, PosAcc - 1}
                 end, Start, RevsInfo),
-            {<<"_revs_info">>, JsonRevsInfo};
+            [{<<"_revs_info">>, JsonRevsInfo}];
         ({local_seq, Seq}) ->
-            {<<"_local_seq">>, Seq};
+            [{<<"_local_seq">>, Seq}];
         ({conflicts, Conflicts}) ->
-            {<<"_conflicts">>, revs_to_strs(Conflicts)};
+            [{<<"_conflicts">>, revs_to_strs(Conflicts)}];
         ({deleted_conflicts, DConflicts}) ->
-            {<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}
+            [{<<"_deleted_conflicts">>, revs_to_strs(DConflicts)}];
+        (_) ->
+            []
         end, Meta).
 
 to_json_attachments(Attachments, Options) ->