You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:25:59 UTC

[04/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Add support for _scheduler/jobs/<jobid> and _scheduler/docs/<docid>


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f1140e94
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f1140e94
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f1140e94

Branch: refs/heads/63012-scheduler
Commit: f1140e941d8e417f4f271f2b3c3f81d4f09a2e67
Parents: a1d7554
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Oct 21 15:44:53 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Oct 21 15:55:44 2016 -0400

----------------------------------------------------------------------
 src/couch_replicator.erl               | 51 +++++++++++++++
 src/couch_replicator_doc_processor.erl | 23 ++++++-
 src/couch_replicator_docs.erl          |  3 +-
 src/couch_replicator_ids.erl           | 13 +++-
 src/couch_replicator_scheduler.erl     | 99 +++++++++++++++++------------
 5 files changed, 143 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index b888f82..0bfd9f6 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -15,6 +15,7 @@
 -export([replicate/2, ensure_rep_db_exists/0]).
 -export([stream_active_docs_info/3, stream_terminal_docs_info/4]).
 -export([replication_states/0]).
+-export([job/1, doc/3]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator.hrl").
@@ -230,3 +231,53 @@ filter_replicator_doc_query(_DocState, []) ->
     true;
 filter_replicator_doc_query(State, States) when is_list(States) ->
     lists:member(State, States).
+
+
+% Look up a single replication job by its id.
+%
+% The binary id is normalized with couch_replicator_ids:convert/1 and
+% couch_replicator_scheduler:job/1 is then invoked through rpc:multicall/3.
+% The first `{ok, JobInfo}` reply is returned; replies of any other shape and
+% the list of nodes that failed to answer are ignored. Returns
+% `{error, not_found}` when no reply had the `{ok, _}` shape.
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    Args = [couch_replicator_ids:convert(JobId0)],
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, Args),
+    Found = lists:filtermap(fun
+        ({ok, Info}) -> {true, Info};
+        (_Other) -> false
+    end, Replies),
+    case Found of
+        [] ->
+            {error, not_found};
+        [First | _] ->
+            {ok, First}
+    end.
+
+
+% Fetch the replication status of one document of a replicator db.
+%
+% couch_replicator_doc_processor:doc/2 is invoked through rpc:multicall/3 and
+% the first `{ok, DocInfo}` reply is returned. Replies of any other shape and
+% the list of nodes that failed to answer are ignored. When no reply has the
+% `{ok, _}` shape the lookup falls back to doc_from_db/3, to which UserCtx is
+% handed unchanged.
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    Args = [RepDb, DocId],
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_doc_processor, doc, Args),
+    Found = lists:filtermap(fun
+        ({ok, Info}) -> {true, Info};
+        (_Other) -> false
+    end, Replies),
+    case Found of
+        [] ->
+            doc_from_db(RepDb, DocId, UserCtx);
+        [First | _] ->
+            {ok, First}
+    end.
+
+
+% Build a status object for a replication document by reading the document
+% itself.
+%
+% The document is opened with fabric:open_doc/3; UserCtx is passed as an open
+% option next to `ejson_body`. In the result, `state` is the document's
+% `_replication_state` field (`null` when the field is absent) and `id` is
+% always `null`. Returns `{error, not_found}` when fabric answers
+% `{not_found, _}`.
+-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
+        {not_found, _Reason} ->
+            {error, not_found};
+        {ok, Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, []),
+            RepState = couch_util:get_value(<<"_replication_state">>, Props, null),
+            {Info, ErrCount} = doc_state_info(RepState, Props),
+            {ok, {[
+                {doc_id, DocId},
+                {database, RepDb},
+                {id, null},
+                {state, RepState},
+                {error_count, ErrCount},
+                {info, Info}
+            ]}}
+    end.
+
+
+% Select the `info` value and error count that go with a document state:
+% completed -> `_replication_stats` and 0 errors, failed ->
+% `_replication_state_reason` and 1 error, anything else -> `null` and 0.
+% A missing field yields `null`.
+doc_state_info(<<"completed">>, Props) ->
+    {couch_util:get_value(<<"_replication_stats">>, Props, null), 0};
+doc_state_info(<<"failed">>, Props) ->
+    {couch_util:get_value(<<"_replication_state_reason">>, Props, null), 1};
+doc_state_info(_OtherState, _Props) ->
+    {null, 0}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 60a3ad1..0349fa6 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -14,7 +14,7 @@
 -behaviour(couch_multidb_changes).
 
 -export([start_link/0]).
--export([docs/1]).
+-export([docs/1, doc/2]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -375,6 +375,27 @@ docs(States) ->
     end, [], ?MODULE).
 
 
