You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/02 18:02:57 UTC
[couchdb] 02/02: Implement _all_dbs/_all_docs API parameters
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/fdb-all-dbs-all-docs-qs-params
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit d7fef90b110c2c62de1ef3d413296adcc1ba7736
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jun 19 11:58:47 2019 -0500
Implement _all_dbs/_all_docs API parameters
This adds the mapping of CouchDB start/end keys and so on to the similar
yet slightly different concepts in FoundationDB. The handlers for
`_all_dbs` and `_all_docs` have been udpated to use this new logic.
---
src/chttpd/src/chttpd_misc.erl | 70 +++++++++------
src/fabric/src/fabric2_db.erl | 123 ++++++++++++++++++++++++++-
src/fabric/src/fabric2_fdb.erl | 188 +++++++++++++++--------------------------
3 files changed, 232 insertions(+), 149 deletions(-)
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index b244e84..796ce71 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -108,39 +108,57 @@ maybe_add_csp_headers(Headers, _) ->
Headers.
handle_all_dbs_req(#httpd{method='GET'}=Req) ->
- % TODO: Support args and options properly, transform
- % this back into a fold call similar to the old
- % version.
- %% Args = couch_mrview_http:parse_params(Req, undefined),
+ #mrargs{
+ start_key = StartKey,
+ end_key = EndKey,
+ direction = Dir,
+ limit = Limit,
+ skip = Skip
+ } = couch_mrview_http:parse_params(Req, undefined),
+
+ Options = [
+ {start_key, StartKey},
+ {end_key, EndKey},
+ {dir, Dir},
+ {limit, Limit},
+ {skip, Skip}
+ ],
+
% Eventually the Etag for this request will be derived
% from the \xFFmetadataVersion key in fdb
Etag = <<"foo">>,
- %% Options = [{user_ctx, Req#httpd.user_ctx}],
+
{ok, Resp} = chttpd:etag_respond(Req, Etag, fun() ->
- AllDbs = fabric2_db:list_dbs(),
- chttpd:send_json(Req, AllDbs)
- end);
+ {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]),
+ Callback = fun all_dbs_callback/2,
+ Acc = #vacc{req=Req,resp=Resp},
+ fabric2_db:list_dbs(Callback, Acc, Options)
+ end),
+ case is_record(Resp, vacc) of
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end;
handle_all_dbs_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
-%% {ok, Acc#vacc{resp=Resp1}};
-%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
-%% Prepend = couch_mrview_http:prepend_val(Acc),
-%% case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
-%% {ok, Acc};
-%% DbName ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
-%% {ok, Acc#vacc{prepend=",", resp=Resp1}}
-%% end;
-%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
-%% {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
-%% {ok, Acc#vacc{resp=Resp2}};
-%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
-%% {ok, Acc#vacc{resp=Resp1}}.
+all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
+ {ok, Acc#vacc{resp=Resp1}};
+all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
+ Prepend = couch_mrview_http:prepend_val(Acc),
+ case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
+ {ok, Acc};
+ DbName ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
+ {ok, Acc#vacc{prepend=",", resp=Resp1}}
+ end;
+all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
+ {ok, Acc#vacc{resp=Resp1}}.
handle_dbs_info_req(#httpd{method='POST'}=Req) ->
chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 80028a6..7ec1173 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -20,6 +20,7 @@
list_dbs/0,
list_dbs/1,
+ list_dbs/3,
is_admin/1,
check_is_admin/1,
@@ -194,8 +195,40 @@ list_dbs() ->
list_dbs(Options) ->
+ Skip = case fabric2_util:get_value(skip, Options, 0) of
+ N when is_integer(N), N >= 0 -> N
+ end,
+ Callback = fun
+ (DbName, {0, Acc}) -> {0, [DbName | Acc]};
+ (_DbName, {S, Acc}) -> {S - 1, Acc}
+ end,
fabric2_fdb:transactional(fun(Tx) ->
- fabric2_fdb:list_dbs(Tx, Options)
+ {_, DbNames} = fabric2_fdb:list_dbs(Tx, Callback, {Skip, []}, Options),
+ lists:reverse(DbNames)
+ end).
+
+
+list_dbs(UserFun, UserAcc0, Options) ->
+ Skip = case fabric2_util:get_value(skip, Options, 0) of
+ N when is_integer(N), N >= 0 -> N
+ end,
+ FoldFun = fun
+ (DbName, {0, Acc}) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc));
+ (_DbName, {S, Acc}) -> {S - 1, Acc}
+ end,
+ fabric2_fdb:transactional(fun(Tx) ->
+ try
+ UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
+ UserAcc2 = fabric2_fdb:list_dbs(
+ Tx,
+ FoldFun,
+ {Skip, UserAcc1},
+ Options
+ ),
+ {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -615,9 +648,35 @@ fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc, []).
-fold_docs(Db, UserFun, UserAcc, Options) ->
+fold_docs(Db, UserFun, UserAcc0, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
- fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
+ try
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
+ DocCount = get_doc_count(TxDb),
+
+ UserAcc1 = maybe_stop(UserFun({meta, [
+ {total, DocCount},
+ {offset, null}
+ ]}, UserAcc0)),
+
+ UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+ {DocId} = erlfdb_tuple:unpack(K, Prefix),
+ RevId = erlfdb_tuple:unpack(V),
+ maybe_stop(UserFun({row, [
+ {id, DocId},
+ {key, DocId},
+ {value, couch_doc:rev_to_str(RevId)}
+ ]}, Acc))
+ end, UserAcc1, Options),
+
+ {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -627,7 +686,39 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
- fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
+ try
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+
+ StartKey = get_since_seq(SinceSeq),
+ EndKey = case fabric2_util:get_value(dir, Options, fwd) of
+ rev -> fabric2_util:seq_zero_vs();
+ _ -> fabric2_util:seq_max_vs()
+ end,
+ FoldOpts = [
+ {start_key, StartKey},
+ {end_key, EndKey}
+ ] ++ Options,
+
+ {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+ {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
+ {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
+
+ Change = #{
+ id => DocId,
+ sequence => fabric2_fdb:vs_to_seq(SeqVS),
+ rev_id => RevId,
+ deleted => Deleted
+ },
+
+ maybe_stop(UserFun(Change, Acc))
+ end, UserAcc, FoldOpts)}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -1289,6 +1380,25 @@ check_duplicate_attachments(#doc{atts = Atts}) ->
end, ordsets:new(), Atts).
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
+ fabric2_util:seq_zero_vs();
+
+get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
+ fabric2_util:seq_max_vs();
+
+get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
+ Seq1 = fabric2_util:from_hex(Seq),
+ Seq2 = <<51:8, Seq1/binary>>,
+ {SeqVS} = erlfdb_tuple:unpack(Seq2),
+ fabric2_fdb:next_vs(SeqVS);
+
+get_since_seq(List) when is_list(List) ->
+ get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+ erlang:error({invalid_since_seq, Seq}).
+
+
get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
LeafPath;
get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1353,3 +1463,8 @@ rev(Rev) when is_list(Rev); is_binary(Rev) ->
rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
Rev.
+
+maybe_stop({ok, Acc}) ->
+ Acc;
+maybe_stop({stop, Acc}) ->
+ throw({stop, Acc}).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 4b01826..5a4d9f9 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,7 +24,7 @@
delete/1,
exists/1,
- list_dbs/2,
+ list_dbs/4,
get_info/1,
get_config/1,
@@ -50,11 +50,12 @@
read_attachment/3,
write_attachment/3,
- fold_docs/4,
- fold_changes/5,
get_last_change/1,
+ fold_range/5,
+
vs_to_seq/1,
+ next_vs/1,
debug_cluster/0,
debug_cluster/2
@@ -254,16 +255,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
end.
-list_dbs(Tx, _Options) ->
+list_dbs(Tx, Callback, AccIn, Options) ->
Root = erlfdb_directory:root(),
CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
LayerPrefix = erlfdb_directory:get_name(CouchDB),
- {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
- Future = erlfdb:get_range(Tx, Start, End),
- lists:map(fun({K, _V}) ->
- {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
- DbName
- end, erlfdb:wait(Future)).
+ Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+ {DbName} = erlfdb_tuple:unpack(K, Prefix),
+ Callback(DbName, Acc)
+ end, AccIn, Options).
get_info(#{} = Db) ->
@@ -640,84 +640,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
{ok, AttId}.
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
-
- DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
- DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
-
- try
- UserAcc1 = maybe_stop(UserFun({meta, [
- {total, ?bin2uint(DocCountBin)},
- {offset, null}
- ]}, UserAcc0)),
-
- UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
- RevId = erlfdb_tuple:unpack(V),
- maybe_stop(UserFun({row, [
- {id, DocId},
- {key, DocId},
- {value, couch_doc:rev_to_str(RevId)}
- ]}, UserAccIn))
- end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
- {ok, maybe_stop(UserFun(complete, UserAcc2))}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
-fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- SinceSeq1 = get_since_seq(SinceSeq0),
-
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
- end,
-
- {Start0, End0} = case Reverse of
- false -> {SinceSeq1, fabric2_util:seq_max_vs()};
- true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
- end,
-
- Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
- End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
-
- {Start, End} = case Reverse of
- false -> {erlfdb_key:first_greater_than(Start1), End1};
- true -> {Start1, erlfdb_key:first_greater_than(End1)}
- end,
-
- try
- {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
- {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
-
- Change = #{
- id => DocId,
- sequence => vs_to_seq(SeqVS),
- rev_id => RevId,
- deleted => Deleted
- },
-
- maybe_stop(UserFun(Change, UserAccIn))
- end, UserAcc0, [{reverse, Reverse}] ++ Options)}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
get_last_change(#{} = Db) ->
#{
tx := Tx,
@@ -735,10 +657,15 @@ get_last_change(#{} = Db) ->
end.
-maybe_stop({ok, Acc}) ->
- Acc;
-maybe_stop({stop, Acc}) ->
- throw({stop, Acc}).
+fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+ #{
+ tx := Tx
+ } = ensure_current(Db),
+ fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+ {Start, End, FoldOpts} = get_fold_opts(RangePrefix, Options),
+ erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts).
vs_to_seq(VS) ->
@@ -746,6 +673,21 @@ vs_to_seq(VS) ->
fabric2_util:to_hex(SeqBin).
+next_vs({versionstamp, VS, Batch, TxId}) ->
+ {V, B, T} = case TxId =< 65535 of
+ true ->
+ {VS, Batch, TxId + 1};
+ false ->
+ case Batch =< 65535 of
+ true ->
+ {VS, Batch + 1, 0};
+ false ->
+ {VS + 1, 0, 0}
+ end
+ end,
+ {versionstamp, V, B, T}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -753,7 +695,7 @@ debug_cluster() ->
debug_cluster(Start, End) ->
transactional(fun(Tx) ->
lists:foreach(fun({Key, Val}) ->
- io:format("~s => ~s~n", [
+ io:format(standard_error, "~s => ~s~n", [
string:pad(erlfdb_util:repr(Key), 60),
erlfdb_util:repr(Val)
])
@@ -790,7 +732,7 @@ load_validate_doc_funs(#{} = Db) ->
{end_key, <<"_design0">>}
],
- {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+ {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
Infos2 = lists:map(fun(Info) ->
#{
@@ -999,11 +941,12 @@ chunkify_attachment(Data) ->
end.
-get_dir_and_bounds(DbPrefix, Options) ->
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
+get_fold_opts(RangePrefix, Options) ->
+ Reverse = case fabric2_util:get_value(dir, Options) of
+ rev -> true;
+ _ -> false
end,
+
StartKey0 = fabric2_util:get_value(start_key, Options),
EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
@@ -1019,17 +962,17 @@ get_dir_and_bounds(DbPrefix, Options) ->
% Set the maximum bounds for the start and endkey
StartKey2 = case StartKey1 of
- undefined -> {?DB_ALL_DOCS};
- SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+ undefined -> <<>>;
+ SK2 -> SK2
end,
EndKey2 = case EndKey1 of
- undefined -> {?DB_ALL_DOCS, <<16#FF>>};
- EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+ undefined -> <<255>>;
+ EK2 -> EK2
end,
- StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
- EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+ StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
+ EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
% FoundationDB ranges are applied as SK <= key < EK
% By default, CouchDB is SK <= key <= EK with the
@@ -1056,26 +999,33 @@ get_dir_and_bounds(DbPrefix, Options) ->
EndKey3
end,
- {Reverse, StartKey4, EndKey4}.
-
+ Limit = case fabric2_util:get_value(limit, Options) of
+ L when is_integer(L), L >= 0 -> [{limit, L}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
- fabric2_util:seq_zero_vs();
+ TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+ T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
- fabric2_util:seq_max_vs();
+ StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+ undefined -> [];
+ Name when is_atom(Name) -> [{streaming_mode, Name}]
+ end,
-get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
- Seq1 = fabric2_util:from_hex(Seq),
- Seq2 = <<51:8, Seq1/binary>>,
- {SeqVS} = erlfdb_tuple:unpack(Seq2),
- SeqVS;
+ Snapshot = case fabric2_util:get_value(snapshot, Options) of
+ undefined -> [];
+ B when is_boolean(B) -> [{snapshot, B}]
+ end,
-get_since_seq(List) when is_list(List) ->
- get_since_seq(list_to_binary(List));
+ OutOpts = [{reverse, Reverse}]
+ ++ Limit
+ ++ TargetBytes
+ ++ StreamingMode
+ ++ Snapshot,
-get_since_seq(Seq) ->
- erlang:error({invalid_since_seq, Seq}).
+ {StartKey4, EndKey4, OutOpts}.
get_db_handle() ->