You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2020/09/15 20:14:10 UTC

[couchdb] 13/16: Update replicator http handlers and supervisor

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit aa8836434655b9b1b34c6d72035eb306266c484b
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Aug 28 12:49:59 2020 -0400

    Update replicator http handlers and supervisor
    
    Stitch everything together: the backend, frontend and HTTP handlers.
    
    The supervisor `couch_replicator_sup` handles starting a set of frontend or
    backend children. It may also start both or neither.
    
    The HTTP layer for monitoring and creating jobs is simpler than before since
    there is no rpc or clustering involved.
---
 src/chttpd/src/chttpd_misc.erl                     |  55 +------
 src/couch_replicator/src/couch_replicator.app.src  |  11 +-
 .../src/couch_replicator_httpd.erl                 | 163 ++++++++++-----------
 src/couch_replicator/src/couch_replicator_sup.erl  | 113 +++++++-------
 4 files changed, 149 insertions(+), 193 deletions(-)

diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index ec2435c..79c2914 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -302,17 +302,22 @@ handle_task_status_req(Req) ->
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx, req_body=PostBody} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
-    case replicate(PostBody, Ctx) of
+    case couch_replicator:replicate(PostBody, Ctx) of
         {ok, {continuous, RepId}} ->
             send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
         {ok, {cancelled, RepId}} ->
             send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
-        {ok, {JsonResults}} ->
-            send_json(Req, {[{ok, true} | JsonResults]});
+        {ok, #{} = JsonResults} ->
+            send_json(Req, maps:merge(#{<<"ok">> => true}, JsonResults));
         {ok, stopped} ->
             send_json(Req, 200, {[{ok, stopped}]});
         {error, not_found=Error} ->
             chttpd:send_error(Req, Error);
+        {error, #{<<"error">> := Err, <<"reason">> := Reason}} when
+                is_binary(Err), is_binary(Reason) ->
+            % binary_to_atom/2 creates atoms; this assumes Err is only built
+            % by couch_replicator_jobs:error_info/1, never from request data
+            chttpd:send_error(Req, {binary_to_atom(Err, utf8), Reason});
         {error, {_, _}=Error} ->
             chttpd:send_error(Req, Error);
         {_, _}=Error ->
@@ -321,50 +326,6 @@ handle_replicate_req(#httpd{method='POST', user_ctx=Ctx, req_body=PostBody} = Re
 handle_replicate_req(Req) ->
     send_method_not_allowed(Req, "POST").
 
-replicate({Props} = PostBody, Ctx) ->
-    case couch_util:get_value(<<"cancel">>, Props) of
-    true ->
-        cancel_replication(PostBody, Ctx);
-    _ ->
-        Node = choose_node([
-            couch_util:get_value(<<"source">>, Props),
-            couch_util:get_value(<<"target">>, Props)
-        ]),
-        case rpc:call(Node, couch_replicator, replicate, [PostBody, Ctx]) of
-        {badrpc, Reason} ->
-            erlang:error(Reason);
-        Res ->
-            Res
-        end
-    end.
-
-cancel_replication(PostBody, Ctx) ->
-    {Res, _Bad} = rpc:multicall(couch_replicator, replicate, [PostBody, Ctx]),
-    case [X || {ok, {cancelled, _}} = X <- Res] of
-    [Success|_] ->
-        % Report success if at least one node canceled the replication
-        Success;
-    [] ->
-        case lists:usort(Res) of
-        [UniqueReply] ->
-            % Report a universally agreed-upon reply
-            UniqueReply;
-        [] ->
-            {error, badrpc};
-        Else ->
-            % Unclear what to do here -- pick the first error?
-            % Except try ignoring any {error, not_found} responses
-            % because we'll always get two of those
-            hd(Else -- [{error, not_found}])
-        end
-    end.
-
-choose_node(Key) when is_binary(Key) ->
-    Checksum = erlang:crc32(Key),
-    Nodes = lists:sort([node()|erlang:nodes()]),
-    lists:nth(1 + Checksum rem length(Nodes), Nodes);
-choose_node(Key) ->
-    choose_node(term_to_binary(Key)).
 
 handle_reload_query_servers_req(#httpd{method='POST'}=Req) ->
     chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
index 2e0e191..81789f1 100644
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -18,20 +18,15 @@
         couch_replicator_sup,
         couch_replicator_rate_limiter,
         couch_replicator_connection,
-        couch_replication,  % couch_replication_event gen_event
-        couch_replicator_clustering,
-        couch_replicator_scheduler,
-        couch_replicator_scheduler_sup,
-        couch_replicator_doc_processor
+        couch_replicator_job_server  % started by couch_replicator_sup
     ]},
     {applications, [
         kernel,
         stdlib,
         couch_log,
-        mem3,
         config,
         couch,
-        couch_event,
-        couch_stats
+        couch_stats,
+        couch_jobs
     ]}
 ]}.
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index abd9f7f..196fcf2 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -12,9 +12,6 @@
 
 -module(couch_replicator_httpd).
 
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-
 -export([
     handle_req/1,
     handle_scheduler_req/1
@@ -26,48 +23,40 @@
     send_method_not_allowed/2
 ]).
 
--import(couch_util, [
-    to_binary/1
-]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
 
 
 -define(DEFAULT_TASK_LIMIT, 100).
--define(REPDB, <<"_replicator">>).
-% This is a macro so it can be used as a guard
--define(ISREPDB(X), X =:= ?REPDB orelse binary_part(X, {byte_size(X), -12})
-    =:= <<"/_replicator">>).
 
 
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
-    Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+    Limit = couch_replicator_utils:parse_int_param(Req, "limit",
         ?DEFAULT_TASK_LIMIT, 0, infinity),
-    Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+    Skip = couch_replicator_utils:parse_int_param(Req, "skip", 0, 0,
         infinity),
-    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
-    Flatlist = lists:concat(Replies),
-    % couch_replicator_scheduler:job_ejson/1 guarantees {id, Id} to be the
-    % the first item in the list
-    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
-    Total = length(Sorted),
+    Jobs1 = couch_replicator:jobs(),
+    Total = length(Jobs1),
     Offset = min(Skip, Total),
-    Sublist = lists:sublist(Sorted, Offset+1, Limit),
-    Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
-        || Task <- Sublist],
-    send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+    Jobs2 = lists:sublist(Jobs1, Offset + 1, Limit),
+    send_json(Req, #{
+        <<"total_rows">> => Total,
+        <<"offset">> => Offset,
+        <<"jobs">> => Jobs2
+    });
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
     case couch_replicator:job(JobId) of
-        {ok, JobInfo} ->
-            send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
-        {error, not_found} ->
-            throw(not_found)
+        {ok, JobInfo} -> send_json(Req, JobInfo);
+        {error, not_found} -> throw(not_found)
     end;
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
-    handle_scheduler_docs(?REPDB, Req);
+    handle_scheduler_docs(?REP_DB_NAME, Req);
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,Db]}=Req)
-        when ?ISREPDB(Db) ->
+        when ?IS_REP_DB(Db) ->
     handle_scheduler_docs(Db, Req);
 handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,Db,DocId]}