+% Find the entry for document DocId of database Db in this module's ETS
+% table and render it with ejson_doc/2.
+%
+% Rows are keyed by `{Shard, DocId}`; a row matches when mem3:dbname(Shard)
+% equals Db and the doc ids are equal. Only the first match is kept: once the
+% accumulator holds a result the remaining rows are passed over untouched.
+% Returns `{error, not_found}` when no row matches.
+-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+doc(Db, DocId) ->
+    Threshold = couch_replicator_scheduler:health_threshold(),
+    Wanted = {Db, DocId},
+    KeepFirstMatch = fun
+        (_RDoc, [_Found] = Acc) ->
+            Acc;
+        (RDoc, []) ->
+            {Shard, RDocId} = RDoc#rdoc.id,
+            case {mem3:dbname(Shard), RDocId} =:= Wanted of
+                true ->
+                    [ejson_doc(RDoc, Threshold)];
+                false ->
+                    []
+            end
+    end,
+    case ets:foldl(KeepFirstMatch, [], ?MODULE) of
+        [] ->
+            {error, not_found};
+        [DocInfo] ->
+            {ok, DocInfo}
+    end.
+
+
 -spec ejson_state_info(binary() | nil) -> binary() | null.
 ejson_state_info(nil) ->
     null;

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 777c8a4..aeb9219 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -438,8 +438,7 @@ convert_options([{<<"cancel">>, V} | R]) ->
     [{cancel, V} | convert_options(R)];
 convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
         IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
