You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/05 19:04:55 UTC

[couchdb] branch 63012-scheduler updated (6e3ae05 -> 7c2c27e)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git.

     omits  6e3ae05   Add `_scheduler/{jobs,docs}` API endpoints
       new  7c2c27e   Add `_scheduler/{jobs,docs}` API endpoints

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6e3ae05)
            \
             N -- N -- N   refs/heads/63012-scheduler (7c2c27e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omits" are not gone; other references still
refer to them.  Any revisions marked "discards" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "adds" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/chttpd/src/chttpd_misc.erl | 33 ++++++++++++++++++++++++++++++---
 1 file changed, 30 insertions(+), 3 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 01/01: Add `_scheduler/{jobs,docs}` API endpoints

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7c2c27e19e30e08afb4659cb3c427b7cc8f8404b
Author: Benjamin Bastian <be...@gmail.com>
AuthorDate: Fri Mar 10 13:06:04 2017 -0800

    Add `_scheduler/{jobs,docs}` API endpoints
    
    The `_scheduler/docs` endpoint provides a view of all
    replicator docs which have been seen by the scheduler. This endpoint
    includes useful information such as the state of the replication and the
    coordinator node.
    
    The `_scheduler/jobs` endpoint provides a view of all replications
    managed by the scheduler. This endpoint includes more information on the
    replication than the `_scheduler/docs` endpoint, including the history
    of state transitions of the replication.
    
    Jira: COUCHDB-3324
---
 src/chttpd/src/chttpd_httpd_handlers.erl |   1 +
 src/chttpd/src/chttpd_misc.erl           | 127 ++++++++++++++++++++++++++++++-
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index b91aae9..8d2c280 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>)     -> fun chttpd_misc:handle_favicon_req/1;
 url_handler(<<"_utils">>)          -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>)        -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_active_tasks">>)   -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_scheduler">>)      -> fun chttpd_misc:handle_scheduler_req/1;
 url_handler(<<"_node">>)           -> fun chttpd_misc:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
 url_handler(<<"_replicate">>)      -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index cfeeb3f..b39a85c 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -21,13 +21,16 @@
     handle_reload_query_servers_req/1,
     handle_system_req/1,
     handle_task_status_req/1,
+    handle_scheduler_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
     handle_uuids_req/1,
     handle_welcome_req/1,
     handle_welcome_req/2,
-    get_stats/0
+    get_stats/0,
+    parse_int_param/5,
+    parse_replication_state_filter/1
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -37,6 +40,13 @@
     [send_json/2,send_json/3,send_method_not_allowed/2,
     send_chunk/2,start_chunked_response/3]).
 
+
+% Accumulator threaded through stream_doc_info_cb/2 while the
+% `_scheduler/docs` response is being written:
+%   prepend - text emitted in front of the next doc that is sent
+%   resp    - response handle returned by the chttpd delayed-response calls
+%   count   - number of docs handed to the callback so far
+%   skip    - value of the "skip" query parameter
+%   limit   - value of the "limit" query parameter
+-record(rep_docs_acc, {prepend, resp, count, skip, limit}).
+
+% Default "limit" for `_scheduler/jobs`.
+-define(DEFAULT_TASK_LIMIT, 100).
+% Default "limit" for `_scheduler/docs`.
+-define(DEFAULT_DOCS_LIMIT, 100).
+% Database name handed to couch_replicator when looking up replicator docs.
+-define(REPDB, <<"_replicator">>).
+
 % httpd global handlers
 
 handle_welcome_req(Req) ->
