You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/09/15 20:14:06 UTC

[couchdb] 09/16: Update couch_replicator_ids

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 276d19731bc5df73838f40efc126f1f709e04fbe
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Aug 28 04:33:11 2020 -0400

    Update couch_replicator_ids
    
    This module is responsible for calculating replication IDs. It inspects all the
    replication options which may affect the replication results and hashes them
    into a single ID. The CouchDB replicator tries to maintain compatibility with
    older versions of itself, so it keeps track of how to calculate the replication
    IDs used by previous versions of CouchDB. Replication ID calculation algorithms
    have their own version number; the latest one is version 4.
    
    One of the goals of this update is to not alter the replication ID algorithm
    and to keep it at version 4, so that, given the same parameters, replication
    IDs stay the same as they would be on CouchDB <= 3.x. That is why, in some
    internal functions, option maps and binaries are turned back into proplists
    and tuples before hashing is performed. There is a unit test which asserts
    that the replication ID calculated with this update matches the one calculated
    in CouchDB 3.x.
    
    The internal representation of the replication ID has changed slightly.
    Previously it was represented by a tuple of `{BaseId, ExtId}`, where `BaseId`
    was the ID without any options such as `continuous` or `create_target`, and
    `ExtId` was the concatenated list of those options. In most cases it was more
    useful to operate on the full ID, and only in a few places was the `BaseId`
    needed. So the calculation function was updated to return `{RepId, BaseId}`
    instead. `RepId` is a binary containing the full replication ID (base +
    extensions), and `BaseId` is just the base.
    
    The function which calculates the base ID has been renamed from
    `replication_id/2` to `base_id/2`, which better reflects what it returns.
    
    Another addition to the module is a function which calculates replication job
    IDs. A `JobId` is used to identify replication jobs in the `couch_jobs` API.
    Unlike a `RepId`, a `JobId` never requires a network round-trip to calculate.
    For replications created from `_replicator` docs, the `JobId` is defined as the
    concatenation of the database instance UUID and the document ID (separated by
    `|`). For transient jobs it is calculated by hashing the source and target
    endpoint parameters, the user name, and the replication options. In fact, it
    is almost the same as a replication ID, with one important difference: the
    filter design doc name and function name are used instead of the contents of
    the filter fetched from the source, so no network round-trip is necessary to
    calculate it.
---
 src/couch_replicator/src/couch_replicator_ids.erl | 202 +++++++++++++++-------
 1 file changed, 141 insertions(+), 61 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 04e71c3..d1cbe57 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -14,7 +14,9 @@
 
 -export([
     replication_id/1,
-    replication_id/2,
+    base_id/2,
+    job_id/3,
+    job_id/2,
     convert/1
 ]).
 
@@ -30,28 +32,31 @@
 %  {filter_fetch_error, Error} exception.
 %
 