-    Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
-    [{id, Id} | convert_options(R)];
+    [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
 convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
     throw({bad_request, <<"parameter `create_target` must be a boolean">>});
 convert_options([{<<"create_target">>, V} | R]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_ids.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_ids.erl b/src/couch_replicator_ids.erl
index 50760b0..565ed9d 100644
--- a/src/couch_replicator_ids.erl
+++ b/src/couch_replicator_ids.erl
@@ -12,7 +12,7 @@
 
 -module(couch_replicator_ids).
 
--export([replication_id/1, replication_id/2]).
+-export([replication_id/1, replication_id/2, convert/1]).
 
 -include_lib("couch/include/couch_db.hrl").
 -include("couch_replicator_api_wrap.hrl").
@@ -67,6 +67,17 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
     maybe_append_filters([HostName, Src, Tgt], Rep).
 
 
+% Normalize a replication id to its `{BaseId, Ext}` pair of strings.
+%
+% A binary is turned into a string and split in front of the first `+`
+% character, so the extension keeps its leading `+` (and is "" when there is
+% no `+`). A list is first converted to a binary and then handled the same
+% way. A pair of lists is returned as is.
+-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
+convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    Id;
+
+convert(Id) when is_list(Id) ->
+    convert(?l2b(Id));
+
+convert(Id) when is_binary(Id) ->
+    NotPlus = fun(Char) -> Char =/= $+ end,
+    lists:splitwith(NotPlus, ?b2l(Id)).
+
+
 % Private functions
 
 maybe_append_filters(Base,

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 3ca417a..b43b36c 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -24,7 +24,7 @@
 -export([start_link/0, add_job/1, remove_job/1, reschedule/0]).
 -export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]).
 -export([job_summary/2, health_threshold/0]).
--export([jobs/0]).
+-export([jobs/0, job/1]).
 
 %% gen_server callbacks
 -export([init/1, terminate/2, code_change/3]).
@@ -748,48 +748,63 @@ ejson_url(DbName) when is_binary(DbName) ->
     DbName.
 
 
+%% Render a scheduler job as an EJSON object (a `{Props}` tuple).
+%%
+%% The object carries the job id (base id and extension concatenated), the
+%% pid of the job process as a binary (`null` when the job has no pid), the
+%% source and target as rendered by ejson_url/1, the database, user name and
+%% doc id taken from the job's #rep{}, the rendered history and the name of
+%% the node this function runs on.
+-spec job_ejson(#job{}) -> {[_ | _]}.
+job_ejson(Job) ->
+    Rep = Job#job.rep,
+    Source = ejson_url(Rep#rep.source),
+    Target = ejson_url(Rep#rep.target),
+    History = [job_event_ejson(Event) || Event <- Job#job.history],
+    {BaseID, Ext} = Job#job.id,
+    Pid = case Job#job.pid of
+        undefined ->
+            null;
+        P when is_pid(P) ->
+            ?l2b(pid_to_list(P))
+    end,
+    {[
+        {id, iolist_to_binary([BaseID, Ext])},
+        {pid, Pid},
+        {source, iolist_to_binary(Source)},
+        {target, iolist_to_binary(Target)},
+        {database, Rep#rep.db_name},
+        {user, (Rep#rep.user_ctx)#user_ctx.name},
+        {doc_id, Rep#rep.doc_id},
+        {history, History},
+        {node, node()}
+    ]}.
+
+
+%% Render one `{EventType, Timestamp}` history entry as an EJSON object.
+%% Crash events additionally carry a `reason` rendered by
+%% crash_reason_json/1; every other event only carries its type.
+job_event_ejson({EventType, When}) ->
+    EventProps = case EventType of
+        {crashed, Reason} ->
+            [{type, crashed}, {reason, crash_reason_json(Reason)}];
+        Type ->
+            [{type, Type}]
+    end,
+    {[{timestamp, job_event_timestamp(When)} | EventProps]}.
+
+
+%% Format a `{Mega, Sec, Micro}` timestamp as a UTC ISO 8601 binary, e.g.
+%% <<"2016-10-21T15:44:53.000042Z">>.
+job_event_timestamp({_Mega, _Sec, Micros} = When) ->
+    {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
+    % Time fields are separated by ':' and the microseconds are zero-padded
+    % to six digits: an unpadded value would be read as a different fraction
+    % of a second (5 microseconds must print as .000005, not .5).
+    iolist_to_binary(io_lib:format(
+        "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0B.~6..0BZ",
+        [Y, Mon, D, H, Min, S, Micros]
+    )).
+
+
 -spec jobs() -> [[tuple()]].
 jobs() ->
     ets:foldl(fun(Job, Acc) ->
-        Rep = Job#job.rep,
-        Source = ejson_url(Rep#rep.source),
-        Target = ejson_url(Rep#rep.target),
-        History = lists:map(fun(Event) ->
-            EventProps  = case Event of
-                {{crashed, Reason}, _When} ->
-                    [{type, crashed}, {reason, crash_reason_json(Reason)}];
-                {Type, _When} ->
-                    [{type, Type}]
-            end,
-            {_Type, {_Mega, _Sec, Micros}=When} = Event,
-            {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
-            ISO8601 = iolist_to_binary(io_lib:format(
-                "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ",
-                [Y,Mon,D,H,Min,S,Micros]
-            )),
-            {[{timestamp, ISO8601} | EventProps]}
-        end, Job#job.history),
-        {BaseID, Ext} = Job#job.id,
-        Pid = case Job#job.pid of
-            undefined ->
-                null;
-            P when is_pid(P) ->
-                ?l2b(pid_to_list(P))
-        end,
-        [{[
-            {id, iolist_to_binary([BaseID, Ext])},
-            {pid, Pid},
-            {source, iolist_to_binary(Source)},
-            {target, iolist_to_binary(Target)},
-            {database, Rep#rep.db_name},
-            {user, (Rep#rep.user_ctx)#user_ctx.name},
-            {doc_id, Rep#rep.doc_id},
-            {history, History},
-            {node, node()}
-        ]} | Acc]
+        [job_ejson(Job) | Acc]  % one job_ejson/1 {Props} object per table row -- NOTE(review): -spec says [[tuple()]], confirm
     end, [], couch_replicator_scheduler).
 
 
+%% Look up one job by its id and render it as an EJSON object.
+%%
+%% Returns `{ok, Ejson}` with the job_ejson/1 rendering when job_by_id/1
+%% finds the job. Any other result of job_by_id/1 is handed back unchanged.
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+    Lookup = job_by_id(JobId),
+    case Lookup of
+        {ok, Job} ->
+            {ok, job_ejson(Job)};
+        _NotFound ->
+            Lookup
+    end.
+
+
 crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
     Info;
 crash_reason_json(Reason) when is_binary(Reason) ->
@@ -884,7 +899,7 @@ latest_crash_timestamp_test_() ->
 
 
 last_started_test_() ->
-    [?_assertEqual({0, R, 0}, last_started(job(H))) || {R, H} <- [
+    [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
          {0, [added()]},
          {0, [crashed(1)]},
          {1, [started(1)]},
@@ -895,9 +910,9 @@ last_started_test_() ->
 
 
 oldest_job_first_test() ->
-    J0 = job([crashed()]),
-    J1 = job([started(1)]),
-    J2 = job([started(2)]),
+    J0 = testjob([crashed()]),
+    J1 = testjob([started(1)]),
+    J2 = testjob([started(2)]),
     Sort = fun(Jobs) -> lists:sort(fun oldest_job_first/2, Jobs) end,
     ?assertEqual([], Sort([])),
     ?assertEqual([J1], Sort([J1])),
@@ -1312,7 +1327,7 @@ oneshot_running(Id) when is_integer(Id) ->
     }.
 
 
-job(Hist) when is_list(Hist) ->
+testjob(Hist) when is_list(Hist) ->  % test fixture: a #job{} with only `history` set, other fields keep record defaults
     #job{history = Hist}.