You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bi...@apache.org on 2011/11/28 13:36:33 UTC
[1/3] git commit: Add references to docs to prevent dups from being
collapsed
Updated Branches:
refs/heads/1.2.x 481b961df -> 6b530aa01
Add references to docs to prevent dups from being collapsed
Bulk docs requests may have duplicates or multiple docs with the same id.
These were being collapsed in a dictionary as messages are passed from
merge_rev_trees in couch_db_updater to collect_results in couch_db.
Attaching a reference allows each to be processed correctly.
Jira-911 (COUCHDB-911)
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/6b530aa0
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/6b530aa0
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/6b530aa0
Branch: refs/heads/1.2.x
Commit: 6b530aa018fbeecef56167b80dc129c9e2860426
Parents: 850132e
Author: Bob Dionne <bi...@apache.org>
Authored: Tue Oct 11 16:04:09 2011 -0400
Committer: Bob Dionne <bi...@apache.org>
Committed: Mon Nov 28 07:19:18 2011 -0500
----------------------------------------------------------------------
src/couchdb/couch_db.erl | 118 +++++++++++++++++----------------
src/couchdb/couch_db_updater.erl | 31 ++++-----
2 files changed, 77 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/6b530aa0/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 668bc7b..b249ab4 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -436,22 +436,22 @@ update_docs(Db, Docs) ->
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
- Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
group_alike_docs(Sorted, []).
group_alike_docs([], Buckets) ->
- lists:reverse(Buckets);
+ lists:reverse(lists:map(fun lists:reverse/1, Buckets));
group_alike_docs([Doc|Rest], []) ->
group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
- [#doc{id=BucketId}|_] = Bucket,
+group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
+ [{#doc{id=BucketId},_Ref}|_] = Bucket,
case Doc#doc.id == BucketId of
true ->
% add to existing bucket
- group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
false ->
% add to new bucket
- group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
end.
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
@@ -514,10 +514,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
{AccPrepped, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
- [#doc{id=Id}|_]=DocBucket,
- % no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+ fun({#doc{revs=Revs}=Doc,Ref}, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -527,19 +525,19 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccBucket], AccErrors2};
+ {[{Doc, Ref} | AccBucket], AccErrors2};
Error ->
- {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+ {AccBucket, [{Ref, Error} | AccErrors2]}
end;
_ ->
% old revs specified but none exist, a conflict
- {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+ {AccBucket, [{Ref, conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
- [PreppedBucket | AccPrepped], AccErrors3);
+ [lists:reverse(PreppedBucket) | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
@@ -553,14 +551,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
{LeafVal, {Start, [RevId | _]} = Revs} <- Leafs
]),
{PreppedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {Docs2Acc, AccErrors2}) ->
+ fun({Doc, Ref}, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
- {[Doc2 | Docs2Acc], AccErrors2};
- {Error, #doc{id=Id,revs=Revs}} ->
+ {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+ {Error, #doc{}} ->
% Record the error
- {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+ {Docs2Acc, [{Ref, Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
@@ -580,7 +578,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
- fun(Doc, {AccPrepped2, AccErrors2}) ->
+ fun({Doc, Ref}, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -588,7 +586,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
- {[Doc | AccPrepped2], AccErrors2};
+ {[{Doc, Ref} | AccPrepped2], AccErrors2};
Error ->
{AccPrepped2, [{Doc, Error} | AccErrors2]}
end
@@ -597,7 +595,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
{ok, #full_doc_info{rev_tree=OldTree}} ->
NewRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
+ fun({NewDoc, _Ref}, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_doc:to_path(NewDoc), Db#db.revs_limit),
NewTree
@@ -607,7 +605,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
{ValidatedBucket, AccErrors3} =
lists:foldl(
- fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+ fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
case dict:find({Pos, RevId}, LeafRevsFullDict) of
{ok, {Start, Path}} ->
% our unflushed doc is a leaf node. Go back on the path
@@ -629,7 +627,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
- {[Doc2 | AccValidated], AccErrors2};
+ {[{Doc2, Ref} | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
@@ -661,10 +659,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
{lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
{NewBucket, IdRevsAcc3} = lists:mapfoldl(
- fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+ fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
NewRevId = new_revid(Doc),
- {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
- [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+ {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
+ [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
@@ -683,16 +681,17 @@ check_dup_atts2(_) ->
update_docs(Db, Docs, Options, replicated_changes) ->
increment_stat(Db, {couchdb, database_writes}),
- DocBuckets = group_alike_docs(Docs),
-
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs),
+ DocBuckets = group_alike_docs(Docs2),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
- (#doc{atts=Atts}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true;
+ ({#doc{atts=Atts}, _Ref}) ->
Atts /= []
- end, Docs) of
+ end, Docs2) of
true ->
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocs = get_full_doc_infos(Db, Ids),
{DocBuckets2, DocErrors} =
@@ -702,8 +701,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
- || Doc <- Bucket] || Bucket <- DocBuckets3],
+ DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd), Ref}
+ || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -712,28 +711,31 @@ update_docs(Db, Docs, Options, interactive_edit) ->
AllOrNothing = lists:member(all_or_nothing, Options),
% go ahead and generate the new revision ids for the documents.
% separate out the NonRep documents from the rest of the documents
- {Docs2, NonRepDocs} = lists:foldl(
- fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
+ {Docs3, NonRepDocs} = lists:foldl(
+ fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
{DocsAcc, [Doc | NonRepDocsAcc]};
Id->
{[Doc | DocsAcc], NonRepDocsAcc}
end
- end, {[], []}, Docs),
+ end, {[], []}, Docs2),
- DocBuckets = group_alike_docs(Docs2),
+ DocBuckets = group_alike_docs(Docs3),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
- fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) ->
true;
- (#doc{atts=Atts}) ->
+ ({#doc{atts=Atts}, _Ref}) ->
Atts /= []
- end, Docs2) of
+ end, Docs3) of
true ->
% lookup the doc by id and get the most recent
- Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
{DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
@@ -747,29 +749,33 @@ update_docs(Db, Docs, Options, interactive_edit) ->
end,
if (AllOrNothing) and (PreCommitFailures /= []) ->
- {aborted, lists:map(
- fun({{Id,{Pos, [RevId|_]}}, Error}) ->
- {{Id, {Pos, RevId}}, Error};
- ({{Id,{0, []}}, Error}) ->
- {{Id, {0, <<>>}}, Error}
- end, PreCommitFailures)};
+ {aborted,
+ lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
+ case lists:keyfind(Ref,1,PreCommitFailures) of
+ {Ref, Error} ->
+ [{{Id,{Pos,RevIds}}, Error} | Acc];
+ false ->
+ Acc
+ end
+ end,[],Docs3)};
+
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.updater_fd)
- || Doc <- B] || B <- DocBuckets2],
+ {doc_flush_atts(set_new_att_revpos(
+ check_dup_atts(Doc)), Db#db.updater_fd), Ref}
+ || {Doc, Ref} <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
- fun(#doc{id=Id,revs={Pos, RevIds}}) ->
- {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
+ fun({#doc{}, Ref}) ->
+ {ok, Result} = dict:find(Ref, ResultsDict),
Result
- end, Docs)}
+ end, Docs2)}
end.
% Returns the first available document on disk. Input list is a full rev path
@@ -832,7 +838,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+ [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -851,7 +857,7 @@ write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
- fun(#doc{body = Body, atts = Atts} = Doc) ->
+ fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
#att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
att_len = AL, disk_len = DL, encoding = E} <- Atts],
@@ -862,7 +868,7 @@ prepare_doc_summaries(Db, BucketList) ->
nil
end,
SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- Doc#doc{body = {summary, SummaryChunk, AttsFd}}
+ {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
end,
Bucket) || Bucket <- BucketList].
http://git-wip-us.apache.org/repos/asf/couchdb/blob/6b530aa0/src/couchdb/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index e50f482..933bad1 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -274,8 +274,8 @@ merge_updates([], RestB, AccOutGroups) ->
lists:reverse(AccOutGroups, RestB);
merge_updates(RestA, [], AccOutGroups) ->
lists:reverse(AccOutGroups, RestA);
-merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
- [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+merge_updates([[{_, {#doc{id=IdA}, _}}|_]=GroupA | RestA],
+ [[{_, {#doc{id=IdB}, _}}|_]=GroupB | RestB], AccOutGroups) ->
if IdA == IdB ->
merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
IdA < IdB ->
@@ -563,9 +563,9 @@ flush_trees(#db{updater_fd = Fd} = Db,
flush_trees(Db, RestUnflushed, [InfoFlushed | AccFlushed]).
-send_result(Client, Id, OriginalRevs, NewResult) ->
+send_result(Client, Ref, NewResult) ->
% used to send a result to the client
- catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+ catch(Client ! {result, self(), {Ref, NewResult}}).
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
@@ -574,12 +574,12 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
NewRevTree = lists:foldl(
- fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
+ fun({Client, {#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, Ref}}, AccTree) ->
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, couch_doc:to_path(NewDoc),
Limit) of
{_NewTree, conflicts} when (not OldDeleted) ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree;
{NewTree, conflicts} when PrevRevs /= [] ->
% Check to be sure if prev revision was specified, it's
@@ -591,7 +591,7 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
if IsPrevLeaf ->
NewTree;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, no_conflicts} when AccTree == NewTree ->
@@ -610,11 +610,10 @@ merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
{NewTree2, _} = couch_key_tree:merge(AccTree,
couch_doc:to_path(NewDoc2), Limit),
% we changed the rev id, this tells the caller we did
- send_result(Client, Id, {Pos-1,PrevRevs},
- {ok, {OldPos + 1, NewRevId}}),
+ send_result(Client, Ref, {ok, {OldPos + 1, NewRevId}}),
NewTree2;
true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
AccTree
end;
{NewTree, _} ->
@@ -672,7 +671,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
update_seq = LastSeq,
revs_limit = RevsLimit
} = Db,
- Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+ Ids = [Id || [{_Client, {#doc{id=Id}, _Ref}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -720,10 +719,10 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
+ Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
+ fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
@@ -739,16 +738,16 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
true ->
case Delete of
false ->
- send_result(Client, Id, {0, PrevRevs}, {ok,
+ send_result(Client, Ref, {ok,
{0, ?l2b(integer_to_list(PrevRev + 1))}}),
{update, {Id, {PrevRev + 1, Body}}};
true ->
- send_result(Client, Id, {0, PrevRevs},
+ send_result(Client, Ref,
{ok, {0, <<"0">>}}),
{remove, Id}
end;
false ->
- send_result(Client, Id, {0, PrevRevs}, conflict),
+ send_result(Client, Ref, conflict),
ignore
end
end, Docs, OldDocLookups),