-replication_id(#rep{options = Options} = Rep) ->
-    BaseId = replication_id(Rep, ?REP_ID_VERSION),
-    {BaseId, maybe_append_options([continuous, create_target], Options)}.
+replication_id(#{?OPTIONS := Options} = Rep) ->
+    BaseId = base_id(Rep, ?REP_ID_VERSION),
+    UseOpts = [<<"continuous">>, <<"create_target">>],
+    ExtId = maybe_append_options(UseOpts, Options),
+    RepId = iolist_to_binary([BaseId, ExtId]),
+    {RepId, BaseId}.
 
 
 % Versioned clauses for generating replication IDs.
 % If a change is made to how replications are identified,
 % please add a new clause and increase ?REP_ID_VERSION.
 
-replication_id(#rep{} = Rep, 4) ->
+base_id(#{?SOURCE := Src, ?TARGET := Tgt} = Rep, 4) ->
     UUID = couch_server:get_uuid(),
-    SrcInfo = get_v4_endpoint(Rep#rep.source),
-    TgtInfo = get_v4_endpoint(Rep#rep.target),
+    SrcInfo = get_v4_endpoint(Src),
+    TgtInfo = get_v4_endpoint(Tgt),
     maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
 
-replication_id(#rep{} = Rep, 3) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 3) ->
     UUID = couch_server:get_uuid(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([UUID, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 2) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 2) ->
     {ok, HostName} = inet:gethostname(),
     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
     P when is_number(P) ->
@@ -64,47 +69,76 @@ replication_id(#rep{} = Rep, 2) ->
         % ... mochiweb_socket_server:get(https, port)
         list_to_integer(config:get("httpd", "port", "5984"))
     end,
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Port, Src, Tgt], Rep);
 
-replication_id(#rep{} = Rep, 1) ->
+base_id(#{?SOURCE := Src0, ?TARGET := Tgt0} = Rep, 1) ->
     {ok, HostName} = inet:gethostname(),
-    Src = get_rep_endpoint(Rep#rep.source),
-    Tgt = get_rep_endpoint(Rep#rep.target),
+    Src = get_rep_endpoint(Src0),
+    Tgt = get_rep_endpoint(Tgt0),
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
--spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
-convert(Id) when is_list(Id) ->
-    convert(?l2b(Id));
+-spec job_id(#{}, binary() | null, binary() | null) -> binary().
+job_id(#{} = Rep, null = _DbUUID, null = _DocId) ->
+    #{
+        ?SOURCE := Src,
+        ?TARGET := Tgt,
+        ?REP_USER := UserName,
+        ?OPTIONS := Options
+    } = Rep,
+    UUID = couch_server:get_uuid(),
+    SrcInfo = get_v4_endpoint(Src),
+    TgtInfo = get_v4_endpoint(Tgt),
+    UseOpts = [<<"continuous">>, <<"create_target">>],
+    Opts = maybe_append_options(UseOpts, Options),
+    IdParts = [UUID, SrcInfo, TgtInfo, UserName, Opts],
+    maybe_append_filters(IdParts, Rep, false);
+
+job_id(#{} = _Rep, DbUUID, DocId) when is_binary(DbUUID), is_binary(DocId) ->
+    job_id(DbUUID, DocId).
+
+
+-spec job_id(binary(), binary()) -> binary().
+job_id(DbUUID, DocId) when is_binary(DbUUID), is_binary(DocId) ->
+    <<DbUUID/binary, "|", DocId/binary>>.
+
+
+-spec convert(binary()) -> binary().
 convert(Id0) when is_binary(Id0) ->
     % Spaces can result from mochiweb incorrectly unquoting + characters from
     % the URL path. So undo the incorrect parsing here to avoid forcing
     % users to url encode + characters.
-    Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
-    lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
-convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
-    Id.
+    binary:replace(Id0, <<" ">>, <<"+">>, [global]).
 
 
 % Private functions
 
-maybe_append_filters(Base,
-        #rep{source = Source, options = Options}) ->
+maybe_append_filters(Base, #{} = Rep) ->
+    maybe_append_filters(Base, Rep, true).
+
+
+maybe_append_filters(Base, #{} = Rep, FetchFilter) ->
+    #{
+        ?SOURCE := Source,
+        ?OPTIONS := Options
+    } = Rep,
     Base2 = Base ++
         case couch_replicator_filters:parse(Options) of
         {ok, nil} ->
             [];
         {ok, {view, Filter, QueryParams}} ->
             [Filter, QueryParams];
-        {ok, {user, {Doc, Filter}, QueryParams}} ->
+        {ok, {user, {Doc, Filter}, QueryParams}} when FetchFilter =:= true ->
             case couch_replicator_filters:fetch(Doc, Filter, Source) of
                 {ok, Code} ->
                     [Code, QueryParams];
                 {error, Error} ->
                     throw({filter_fetch_error, Error})
             end;
+        {ok, {user, {Doc, Filter}, QueryParams}} when FetchFilter =:= false ->
+            [Doc, Filter, QueryParams];
         {ok, {docids, DocIds}} ->
             [DocIds];
         {ok, {mango, Selector}} ->
@@ -112,27 +146,33 @@ maybe_append_filters(Base,
         {error, FilterParseError} ->
             throw({error, FilterParseError})
         end,
-    couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))).
+    Res = couch_util:to_hex(couch_hash:md5_hash(term_to_binary(Base2))),
+    list_to_binary(Res).
 
 
-maybe_append_options(Options, RepOptions) ->
+maybe_append_options(Options, #{} = RepOptions) ->
     lists:foldl(fun(Option, Acc) ->
         Acc ++
-        case couch_util:get_value(Option, RepOptions, false) of
-        true ->
-            "+" ++ atom_to_list(Option);
-        false ->
-            ""
+        case maps:get(Option, RepOptions, false) of
+            true -> "+" ++ binary_to_list(Option);
+            false -> ""
         end
     end, [], Options).
 
 
-get_rep_endpoint(#httpdb{url=Url, headers=Headers}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
+    % We turn everything to lists and proplists to calculate the same
+    % replication ID as CouchDB <= 3.x
+    Url = binary_to_list(Url0),
+    Headers1 = maps:fold(fun(K, V, Acc) ->
+        [{binary_to_list(K), binary_to_list(V)} | Acc]
+    end, [], Headers0),
+    Headers2 = lists:keysort(1, Headers1),
     DefaultHeaders = (#httpdb{})#httpdb.headers,
-    {remote, Url, Headers -- DefaultHeaders}.
+    {remote, Url, Headers2 -- DefaultHeaders}.
 
 
-get_v4_endpoint(#httpdb{} = HttpDb) ->
+get_v4_endpoint(#{} = HttpDb) ->
     {remote, Url, Headers} = get_rep_endpoint(HttpDb),
     {{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
         couch_replicator_utils:remove_basic_auth_from_headers(Headers),
@@ -184,92 +224,132 @@ get_non_default_port(_Schema, Port) ->
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
 
 
 replication_id_convert_test_() ->
     [?_assertEqual(Expected, convert(Id)) || {Expected, Id} <- [
-        {{"abc", ""}, "abc"},
-        {{"abc", ""}, <<"abc">>},
-        {{"abc", "+x+y"}, <<"abc+x+y">>},
-        {{"abc", "+x+y"}, {"abc", "+x+y"}},
-        {{"abc", "+x+y"}, <<"abc x y">>}
+        {<<"abc">>, <<"abc">>},
+        {<<"abc+x">>, <<"abc+x">>},
+        {<<"abc+x">>, <<"abc x">>},
+        {<<"abc+x+y">>, <<"abc+x+y">>},
+        {<<"abc+x+y">>, <<"abc x y">>}
     ]].
 
+
 http_v4_endpoint_test_() ->
     [?_assertMatch({remote, User, Host, Port, Path, HeadersNoAuth, undefined},
-        get_v4_endpoint(#httpdb{url = Url, headers = Headers})) ||
+        get_v4_endpoint(#{<<"url">> => Url, <<"headers">> => Headers})) ||
             {{User, Host, Port, Path, HeadersNoAuth}, {Url, Headers}} <- [
                 {
                     {undefined, "host", default, "/", []},
-                    {"http://host", []}
+                    {<<"http://host">>, #{}}
                 },
                 {
                     {undefined, "host", default, "/", []},
-                    {"https://host", []}
+                    {<<"https://host">>, #{}}
                 },
                 {
                     {undefined, "host", default, "/", []},
-                    {"http://host:5984", []}
+                    {<<"http://host:5984">>, #{}}
                 },
                 {
                     {undefined, "host", 1, "/", []},
-                    {"http://host:1", []}
+                    {<<"http://host:1">>, #{}}
                 },
                 {
                     {undefined, "host", 2, "/", []},
-                    {"https://host:2", []}
+                    {<<"https://host:2">>, #{}}
                 },
                 {
-                    {undefined, "host", default, "/", [{"h","v"}]},
-                    {"http://host", [{"h","v"}]}
+                    {undefined, "host", default, "/", [{"h", "v"}]},
+                    {<<"http://host">>, #{<<"h">> => <<"v">>}}
                 },
                 {
                     {undefined, "host", default, "/a/b", []},
-                    {"http://host/a/b", []}
+                    {<<"http://host/a/b">>, #{}}
                 },
                 {
                     {"user", "host", default, "/", []},
-                    {"http://user:pass@host", []}
+                    {<<"http://user:pass@host">>, #{}}
                 },
                 {
                     {"user", "host", 3, "/", []},
-                    {"http://user:pass@host:3", []}
+                    {<<"http://user:pass@host:3">>, #{}}
                 },
                 {
                     {"user", "host", default, "/", []},
-                    {"http://user:newpass@host", []}
+                    {<<"http://user:newpass@host">>, #{}}
                 },
                 {
                     {"user", "host", default, "/", []},
-                    {"http://host", [basic_auth("user","pass")]}
+                    {<<"http://host">>, basic_auth(<<"user">>, <<"pass">>)}
                 },
                 {
                     {"user", "host", default, "/", []},
-                    {"http://host", [basic_auth("user","newpass")]}
+                    {<<"http://host">>, basic_auth(<<"user">>, <<"newpass">>)}
                 },
                 {
                     {"user1", "host", default, "/", []},
-                    {"http://user1:pass1@host", [basic_auth("user2","pass2")]}
+                    {<<"http://user1:pass1@host">>, basic_auth(<<"user2">>,
+                        <<"pass2">>)}
                 },
                 {
                     {"user", "host", default, "/", [{"h", "v"}]},
-                    {"http://host", [{"h", "v"}, basic_auth("user","pass")]}
+                    {<<"http://host">>, maps:merge(#{<<"h">> => <<"v">>},
+                        basic_auth(<<"user">>, <<"pass">>))}
                 },
                 {
                     {undefined, "random_junk", undefined, undefined},
-                    {"random_junk", []}
+                    {<<"random_junk">>, #{}}
                 },
                 {
                     {undefined, "host", default, "/", []},
-                    {"http://host", [{"Authorization", "Basic bad"}]}
+                    {<<"http://host">>, #{<<"Authorization">> =>
+                        <<"Basic bad">>}}
                 }
         ]
     ].
 
 
 basic_auth(User, Pass) ->
-    B64Auth = base64:encode_to_string(User ++ ":" ++ Pass),
-    {"Authorization", "Basic " ++ B64Auth}.
+    B64Auth = base64:encode(<<User/binary, ":", Pass/binary>>),
+    #{<<"Authorization">> => <<"Basic ", B64Auth/binary>>}.
+
+
+version4_matches_couchdb3_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(id_matches_couchdb3)
+        ]
+    }.
+
+
+setup() ->
+    meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+
+teardown(_) ->
+    meck:unload().
+
+
+id_matches_couchdb3(_) ->
+    {ok, Rep} = couch_replicator_parse:parse_rep(#{
+        <<"source">> => <<"http://adm:pass@127.0.0.1/abc">>,
+        <<"target">> => <<"http://adm:pass@127.0.0.1/xyz">>,
+        <<"create_target">> => true,
+        <<"continuous">> => true
+    }, null),
+    meck:expect(couch_server, get_uuid, 0, "somefixedid"),
+    {RepId, BaseId} = replication_id(Rep),
+    % Calculated on CouchDB 3.x
+    RepId3x = <<"ff71e1208f93ba054eb60e7ca8683fe4+continuous+create_target">>,
+    BaseId3x = <<"ff71e1208f93ba054eb60e7ca8683fe4">>,
+    ?assertEqual(RepId3x, RepId),
+    ?assertEqual(BaseId3x, BaseId).
 
 
 -endif.