You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2012/01/25 08:20:54 UTC
[3/4] git commit: Replace _local doc sequence with revision trees
Replace _local doc sequence with revision trees
There are a number of cases in which _local docs might need to be merged
between nodes. One motivating case is for clusters that might wish to
move _local docs across nodes to maintain replication checkpoints with
external CouchDB instances. The previous _local docs strategy of a
single linear sequence breaks down in this situation.
This new behavior should be indistinguishable from the previous behavior
assuming clients did not try to introspect the _rev value for _local
documents. It should be impossible for normal HTTP clients to introduce
different behavior than previously existed because there's no support
for non-linear updates at the HTTP level. This update is merely an
internal refactoring for special cases like clusters.
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/eb4138f1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/eb4138f1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/eb4138f1
Branch: refs/heads/new-security-object
Commit: eb4138f196f15364e293128cacc2c5011bb28b69
Parents: 0ab5ebd
Author: Paul Joseph Davis <da...@apache.org>
Authored: Thu Jan 19 18:10:05 2012 -0600
Committer: Paul Joseph Davis <da...@apache.org>
Committed: Wed Jan 25 01:14:07 2012 -0600
----------------------------------------------------------------------
src/couch_replicator/src/couch_replicator.erl | 9 +-
src/couchdb/couch_db.erl | 21 +++-
src/couchdb/couch_db_updater.erl | 116 ++++++++++----------
3 files changed, 81 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 1f7c08a..53acfd5 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -580,15 +580,18 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
fold_replication_logs(Dbs, Vsn - 1,
?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
{error, <<"not_found">>} ->
+ Doc0 = #doc{id = NewId},
+ Doc1 = Doc0#doc{revs = {1, [couch_db:new_revid(Doc0)]}},
fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc1 | Acc]);
{ok, Doc} when LogId =:= NewId ->
fold_replication_logs(
Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
{ok, Doc} ->
- MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ Log0 = #doc{id = NewId, body = Doc#doc.body},
+ Log1 = Log0#doc{revs = {1, [couch_db:new_revid(Log0)]}},
fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Log1 | Acc])
end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index ae21bfa..a6903c4 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -717,16 +717,18 @@ update_docs(Db, Docs, Options, interactive_edit) ->
% associate reference with each doc in order to track duplicates
Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
- {Docs3, NonRepDocs} = lists:foldl(
+ {Docs3, NonRepDocs0} = lists:foldl(
fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
case Id of
<<?LOCAL_DOC_PREFIX, _/binary>> ->
- {DocsAcc, [Doc | NonRepDocsAcc]};
+ {DocsAcc, [[Doc] | NonRepDocsAcc]};
Id->
{[Doc | DocsAcc], NonRepDocsAcc}
end
end, {[], []}, Docs2),
+ {NonRepDocs, _} = new_revs(NonRepDocs0, [], []),
+
DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
case (Db#db.validate_doc_funs /= []) orelse
@@ -826,8 +828,9 @@ collect_results(UpdatePid, MRef, ResultsAcc) ->
end.
write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
- NonRepDocs, Options0) ->
+ NonRepDocs1, Options0) ->
DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
+ NonRepDocs = prepare_doc_summaries(Db, NonRepDocs1),
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
@@ -1182,9 +1185,15 @@ open_doc_revs_int(Db, IdRevs, Options) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_btree:lookup(local_btree(Db), [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
- apply_open_options({ok, Doc}, Options);
+ [{ok, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo}] ->
+ #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
+ DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
+ apply_open_options(
+ {ok, Doc#doc{
+ meta=doc_meta_info(DocInfo, RevTree, Options)
+ }}, Options);
[not_found] ->
{not_found, missing}
end;
http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couchdb/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 0bfe951..862c48a 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -183,12 +183,8 @@ handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) -
case Db#db.update_seq == NewSeq of
true ->
% suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_docs_btree = NewLocalBtree,
+ NewDb1 = copy_local_docs(Db, NewDb),
+ NewDb2 = commit_data(NewDb1#db{
main_pid = Db#db.main_pid,
filepath = Filepath,
instance_start_time = Db#db.instance_start_time,
@@ -449,7 +445,11 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
{reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end},
{compression, Compression}]),
{ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd,
- [{compression, Compression}]),
+ [
+ {split, fun(X) -> btree_by_id_split(X) end},
+ {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
+ {compression, Compression}
+ ]),
case Header#db_header.security_ptr of
nil ->
Security = [],
@@ -680,45 +680,34 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs],
- OldDocLookups = couch_btree:lookup(Btree, Ids),
- BtreeEntries = lists:zipwith(
- fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) ->
- case PrevRevs of
- [RevStr|_] ->
- PrevRev = list_to_integer(?b2l(RevStr));
- [] ->
- PrevRev = 0
- end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev == PrevRev of
- true ->
- case Delete of
- false ->
- send_result(Client, Ref, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, Ref,
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end;
- false ->
+ Options = [{revs_limit, Db#db.revs_limit}],
+ ZipFun = fun
+ (_Id, {ok, FullDocInfo}) -> FullDocInfo;
+ (Id, not_found) -> #full_doc_info{id=Id}
+ end,
+ FoldFun = fun({OldInfo, {Client, [{NewDoc, Ref}]}}, Acc) ->
+ case couch_doc:merge(OldInfo, NewDoc, Options) of
+ {ok, NewInfo} ->
+ #doc_info{
+ revs=[#rev_info{rev={NewPos, NewRev}} | _]
+ } = couch_doc:to_doc_info(NewInfo),
+ send_result(Client, Ref, {ok, {NewPos, NewRev}}),
+ [NewInfo | Acc];
+ {ok, NewInfo, NewRev} ->
+ send_result(Client, Ref, {ok, NewRev}),
+ [NewInfo | Acc];
+ conflict ->
send_result(Client, Ref, conflict),
- ignore
- end
- end, Docs, OldDocLookups),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
+ Acc
+ end
+ end,
+ Ids = [Id || {_Client, [{#doc{id=Id}, _Ref}]} <- Docs],
+ OldInfos0 = couch_btree:lookup(Btree, Ids),
+ OldInfos = lists:zipwith(ZipFun, Ids, OldInfos0),
+ Pairs = lists:zip(OldInfos, Docs),
+ NewInfos = lists:foldl(FoldFun, [], Pairs),
+ {ok, FlushedInfos} = flush_trees(Db, NewInfos, []),
+ {ok, Btree2} = couch_btree:add(Btree, FlushedInfos),
{ok, Db#db{local_docs_btree = Btree2}}.
@@ -852,6 +841,31 @@ copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) ->
docinfo_by_seq_btree=DocInfoBTree}.
+copy_local_docs(Db, #db{revs_limit=Limit, updater_fd = DestFd}=NewDb) ->
+ FoldFun = fun(#full_doc_info{rev_tree=RevTree}=Info, Acc) ->
+ NewRevTree0 = couch_key_tree:map(fun
+ (_, _, branch) ->
+ ?REV_MISSING;
+ (_Rev, LeafVal, leaf) ->
+ IsDel = element(1, LeafVal),
+ Sp = element(2, LeafVal),
+ Seq = element(3, LeafVal),
+ {_Body, AttsInfo} = Summary = copy_doc_attachments(
+ Db, Sp, DestFd),
+ SummaryChunk = make_doc_summary(NewDb, Summary),
+ {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+ DestFd, SummaryChunk),
+ TotalLeafSize = lists:foldl(
+ fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end,
+ SummarySize, AttsInfo),
+ {IsDel, Pos, Seq, TotalLeafSize}
+ end, RevTree),
+ NewRevTree = couch_key_tree:stem(NewRevTree0, Limit),
+ {ok, [Info#full_doc_info{rev_tree=NewRevTree} | Acc]}
+ end,
+ {ok, _, NewInfos} = couch_btree:foldl(Db#db.local_docs_btree, FoldFun, []),
+ {ok, LocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, NewInfos),
+ NewDb#db{local_docs_btree=LocalBtree}.
copy_compact(Db, NewDb0, Retry) ->
FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
@@ -911,17 +925,7 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
TotalChanges = couch_task_status:get(changes_done),
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.updater_fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+ commit_data(NewDb3#db{update_seq=Db#db.update_seq}).
start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
CompactFile = Filepath ++ ".compact",