You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:25:59 UTC
[04/50] couch-replicator commit: updated refs/heads/63012-scheduler
to 27a5eae
Add support for _scheduler/jobs/<jobid> and _scheduler/docs/<docid>
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f1140e94
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f1140e94
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f1140e94
Branch: refs/heads/63012-scheduler
Commit: f1140e941d8e417f4f271f2b3c3f81d4f09a2e67
Parents: a1d7554
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Fri Oct 21 15:44:53 2016 -0400
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Fri Oct 21 15:55:44 2016 -0400
----------------------------------------------------------------------
src/couch_replicator.erl | 51 +++++++++++++++
src/couch_replicator_doc_processor.erl | 23 ++++++-
src/couch_replicator_docs.erl | 3 +-
src/couch_replicator_ids.erl | 13 +++-
src/couch_replicator_scheduler.erl | 99 +++++++++++++++++------------
5 files changed, 143 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index b888f82..0bfd9f6 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -15,6 +15,7 @@
-export([replicate/2, ensure_rep_db_exists/0]).
-export([stream_active_docs_info/3, stream_terminal_docs_info/4]).
-export([replication_states/0]).
+-export([job/1, doc/3]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
@@ -230,3 +231,53 @@ filter_replicator_doc_query(_DocState, []) ->
true;
filter_replicator_doc_query(State, States) when is_list(States) ->
lists:member(State, States).
+
+
+% Look up a single replication job by its id.
+%
+% The binary id is normalised with couch_replicator_ids:convert/1 and
+% couch_replicator_scheduler:job/1 is then invoked through rpc:multicall/3.
+% The first `{ok, JobInfo}` reply in the result list is returned as is.
+% Replies of any other shape, and the nodes the multicall reports as bad,
+% are ignored. When no reply matches the result is `{error, not_found}`.
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    JobId = couch_replicator_ids:convert(JobId0),
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
+    case [Hit || {ok, _JobInfo} = Hit <- Replies] of
+        [] ->
+            {error, not_found};
+        [FirstHit | _] ->
+            FirstHit
+    end.
+
+
+% Return the scheduler's view of one replication document.
+%
+% couch_replicator_doc_processor:doc/2 is called through rpc:multicall/3
+% and the first `{ok, DocInfo}` reply wins. When no node answers with an
+% `{ok, _}` tuple the document is read via doc_from_db/3 instead, which is
+% the only place UserCtx is used.
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_doc_processor, doc, [RepDb, DocId]),
+    case [Hit || {ok, _DocInfo} = Hit <- Replies] of
+        [] ->
+            doc_from_db(RepDb, DocId, UserCtx);
+        [FirstHit | _] ->
+            FirstHit
+    end.
+
+
+% Build a doc-info EJSON object straight from the stored replication
+% document.
+%
+% The document is opened with fabric:open_doc/3, passing UserCtx and
+% `ejson_body` as options. The `state` field is the document's
+% `_replication_state` value (null when absent); `info` and `error_count`
+% are derived from it by stored_state_info/2. The `id` field is always
+% null here. A `{not_found, _}` reply from fabric becomes
+% `{error, not_found}`; any other reply is not matched.
+-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
+        {not_found, _Reason} ->
+            {error, not_found};
+        {ok, Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, []),
+            State = couch_util:get_value(<<"_replication_state">>, Props, null),
+            {StateInfo, ErrorCount} = stored_state_info(State, Props),
+            {ok, {[
+                {doc_id, DocId},
+                {database, RepDb},
+                {id, null},
+                {state, State},
+                {error_count, ErrorCount},
+                {info, StateInfo}
+            ]}}
+    end.
+
+
+% Map a stored replication state to `{Info, ErrorCount}`:
+% completed -> the stored stats with zero errors, failed -> the stored
+% failure reason with one error, anything else -> `{null, 0}`.
+stored_state_info(<<"completed">>, Props) ->
+    {couch_util:get_value(<<"_replication_stats">>, Props, null), 0};
+stored_state_info(<<"failed">>, Props) ->
+    {couch_util:get_value(<<"_replication_state_reason">>, Props, null), 1};
+stored_state_info(_OtherState, _Props) ->
+    {null, 0}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 60a3ad1..0349fa6 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -14,7 +14,7 @@
-behaviour(couch_multidb_changes).
-export([start_link/0]).
--export([docs/1]).
+-export([docs/1, doc/2]).
% multidb changes callback
-export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -375,6 +375,27 @@ docs(States) ->
end, [], ?MODULE).
+% Find the replication document `DocId` of database `Db` in this module's
+% ETS table and return its EJSON rendering.
+%
+% Rows are keyed by `{Shard, DocId}`; a row matches when mem3:dbname/1 of
+% its shard equals `Db` and the doc ids are equal. The scan stops at the
+% first matching row by throwing out of ets:foldl/3, instead of visiting
+% every remaining row with an accumulator that can no longer change.
+% Returns `{error, not_found}` when no row matches.
+-spec doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+doc(Db, DocId) ->
+    HealthThreshold = couch_replicator_scheduler:health_threshold(),
+    try
+        ets:foldl(fun(RDoc, not_found) ->
+            {Shard, RDocId} = RDoc#rdoc.id,
+            case {mem3:dbname(Shard), RDocId} of
+                {Db, DocId} ->
+                    % Non-local return: nothing after the first hit matters.
+                    throw({found, RDoc});
+                {_OtherDb, _OtherDocId} ->
+                    not_found
+            end
+        end, not_found, ?MODULE)
+    of
+        not_found ->
+            {error, not_found}
+    catch
+        throw:{found, FoundRDoc} ->
+            % Rendered outside the protected body so that failures in
+            % ejson_doc/2 are not confused with the early-exit throw.
+            {ok, ejson_doc(FoundRDoc, HealthThreshold)}
+    end.
+
+
-spec ejson_state_info(binary() | nil) -> binary() | null.
ejson_state_info(nil) ->
null;
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 777c8a4..aeb9219 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -438,8 +438,7 @@ convert_options([{<<"cancel">>, V} | R]) ->
[{cancel, V} | convert_options(R)];
convert_options([{IdOpt, V} | R]) when IdOpt =:= <<"_local_id">>;
IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
- Id = lists:splitwith(fun(X) -> X =/= $+ end, ?b2l(V)),
- [{id, Id} | convert_options(R)];
+ [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V)->
throw({bad_request, <<"parameter `create_target` must be a boolean">>});
convert_options([{<<"create_target">>, V} | R]) ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_ids.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_ids.erl b/src/couch_replicator_ids.erl
index 50760b0..565ed9d 100644
--- a/src/couch_replicator_ids.erl
+++ b/src/couch_replicator_ids.erl
@@ -12,7 +12,7 @@
-module(couch_replicator_ids).
--export([replication_id/1, replication_id/2]).
+-export([replication_id/1, replication_id/2, convert/1]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator_api_wrap.hrl").
@@ -67,6 +67,17 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
maybe_append_filters([HostName, Src, Tgt], Rep).
+% Normalise a replication id to its `{BaseId, Ext}` tuple form.
+%
+% A tuple of two lists is returned unchanged. A binary is turned into a
+% list and split in front of the first `+` character, so a non-empty
+% extension keeps its leading `+`; without a `+` the extension is the
+% empty list. A list is converted to a binary first and then handled like
+% any other binary.
+-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
+convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
+    Id;
+
+convert(Id) when is_binary(Id) ->
+    NotPlus = fun(Char) -> Char =/= $+ end,
+    lists:splitwith(NotPlus, ?b2l(Id));
+
+convert(Id) when is_list(Id) ->
+    convert(?l2b(Id)).
+
+
% Private functions
maybe_append_filters(Base,
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f1140e94/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 3ca417a..b43b36c 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -24,7 +24,7 @@
-export([start_link/0, add_job/1, remove_job/1, reschedule/0]).
-export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]).
-export([job_summary/2, health_threshold/0]).
--export([jobs/0]).
+-export([jobs/0, job/1]).
%% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
@@ -748,48 +748,63 @@ ejson_url(DbName) when is_binary(DbName) ->
DbName.
+% Render a scheduler job record as an EJSON object.
+%
+% The object carries the job id (base id and extension concatenated), the
+% job pid as a binary (or null when the job has no pid), source and target
+% as produced by ejson_url/1, the replicator database, user name and doc
+% id taken from the #rep{} record, the job history and the local node.
+%
+% Every history entry becomes an object with a `timestamp` and a `type`;
+% crash events additionally carry a `reason` built by crash_reason_json/1.
+-spec job_ejson(#job{}) -> {[_ | _]}.
+job_ejson(Job) ->
+    Rep = Job#job.rep,
+    Source = ejson_url(Rep#rep.source),
+    Target = ejson_url(Rep#rep.target),
+    History = lists:map(fun(Event) ->
+        EventProps = case Event of
+            {{crashed, Reason}, _When} ->
+                [{type, crashed}, {reason, crash_reason_json(Reason)}];
+            {Type, _When} ->
+                [{type, Type}]
+        end,
+        {_Type, {_Mega, _Sec, Micros}=When} = Event,
+        {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
+        % ISO 8601 separates hours, minutes and seconds with ':'. The
+        % microsecond part is zero-padded to six digits, otherwise e.g.
+        % 5 microseconds would be rendered as ".5" (half a second).
+        ISO8601 = iolist_to_binary(io_lib:format(
+            "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0B.~6..0BZ",
+            [Y,Mon,D,H,Min,S,Micros]
+        )),
+        {[{timestamp, ISO8601} | EventProps]}
+    end, Job#job.history),
+    {BaseID, Ext} = Job#job.id,
+    Pid = case Job#job.pid of
+        undefined ->
+            null;
+        P when is_pid(P) ->
+            ?l2b(pid_to_list(P))
+    end,
+    {[
+        {id, iolist_to_binary([BaseID, Ext])},
+        {pid, Pid},
+        {source, iolist_to_binary(Source)},
+        {target, iolist_to_binary(Target)},
+        {database, Rep#rep.db_name},
+        {user, (Rep#rep.user_ctx)#user_ctx.name},
+        {doc_id, Rep#rep.doc_id},
+        {history, History},
+        {node, node()}
+    ]}.
+
+
% Return the EJSON rendering of every job stored in the
% couch_replicator_scheduler ETS table. Each rendered job is prepended to
% the accumulator, so the result is in reverse fold order.
% After this change the per-job rendering is delegated to job_ejson/1; the
% removed lines below are the former inline version of that rendering.
% NOTE(review): each element is the `{[...]}` object built by job_ejson/1,
% which does not look like the `[tuple()]` element type in the spec -
% confirm the spec.
-spec jobs() -> [[tuple()]].
jobs() ->
ets:foldl(fun(Job, Acc) ->
- Rep = Job#job.rep,
- Source = ejson_url(Rep#rep.source),
- Target = ejson_url(Rep#rep.target),
- History = lists:map(fun(Event) ->
- EventProps = case Event of
- {{crashed, Reason}, _When} ->
- [{type, crashed}, {reason, crash_reason_json(Reason)}];
- {Type, _When} ->
- [{type, Type}]
- end,
- {_Type, {_Mega, _Sec, Micros}=When} = Event,
- {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(When),
- ISO8601 = iolist_to_binary(io_lib:format(
- "~B-~2..0B-~2..0BT~2..0B-~2..0B-~2..0B.~BZ",
- [Y,Mon,D,H,Min,S,Micros]
- )),
- {[{timestamp, ISO8601} | EventProps]}
- end, Job#job.history),
- {BaseID, Ext} = Job#job.id,
- Pid = case Job#job.pid of
- undefined ->
- null;
- P when is_pid(P) ->
- ?l2b(pid_to_list(P))
- end,
- [{[
- {id, iolist_to_binary([BaseID, Ext])},
- {pid, Pid},
- {source, iolist_to_binary(Source)},
- {target, iolist_to_binary(Target)},
- {database, Rep#rep.db_name},
- {user, (Rep#rep.user_ctx)#user_ctx.name},
- {doc_id, Rep#rep.doc_id},
- {history, History},
- {node, node()}
- ]} | Acc]
+ [job_ejson(Job) | Acc]
end, [], couch_replicator_scheduler).
+% Return the EJSON rendering of the job with the given id.
+%
+% The job is looked up with job_by_id/1. An `{ok, Job}` result is rendered
+% through job_ejson/1; any other lookup result is handed back to the
+% caller unchanged.
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+    Lookup = job_by_id(JobId),
+    case Lookup of
+        {ok, Job} ->
+            {ok, job_ejson(Job)};
+        _NotOk ->
+            Lookup
+    end.
+
+
crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
Info;
crash_reason_json(Reason) when is_binary(Reason) ->
@@ -884,7 +899,7 @@ latest_crash_timestamp_test_() ->
last_started_test_() ->
- [?_assertEqual({0, R, 0}, last_started(job(H))) || {R, H} <- [
+ [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
{0, [added()]},
{0, [crashed(1)]},
{1, [started(1)]},
@@ -895,9 +910,9 @@ last_started_test_() ->
oldest_job_first_test() ->
- J0 = job([crashed()]),
- J1 = job([started(1)]),
- J2 = job([started(2)]),
+ J0 = testjob([crashed()]),
+ J1 = testjob([started(1)]),
+ J2 = testjob([started(2)]),
Sort = fun(Jobs) -> lists:sort(fun oldest_job_first/2, Jobs) end,
?assertEqual([], Sort([])),
?assertEqual([J1], Sort([J1])),
@@ -1312,7 +1327,7 @@ oneshot_running(Id) when is_integer(Id) ->
}.
% Test helper: build a #job{} record with only its history field set to
% the given list of events. Renamed from job/1 to testjob/1 in this change;
% the same change exports a job/1 from this module.
-job(Hist) when is_list(Hist) ->
+testjob(Hist) when is_list(Hist) ->
#job{history = Hist}.