Posted to commits@couchdb.apache.org by da...@apache.org on 2017/09/11 20:56:49 UTC
[couchdb] 01/06: Reorder compaction functions
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch compactor-optimize-emsort
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a7a66c3b74a932adc5f71e4bc5d046928be61389
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Aug 16 11:23:07 2017 -0500
Reorder compaction functions
Strictly a copy/paste move of code to give the compaction functions a
more logical ordering, so the file reads top down from start to finish.
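
For orientation, the functions now appear in roughly the order they are
called. A sketch of the call chain, with names and arities taken from the
diff below (the start_copy_compact/1 -> copy_compact/3 hop happens elsewhere
in the file and is assumed here, not shown in these hunks):

    start_copy_compact/1
        copy_compact/3              % fold seq_tree, buffer doc infos, checkpoint
            copy_docs/4             % dedupe, rewrite rev trees, update btrees
                copy_doc_attachments/3  % copy attachment streams, verify MD5s
        commit_compaction_data/1    % flush headers to the data and meta files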
---
src/couch/src/couch_db_updater.erl | 373 +++++++++++++++++++------------------
1 file changed, 188 insertions(+), 185 deletions(-)
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 78e0b8c..f786048 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -1014,40 +1014,6 @@ sync_header(Db, NewHeader) ->
waiting_delayed_commit=nil
}.
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
- BinInfos = case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- check_md5(ExpectedMd5, ActualMd5),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
merge_lookups(Infos, []) ->
Infos;
@@ -1065,157 +1031,6 @@ merge_lookups([FDI | RestInfos], Lookups) ->
check_md5(Md5, Md5) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
- A =< B
- end, merge_lookups(MixedInfos, LookupResults)),
-
- NewInfos1 = lists:map(fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
- (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
- % In the future, we should figure out how to do this for
- % upgrade purposes.
- EJsonBody = case is_binary(Body) of
- true ->
- couch_compress:decompress(Body);
- false ->
- Body
- end,
- SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(EJsonBody),
- {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
- DestFd, SummaryChunk),
- AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Pos,
- sizes = #size_info{
- active = SummarySize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end, {0, 0, []}, Info#full_doc_info.rev_tree),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end, NewInfos0),
-
- NewInfos = stem_full_doc_infos(Db, NewInfos1),
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewDb#db.seq_tree, NewInfos, RemoveSeqs),
-
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
-
-
-copy_compact(Db, NewDb0, Retry) ->
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")),
- CheckpointAfter = couch_util:to_integer(
- config:get("database_compaction", "checkpoint_after",
- BufferSize * 10)),
-
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
-
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, Db#db.name},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
@@ -1307,6 +1122,194 @@ copy_purge_info(OldDb, NewDb) ->
end.
+copy_compact(Db, NewDb0, Retry) ->
+ Compression = couch_compress:get_compression_method(),
+ NewDb = NewDb0#db{compression=Compression},
+ TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+ BufferSize = list_to_integer(
+ config:get("database_compaction", "doc_buffer_size", "524288")),
+ CheckpointAfter = couch_util:to_integer(
+ config:get("database_compaction", "checkpoint_after",
+ BufferSize * 10)),
+
+ EnumBySeqFun =
+ fun(DocInfo, _Offset,
+ {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
+
+ Seq = case DocInfo of
+ #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+ #doc_info{} -> DocInfo#doc_info.high_seq
+ end,
+
+ AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
+ if AccUncopiedSize2 >= BufferSize ->
+ NewDb2 = copy_docs(
+ Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+ AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
+ if AccCopiedSize2 >= CheckpointAfter ->
+ CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
+ {ok, {CommNewDb2, [], 0, 0}};
+ true ->
+ {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
+ AccCopiedSize}}
+ end
+ end,
+
+ TaskProps0 = [
+ {type, database_compaction},
+ {database, Db#db.name},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ],
+ case (Retry =/= nil) and couch_task_status:is_task_added() of
+ true ->
+ couch_task_status:update([
+ {retry, true},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]);
+ false ->
+ couch_task_status:add_task(TaskProps0),
+ couch_task_status:set_update_frequency(500)
+ end,
+
+ {ok, _, {NewDb2, Uncopied, _, _}} =
+ couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
+ {NewDb, [], 0, 0},
+ [{start_key, NewDb#db.update_seq + 1}]),
+
+ NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+
+ % copy misc header values
+ if NewDb3#db.security /= Db#db.security ->
+ {ok, Ptr, _} = couch_file:append_term(
+ NewDb3#db.fd, Db#db.security,
+ [{compression, NewDb3#db.compression}]),
+ NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+ true ->
+ NewDb4 = NewDb3
+ end,
+
+ commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+
+
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+ DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+ LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+ % COUCHDB-968, make sure we prune duplicates during compaction
+ NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
+ A =< B
+ end, merge_lookups(MixedInfos, LookupResults)),
+
+ NewInfos1 = lists:map(fun(Info) ->
+ {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
+ (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
+ {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
+ % In the future, we should figure out how to do this for
+ % upgrade purposes.
+ EJsonBody = case is_binary(Body) of
+ true ->
+ couch_compress:decompress(Body);
+ false ->
+ Body
+ end,
+ SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
+ ExternalSize = ?term_size(EJsonBody),
+ {ok, Pos, SummarySize} = couch_file:append_raw_chunk(
+ DestFd, SummaryChunk),
+ AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
+ NewLeaf = Leaf#leaf{
+ ptr = Pos,
+ sizes = #size_info{
+ active = SummarySize,
+ external = ExternalSize
+ },
+ atts = AttSizes
+ },
+ {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
+ (_Rev, _Leaf, branch, SizesAcc) ->
+ {?REV_MISSING, SizesAcc}
+ end, {0, 0, []}, Info#full_doc_info.rev_tree),
+ {FinalAS, FinalES, FinalAtts} = FinalAcc,
+ TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
+ NewActiveSize = FinalAS + TotalAttSize,
+ NewExternalSize = FinalES + TotalAttSize,
+ Info#full_doc_info{
+ rev_tree = NewRevTree,
+ sizes = #size_info{
+ active = NewActiveSize,
+ external = NewExternalSize
+ }
+ }
+ end, NewInfos0),
+
+ NewInfos = stem_full_doc_infos(Db, NewInfos1),
+ RemoveSeqs =
+ case Retry of
+ nil ->
+ [];
+ OldDocIdTree ->
+ % Compaction is being rerun to catch up to writes during the
+ % first pass. This means we may have docs that already exist
+ % in the seq_tree in the .data file. Here we lookup any old
+ % update_seqs so that they can be removed.
+ Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
+ Existing = couch_btree:lookup(OldDocIdTree, Ids),
+ [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+ end,
+
+ {ok, SeqTree} = couch_btree:add_remove(
+ NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+
+ FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+ {{Id, Seq}, FDI}
+ end, NewInfos),
+ {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+ update_compact_task(length(NewInfos)),
+ NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+
+
+copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
+ BinInfos = case BinInfos0 of
+ _ when is_binary(BinInfos0) ->
+ couch_compress:decompress(BinInfos0);
+ _ when is_list(BinInfos0) ->
+ % pre 1.2 file format
+ BinInfos0
+ end,
+ % copy the bin values
+ NewBinInfos = lists:map(
+ fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
+ % 010 UPGRADE CODE
+ {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ check_md5(ExpectedMd5, ActualMd5),
+ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
+ ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
+ {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
+ couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ check_md5(ExpectedMd5, ActualMd5),
+ Enc = case Enc1 of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc1
+ end,
+ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
+ end, BinInfos),
+ {BodyData, NewBinInfos}.
+
+
commit_compaction_data(#db{}=Db) ->
% Compaction needs to write headers to both the data file
% and the meta file so if we need to restart we can pick
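
The buffer and checkpoint thresholds read at the top of copy_compact/3 come
from the database_compaction config section. The keys and values below are
inferred from the config:get calls in the diff (checkpoint_after has no
literal default beyond BufferSize * 10, i.e. 5242880 for the default buffer),
so treat this as a sketch of the equivalent local.ini entries rather than
shipped defaults:

    [database_compaction]
    ; bytes of accumulated doc infos before a copy_docs/4 batch is flushed
    doc_buffer_size = 524288
    ; bytes copied before compaction state is committed to disk
    checkpoint_after = 5242880

Checkpointing is what lets a restarted compaction pick up where it left off:
as the comment above notes, compaction writes headers to both the data file
and the meta file for exactly that reason.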