-        = Req)  when ?ISREPDB(Db) ->
+        = Req) when ?IS_REP_DB(Db) ->
     handle_scheduler_doc(Db, DocId, Req);
 % Allow users to pass in unencoded _replicator database names (/ are not
 % escaped). This is possible here because _replicator is not a valid document
@@ -82,77 +71,80 @@ handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>|Unquoted]}
         {error, invalid} ->
             throw(bad_request)
     end;
-handle_scheduler_req(#httpd{method='GET'} = Req) ->
-    send_json(Req, 404, {[{error, <<"not found">>}]});
+handle_scheduler_req(#httpd{method='GET'}) ->
+    throw(not_found);
 handle_scheduler_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
 
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
-    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    couch_replicator_httpd_util:validate_rep_props(Props),
+    RepDoc = couch_httpd:json_body_obj(Req),
     case couch_replicator:replicate(RepDoc, UserCtx) of
-    {error, {Error, Reason}} ->
-        send_json(
-            Req, 500,
-            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
-    {error, not_found} ->
-        % Tried to cancel a replication that didn't exist.
-        send_json(Req, 404, {[{error, <<"not found">>}]});
-    {error, Reason} ->
-        send_json(Req, 500, {[{error, to_binary(Reason)}]});
-    {ok, {cancelled, RepId}} ->
-        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {continuous, RepId}} ->
-        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {HistoryResults}} ->
-        send_json(Req, {[{ok, true} | HistoryResults]})
-    end;
+        {error, {Error, Reason}} ->
+            send_json(Req, 500, #{
+                <<"error">> => couch_util:to_binary(Error),
+                <<"reason">> => couch_util:to_binary(Reason)
+            });
+        {error, not_found} -> throw(not_found);
+        {error, #{<<"error">> := Err, <<"reason">> := Reason}} ->
+            send_json(Req, 500, #{<<"error">> => Err, <<"reason">> => Reason});
+        {error, Reason} ->
+            send_json(Req, 500, #{<<"error">> => couch_util:to_binary(Reason)});
+        {ok, {cancelled, JobId}} ->
+            send_json(Req, 200, #{<<"ok">> => true, <<"_local_id">> => JobId});
+        {ok, {continuous, JobId}} ->
+            send_json(Req, 202, #{<<"ok">> => true, <<"_local_id">> => JobId});
+        {ok, #{} = CheckpointHistory} ->
+            send_json(Req, maps:merge(#{<<"ok">> => true}, CheckpointHistory))
+    end;
 
 handle_req(Req) ->
     send_method_not_allowed(Req, "POST").
 
 
-handle_scheduler_docs(Db, Req) when is_binary(Db) ->
-    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
-    StatesQs = chttpd:qs_value(Req, "states"),
-    States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
-    VArgs1 = VArgs0#mrargs{
-        view_type = map,
-        include_docs = true,
-        reduce = false,
-        extra = [{filter_states, States}]
-    },
-    VArgs2 = couch_mrview_util:validate_args(VArgs1),
-    Opts = [{user_ctx, Req#httpd.user_ctx}],
-    Max = chttpd:chunked_response_buffer_size(),
-    Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
-    Cb = fun couch_replicator_httpd_util:docs_cb/2,
-    {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
-    {ok,  couch_replicator_httpd_util:docs_acc_response(RAcc)}.
-
-
-handle_scheduler_doc(Db, DocId, Req) when is_binary(Db), is_binary(DocId) ->
-    UserCtx = Req#httpd.user_ctx,
-    case couch_replicator:doc(Db, DocId, UserCtx#user_ctx.roles) of
-        {ok, DocInfo} ->
-            send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
-        {error, not_found} ->
+handle_scheduler_docs(DbName, #httpd{user_ctx = UserCtx} = Req) ->
+    try fabric2_db:open(DbName, [{user_ctx, UserCtx}]) of
+        {ok, Db} ->
+            ok = fabric2_db:check_is_member(Db),
+            StatesQs = chttpd:qs_value(Req, "states"),
+            States = couch_replicator_utils:parse_replication_states(StatesQs),
+            Docs = couch_replicator:docs(Db, States),
+            send_json(Req, #{
+                <<"total_rows">> => length(Docs),
+                <<"offset">> => 0,
+                <<"docs">> => Docs
+            })
+    catch
+        error:database_does_not_exist ->
             throw(not_found)
     end.
 
 
+handle_scheduler_doc(DbName, DocId, #httpd{user_ctx = UserCtx} = Req) ->
+    try fabric2_db:open(DbName, [{user_ctx, UserCtx}]) of
+        {ok, Db} ->
+            ok = fabric2_db:check_is_member(Db),
+            case couch_replicator:doc(Db, DocId) of
+                {ok, DocInfo} -> send_json(Req, DocInfo);
+                {error, not_found} -> throw(not_found)
+            end
+    catch
+        error:database_does_not_exist ->
+            throw(not_found)
+    end.
+
+
 parse_unquoted_docs_path([_, _ | _] = Unquoted) ->
-    DbAndAfter = lists:dropwhile(fun(E) -> E =/= ?REPDB end, Unquoted),
-    BeforeRDb = lists:takewhile(fun(E) -> E =/= ?REPDB end, Unquoted),
+    DbAndAfter = lists:dropwhile(fun(E) -> E =/= ?REP_DB_NAME end, Unquoted),
+    BeforeRDb = lists:takewhile(fun(E) -> E =/= ?REP_DB_NAME end, Unquoted),
     case DbAndAfter of
         [] ->
             {error, invalid};
-        [?REPDB] ->
-            {db_only, filename:join(BeforeRDb ++ [?REPDB])};
-        [?REPDB, DocId] ->
-            {db_and_doc, filename:join(BeforeRDb ++ [?REPDB]), DocId}
+        [?REP_DB_NAME] ->
+            {db_only, filename:join(BeforeRDb ++ [?REP_DB_NAME])};
+        [?REP_DB_NAME, DocId] ->
+            {db_and_doc, filename:join(BeforeRDb ++ [?REP_DB_NAME]), DocId}
     end.
 
 
@@ -163,10 +155,13 @@ parse_unquoted_docs_path([_, _ | _] = Unquoted) ->
 unquoted_scheduler_docs_path_test_() ->
     [?_assertEqual(Res, parse_unquoted_docs_path(Path)) || {Res, Path} <- [
         {{error, invalid}, [<<"a">>,<< "b">>]},
-        {{db_only, <<"a/_replicator">>}, [<<"a">>, ?REPDB]},
-        {{db_only, <<"a/b/_replicator">>}, [<<"a">>, <<"b">>, ?REPDB]},
-        {{db_and_doc, <<"_replicator">>, <<"x">>}, [?REPDB, <<"x">>]},
-        {{db_and_doc, <<"a/_replicator">>, <<"x">>}, [<<"a">>, ?REPDB, <<"x">>]},
+        {{db_only, <<"a/_replicator">>}, [<<"a">>, ?REP_DB_NAME]},
+        {{db_only, <<"a/b/_replicator">>}, [<<"a">>, <<"b">>,
+            ?REP_DB_NAME]},
+        {{db_and_doc, <<"_replicator">>, <<"x">>},
+            [?REP_DB_NAME, <<"x">>]},
+        {{db_and_doc, <<"a/_replicator">>, <<"x">>}, [<<"a">>,
+            ?REP_DB_NAME, <<"x">>]},
         {{error, invalid}, [<<"a/_replicator">>,<<"x">>]}
     ]].
 
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index cd4512c..49d412a 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -12,61 +12,66 @@
 % the License.
 
 -module(couch_replicator_sup).
+
+
 -behaviour(supervisor).
--export([start_link/0, init/1]).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
 
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+    Backend = fabric2_node_types:is_type(replication),
+    Frontend = fabric2_node_types:is_type(api_frontend),
+    Arg = {Backend, Frontend},
+    supervisor:start_link({local, ?MODULE}, ?MODULE, Arg).
+
+
+init({Backend, Frontend}) ->
+    Children = case {Backend, Frontend} of
+        {true, true} -> backend() ++ frontend();
+        {true, false} -> backend();
+        {false, true} -> frontend();
+        {false, false} -> []
+    end,
+    Flags = #{
+        strategy => rest_for_one,
+        intensity => 1,
+        period => 5
+    },
+    {ok, {Flags, Children}}.
+
+
+backend() ->
+    Timeout = 5000,  % job server start arg and its shutdown timeout
+    [
+        #{
+            id => couch_replicator_connection,
+            start => {couch_replicator_connection, start_link, []}
+        },
+        #{
+            id => couch_replicator_rate_limiter,
+            start => {couch_replicator_rate_limiter, start_link, []}
+        },
+        #{
+            id => couch_replicator_job_server,
+            start => {couch_replicator_job_server, start_link, [Timeout]},
+            shutdown => Timeout
+        }
+    ].
+
 
-init(_Args) ->
-    Children = [
-        {couch_replication_event,
-            {gen_event, start_link, [{local, couch_replication}]},
-            permanent,
-            brutal_kill,
-            worker,
-            dynamic},
-       {couch_replicator_clustering,
-            {couch_replicator_clustering, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_clustering]},
-       {couch_replicator_connection,
-            {couch_replicator_connection, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_connection]},
-       {couch_replicator_rate_limiter,
-            {couch_replicator_rate_limiter, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_rate_limiter]},
-        {couch_replicator_scheduler_sup,
-            {couch_replicator_scheduler_sup, start_link, []},
-            permanent,
-            infinity,
-            supervisor,
-            [couch_replicator_scheduler_sup]},
-        {couch_replicator_scheduler,
-            {couch_replicator_scheduler, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_scheduler]},
-        {couch_replicator_doc_processor,
-            {couch_replicator_doc_processor, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_replicator_doc_processor]},
-        {couch_replicator_db_changes,
-            {couch_replicator_db_changes, start_link, []},
-            permanent,
-            brutal_kill,
-            worker,
-            [couch_multidb_changes]}
-    ],
-    {ok, {{rest_for_one,10,1}, Children}}.
+frontend() ->
+    [
+        #{
+            id => couch_replicator,
+            start => {couch_replicator, ensure_rep_db_exists, []},
+            restart => transient
+        }
+    ] ++ couch_epi:register_service(couch_replicator_epi, []).