@@ -150,6 +160,57 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
+% Handler for the `_scheduler` endpoint family.
+%
+%   GET /_scheduler/jobs        - page of replication jobs collected from the
+%                                 local node and all connected nodes
+%                                 (query params: limit, skip)
+%   GET /_scheduler/jobs/JobId  - a single replication job
+%   GET /_scheduler/docs        - replicator docs, sent chunk by chunk
+%                                 (query params: limit, skip, states)
+%   GET /_scheduler/docs/DocId  - a single replicator doc
+%
+% Any other GET path throws not_found; any other method is answered with
+% "method not allowed". Invalid query parameters throw
+% {query_parse_error, Msg}.
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+    Limit = parse_int_param(Req, "limit", ?DEFAULT_TASK_LIMIT, 0, infinity),
+    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+    % A node whose call failed contributes {badrpc, Reason} instead of a list
+    % of jobs; drop those entries so one failing node does not crash the
+    % whole request.
+    Flatlist = lists:append([Reply || Reply <- Replies, is_list(Reply)]),
+    % Relies on every job being an EJSON object whose first member is `id`.
+    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+    Total = length(Sorted),
+    % Clamp so that lists:sublist/3 is never asked to start past Total + 1.
+    Offset = min(Skip, Total),
+    Sublist = lists:sublist(Sorted, Offset+1, Limit),
+    Sublist1 = [update_db_name(Task) || Task <- Sublist],
+    send_json(Req, {[{total, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+    case couch_replicator:job(JobId) of
+        {ok, JobInfo} ->
+            send_json(Req, update_db_name(JobInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+    Limit = parse_int_param(Req, "limit", ?DEFAULT_DOCS_LIMIT, 0, infinity),
+    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
+    States = parse_replication_state_filter(chttpd:qs_value(Req, "states")),
+    SkipStr = integer_to_list(Skip),
+    % The JSON envelope is written by hand around the streamed docs: the
+    % total is only known once every doc has been visited, so it goes into
+    % the postamble.
+    Preamble = ["{\r\n\"offset\": ", SkipStr, ",\r\n\"docs\": ["],
+    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], Preamble),
+    Fun = fun stream_doc_info_cb/2,
+    Acc = #rep_docs_acc{
+        prepend = "\r\n",
+        resp = Resp,
+        count = 0,
+        skip = Skip,
+        limit = Limit
+    },
+    Acc1 = couch_replicator:stream_active_docs_info(Fun, Acc, States),
+    Acc2 = couch_replicator:stream_terminal_docs_info(?REPDB, Fun, Acc1, States),
+    #rep_docs_acc{resp = Resp1, count = Total} = Acc2,
+    TotalStr = integer_to_list(Total),
+    Postamble = ["\r\n],\r\n\"total\": ", TotalStr, "\r\n}\r\n"],
+    {ok, Resp2} = chttpd:send_delayed_chunk(Resp1, Postamble),
+    chttpd:end_delayed_json_response(Resp2);
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+    UserCtx = Req#httpd.user_ctx,
+    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+        {ok, DocInfo} ->
+            send_json(Req, DocInfo);
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(#httpd{method='GET'}) ->
+    % A GET for a path this handler does not know is a missing resource,
+    % not a disallowed method.
+    throw(not_found);
+handle_scheduler_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
+
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
@@ -439,3 +500,67 @@ message_queues(Registered) ->
         {Type, Length} = process_info(whereis(Name), Type),
         {Name, Length}
     end, Registered).
+
+% Callback passed to the couch_replicator doc streaming functions for the
+% `_scheduler/docs` listing. Every doc it is handed is counted, but only
+% those whose zero-based position lies in [skip, skip + limit) are written
+% to the response. Returns the updated accumulator.
+stream_doc_info_cb(Info, Acc) ->
+    #rep_docs_acc{count = Seen, skip = Skip, limit = Limit} = Acc,
+    OutsidePage = Seen < Skip orelse Seen >= (Skip + Limit),
+    case OutsidePage of
+    true ->
+        % Not part of the requested page: count it, send nothing.
+        Acc#rep_docs_acc{count = Seen + 1};
+    false ->
+        #rep_docs_acc{resp = Resp0, prepend = Separator} = Acc,
+        Json = ?JSON_ENCODE(update_db_name(Info)),
+        {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Separator, Json]),
+        % Every doc after the first one sent is preceded by a comma.
+        Acc#rep_docs_acc{resp = Resp1, prepend =  ",\r\n", count = Seen + 1}
+    end.
+
+% Rewrites the `database` member of an EJSON object with its normalized
+% name and moves it to the front of the member list. Fails with a badmatch
+% when the object has no `database` member.
+update_db_name({Members}) ->
+    {database, RawName} = lists:keyfind(database, 1, Members),
+    Others = lists:keydelete(database, 1, Members),
+    Normalized = normalize_db_name(RawName),
+    {[{database, Normalized} | Others]}.
+
+% Names starting with "shards/" are passed through mem3:dbname/1; anything
+% else is returned untouched.
+normalize_db_name(DbName) ->
+    case DbName of
+    <<"shards/", _/binary>> ->
+        mem3:dbname(DbName);
+    _ ->
+        DbName
+    end.
+
+% Turns the value of the "states" query parameter into a list of state
+% atoms.
+%
+%   undefined - parameter absent; returns [], the wildcard filter
+%   string    - comma-separated state names, matched case-insensitively
+%
+% Throws {query_parse_error, Msg} when a name is not an existing atom or is
+% not one of couch_replicator:replication_states().
+parse_replication_state_filter(undefined) ->
+    [];
+parse_replication_state_filter(States) when is_list(States) ->
+    Known = couch_replicator:replication_states(),
+    Names = lists:map(fun string:to_lower/1, string:tokens(States, ",")),
+    % list_to_existing_atom/1 keeps request input from creating new atoms.
+    Requested = try
+        lists:map(fun erlang:list_to_existing_atom/1, Names)
+    catch error:badarg ->
+        Msg1 = io_lib:format("States must be one or more of ~w", [Known]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    KnownSet = sets:from_list(Known),
+    RequestedSet = sets:from_list(Requested),
+    Unknown = sets:to_list(sets:subtract(RequestedSet, KnownSet)),
+    if
+        Unknown =:= [] ->
+            Requested;
+        true ->
+            Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w",
+                [Unknown, Known]),
+            throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+% Reads query parameter Param from Req as an integer.
+%
+%   Default  - integer used when the parameter is absent
+%   Min, Max - inclusive bounds; the callers in this module pass the atom
+%              `infinity` as Max, which compares greater than any integer
+%
+% Throws {query_parse_error, Msg} when the value is not an integer or lies
+% outside [Min, Max].
+parse_int_param(Req, Param, Default, Min, Max) ->
+    Value = try
+        Fallback = integer_to_list(Default),
+        list_to_integer(chttpd:qs_value(Req, Param, Fallback))
+    catch error:badarg ->
+        Msg1 = io_lib:format("~s must be an integer", [Param]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    OutOfRange = Value < Min orelse Value > Max,
+    case OutOfRange of
+    false ->
+        Value;
+    true ->
+        Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.