You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/09/15 20:14:03 UTC

[couchdb] 06/16: Cleanup couch_replicator_utils module

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b6e87f8a43eebb4d02dfa52227ba5b77cd4ebc68
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Aug 28 04:31:38 2020 -0400

    Cleanup couch_replicator_utils module
    
     * Remove unused functions and some functions used only from one place, like `sum_stats/2`.
    
     * Update time functions to use the more modern `erlang:system_time/1` API.
    
     * `parse_int_param/5` and `parse_replication_states/1` were moved from the old
        _httpd_util module as they were the only ones needed from there.
    
     * `default_headers_map/0` Used to return the default httpdb record headers as a map
       since part of the replication data will be kept as a map object.
    
     * `proplist_options/1` Some parts of the replicator, like _httpc and _api_wrap
       still use proplist options, so this function can be used to translate
       options as maps to a proplist version.
---
 .../src/couch_replicator_stats.erl                 |   2 +
 .../src/couch_replicator_utils.erl                 | 241 ++++++++++++---------
 .../src/couch_replicator_worker.erl                |   2 +-
 3 files changed, 137 insertions(+), 108 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index 37848b3..69e60a0 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -32,6 +32,8 @@
 new() ->
     orddict:new().
 
+new(#{} = Map) ->
+    new(maps:to_list(Map));
 new(Initializers0) when is_list(Initializers0) ->
     Initializers1 = lists:filtermap(fun fmap/1, Initializers0),
     orddict:from_list(Initializers1).
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 5f608de..cbed78e 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,19 +13,18 @@
 -module(couch_replicator_utils).
 
 -export([
-   parse_rep_doc/2,
-   replication_id/2,
-   sum_stats/2,
-   is_deleted/1,
    rep_error_to_binary/1,
-   get_json_value/2,
-   get_json_value/3,
-   pp_rep_id/1,
+   iso8601/0,
    iso8601/1,
-   filter_state/3,
+   rfc1123_local/0,
+   rfc1123_local/1,
    remove_basic_auth_from_headers/1,
    normalize_rep/1,
-   ejson_state_info/1
+   compare_reps/2,
+   default_headers_map/0,
+   parse_replication_states/1,
+   parse_int_param/5,
+   proplist_options/1
 ]).
 
 
@@ -33,11 +32,6 @@
 -include("couch_replicator.hrl").
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 
--import(couch_util, [
-    get_value/2,
-    get_value/3
-]).
-
 
 rep_error_to_binary(Error) ->
     couch_util:to_binary(error_reason(Error)).
@@ -54,77 +48,27 @@ error_reason(Reason) ->
     Reason.
 
 
-get_json_value(Key, Props) ->
-    get_json_value(Key, Props, undefined).
-
-get_json_value(Key, Props, Default) when is_atom(Key) ->
-    Ref = make_ref(),
-    case get_value(Key, Props, Ref) of
-        Ref ->
-            get_value(?l2b(atom_to_list(Key)), Props, Default);
-        Else ->
-            Else
-    end;
-get_json_value(Key, Props, Default) when is_binary(Key) ->
-    Ref = make_ref(),
-    case get_value(Key, Props, Ref) of
-        Ref ->
-            get_value(list_to_atom(?b2l(Key)), Props, Default);
-        Else ->
-            Else
-    end.
-
-
-% pretty-print replication id
--spec pp_rep_id(#rep{} | rep_id()) -> string().
-pp_rep_id(#rep{id = RepId}) ->
-    pp_rep_id(RepId);
-pp_rep_id({Base, Extension}) ->
-    Base ++ Extension.
-
-
-% NV: TODO: this function is not used outside api wrap module
-% consider moving it there during final cleanup
-is_deleted(Change) ->
-    get_json_value(<<"deleted">>, Change, false).
-
-
-% NV: TODO: proxy some functions which used to be here, later remove
-% these and replace calls to their respective modules
-replication_id(Rep, Version) ->
-    couch_replicator_ids:replication_id(Rep, Version).
+-spec iso8601() -> binary().
+iso8601() ->
+    iso8601(erlang:system_time(second)).
 
 
-sum_stats(S1, S2) ->
-    couch_replicator_stats:sum_stats(S1, S2).
-
-
-parse_rep_doc(Props, UserCtx) ->
-    couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
-
--spec iso8601(erlang:timestamp()) -> binary().
-iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
-    {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
+-spec iso8601(integer()) -> binary().
+iso8601(Sec) when is_integer(Sec) ->
+    Time = unix_sec_to_timestamp(Sec),
+    {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Time),
     Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
     iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
 
 
-%% Filter replication info ejson by state provided. If it matches return
-%% the input value, if it doesn't return 'skip'. This is used from replicator
-%% fabric coordinator and worker.
--spec filter_state(atom(), [atom()], {[_ | _]}) -> {[_ | _]} | skip.
-filter_state(null = _State, _States, _Info) ->
-    skip;
-filter_state(_ = _State, [] = _States, Info) ->
-    Info;
-filter_state(State, States, Info) ->
-    case lists:member(State, States) of
-        true ->
-            Info;
-        false ->
-            skip
-    end.
+rfc1123_local() ->
+    list_to_binary(httpd_util:rfc1123_date()).
+
+
+rfc1123_local(Sec) ->
+    Time = unix_sec_to_timestamp(Sec),
+    Local = calendar:now_to_local_time(Time),
+    list_to_binary(httpd_util:rfc1123_date(Local)).
 
 
 remove_basic_auth_from_headers(Headers) ->
@@ -158,37 +102,101 @@ decode_basic_creds(Base64) ->
     end.
 
 
-% Normalize a #rep{} record such that it doesn't contain time dependent fields
+-spec compare_reps(#{} | null, #{} | null) -> boolean().
+compare_reps(Rep1, Rep2) ->
+    NormRep1 = normalize_rep(Rep1),
+    NormRep2 = normalize_rep(Rep2),
+    NormRep1 =:= NormRep2.
+
+
+% Normalize a rep map such that it doesn't contain time dependent fields
 % pids (like httpc pools), and options / props are sorted. This function would
 % used during comparisons.
--spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
-normalize_rep(nil) ->
-    nil;
-
-normalize_rep(#rep{} = Rep)->
-    #rep{
-        source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
-        target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
-        options = Rep#rep.options,  % already sorted in make_options/1
-        type = Rep#rep.type,
-        view = Rep#rep.view,
-        doc_id = Rep#rep.doc_id,
-        db_name = Rep#rep.db_name
+-spec normalize_rep(#{} | null) -> #{} | null.
+normalize_rep(null) ->
+    null;
+
+normalize_rep(#{} = Rep)->
+    #{
+        ?SOURCE := Source,
+        ?TARGET := Target,
+        ?OPTIONS := Options
+    } = Rep,
+    #{
+        ?SOURCE => normalize_endpoint(Source),
+        ?TARGET => normalize_endpoint(Target),
+        ?OPTIONS => Options
     }.
 
 
--spec ejson_state_info(binary() | nil) -> binary() | null.
-ejson_state_info(nil) ->
-    null;
-ejson_state_info(Info) when is_binary(Info) ->
-    {[{<<"error">>, Info}]};
-ejson_state_info([]) ->
-    null;  % Status not set yet => null for compatibility reasons
-ejson_state_info([{_, _} | _] = Info) ->
-    {Info};
-ejson_state_info(Info) ->
-    ErrMsg = couch_replicator_utils:rep_error_to_binary(Info),
-    {[{<<"error">>, ErrMsg}]}.
+normalize_endpoint(<<DbName/binary>>) ->
+    DbName;
+
+normalize_endpoint(#{} = Endpoint) ->
+    Ks = [
+        <<"url">>,
+        <<"auth_props">>,
+        <<"headers">>,
+        <<"timeout">>,
+        <<"ibrowse_options">>,
+        <<"retries">>,
+        <<"http_connections">>,
+        <<"proxy_url">>
+    ],
+    maps:with(Ks, Endpoint).
+
+
+default_headers_map() ->
+    lists:foldl(fun({K, V}, Acc) ->
+        Acc#{list_to_binary(K) => list_to_binary(V)}
+    end, #{}, (#httpdb{})#httpdb.headers).
+
+
+parse_replication_states(undefined) ->
+    [];  % This is the default (wildcard) filter
+
+parse_replication_states(States) when is_list(States) ->
+    All = [?ST_RUNNING, ?ST_FAILED, ?ST_COMPLETED, ?ST_PENDING, ?ST_CRASHING],
+    AllSet = sets:from_list(All),
+    BinStates = [?l2b(string:to_lower(S)) || S <- string:tokens(States, ",")],
+    StatesSet = sets:from_list(BinStates),
+    Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
+    case Diff of
+        [] ->
+            BinStates;
+        _ ->
+            Args = [Diff, All],
+            Msg2 = io_lib:format("Unknown states ~p. Choose from: ~p", Args),
+            throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+
+parse_int_param(Req, Param, Default, Min, Max) ->
+    IntVal = try
+        list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+    catch error:badarg ->
+        Msg1 = io_lib:format("~s must be an integer", [Param]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    case IntVal >= Min andalso IntVal =< Max of
+    true ->
+        IntVal;
+    false ->
+        Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+
+proplist_options(#{} = OptionsMap) ->
+    maps:fold(fun(K, V, Acc) ->
+        [{binary_to_atom(K, utf8), V} | Acc]
+    end, [], OptionsMap).
+
+
+unix_sec_to_timestamp(Sec) when is_integer(Sec) ->
+    MegaSecPart = Sec div 1000000,
+    SecPart = Sec - MegaSecPart * 1000000,
+    {MegaSecPart, SecPart, 0}.
 
 
 -ifdef(TEST).
@@ -256,7 +264,7 @@ normalize_rep_test_() ->
                 {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
                 {<<"other_field">>, <<"some_value">>}
             ]},
-            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            Rep1 = couch_replicator_parse:parse_rep_doc(EJson1),
             EJson2 = {[
                 {<<"other_field">>, <<"unrelated">>},
                 {<<"target">>, <<"http://target.local/db">>},
@@ -264,9 +272,28 @@ normalize_rep_test_() ->
                 {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
                 {<<"other_field2">>, <<"unrelated2">>}
             ]},
-            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            Rep2 = couch_replicator_parse:parse_rep_doc(EJson2),
             ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
         end)
     }.
 
+
+normalize_endpoint() ->
+    HttpDb =  #httpdb{
+        url = "http://host/db",
+        auth_props = [{"key", "val"}],
+        headers = [{"k2","v2"}, {"k1","v1"}],
+        timeout = 30000,
+        ibrowse_options = [{k2, v2}, {k1, v1}],
+        retries = 10,
+        http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_endpoint(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_endpoint(<<"local">>)).
+
+
 -endif.
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index eb8beaa..4cd984c 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -103,7 +103,7 @@ handle_call({batch_doc, Doc}, From, State) ->
 
 handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
     gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(Stats, IncStats),
+    NewStats = couch_replicator_stats:sum_stats(Stats, IncStats),
     NewStats2 = maybe_report_stats(State#state.cp, NewStats),
     {noreply, State#state{stats = NewStats2}};