You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2019/07/30 16:58:53 UTC
[couchdb] branch prototype/fdb-replicator updated: WIP 3
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch prototype/fdb-replicator
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/prototype/fdb-replicator by this push:
new 17f6c23 WIP 3
17f6c23 is described below
commit 17f6c235a6d366f68cfdc1c29a8e9a9fbff4ad24
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jul 30 12:55:44 2019 -0400
WIP 3
* Local endpoints have been cleaned up
* #rep{} records parsed into #{}
- #httpdb{} records also parsed (include ibrowse options + ssl opts)
* #httpdb{} endpoint records are initialized from #{}
* _replicate logic (adding + canceling) interface
* _replicator doc update handler job adding logic
- listen for db creates
- listen for db deletes
- listen for doc updates
---
src/couch_jobs/src/couch_jobs.erl | 17 ++
src/couch_replicator/src/couch_replicator.erl | 50 ++--
src/couch_replicator/src/couch_replicator.hrl | 1 +
.../src/couch_replicator_api_wrap.erl | 45 +++-
.../src/couch_replicator_doc_processor.erl | 229 ++++++++---------
src/couch_replicator/src/couch_replicator_docs.erl | 280 ++++++++++++---------
.../src/couch_replicator_filters.erl | 42 ++--
src/couch_replicator/src/couch_replicator_ids.erl | 46 ++--
.../src/couch_replicator_scheduler_job.erl | 4 +-
.../src/couch_replicator_utils.erl | 75 +-----
.../src/couch_replicator_worker.erl | 140 +----------
11 files changed, 418 insertions(+), 511 deletions(-)
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index d469ed4..393f256 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -19,6 +19,8 @@
remove/3,
get_job_data/3,
get_job_state/3,
+ get_jobs/2,
+ get_jobs/3,
% Job processing
accept/1,
@@ -103,6 +105,21 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
end).
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_job_fdb:get_jobs(JTx, Type)
+ end).
+
+
+-spec get_jobs(jtx(), job_type()) -> #{}.
+get_jobs(Tx, Type, Filter) when is_binary(JobId), is_function(Filter, 1) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_job_fdb:get_jobs(JTx, Type, Filter)
+ end).
+
+
+
%% Job processor API
-spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 3dee919..cb50df7 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -112,17 +112,18 @@ wait_for_result(RepId) ->
-spec cancel_replication(rep_id()) ->
{ok, {cancelled, binary()}} | {error, not_found}.
-cancel_replication({BasedId, Extension} = RepId) ->
- FullRepId = BasedId ++ Extension,
- couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
- case couch_replicator_scheduler:rep_state(RepId) of
- #rep{} ->
- ok = couch_replicator_scheduler:remove_job(RepId),
- couch_log:notice("Replication '~s' cancelled", [FullRepId]),
- {ok, {cancelled, ?l2b(FullRepId)}};
- nil ->
- couch_log:notice("Replication '~s' not found", [FullRepId]),
- {error, not_found}
+cancel_replication(RepId) when is_binary(RepId) ->
+ couch_log:notice("Canceling replication '~s' ...", [RepId]),
+ case couch_jobs:get_job_data(undefined, ?REP_JOBS, RepId) of
+ {error_not, found} ->
+ {error, not_found};
+ #{<<"rep">> := #{<<"db_name">> := null}} ->
+ couch_jobs:remove(undefined, ?REP_JOBS, RepId)
+ {ok, {cancelled, ?l2b(FullRepId)}};
+ #{<<"rep">> := #{}} ->
+ % Job was started from a replicator doc canceling via _replicate
+ % doesn't quite make sense, instead replicator should be deleted.
+ {error, not_found}
end.
@@ -131,10 +132,10 @@ replication_states() ->
?REPLICATION_STATES.
--spec strip_url_creds(binary() | {[_]}) -> binary().
+-spec strip_url_creds(binary() | #{}) -> binary().
strip_url_creds(Endpoint) ->
- case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
- #httpdb{url=Url} ->
+ case couch_replicator_docs:parse_rep_db(Endpoint, #{}, #{}) of
+ #{<<"url">> := Url} ->
iolist_to_binary(couch_util:url_strip_password(Url));
LocalDb when is_binary(LocalDb) ->
LocalDb
@@ -275,13 +276,13 @@ state_atom(State) when is_atom(State) ->
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
- case couch_replicator_scheduler:rep_state(RepId) of
- #{<<"user_ctx">> := #{<<"name">> := Name}} ->
- ok;
- #{} ->
- couch_httpd:verify_is_server_admin(Ctx);
- nil ->
- not_found
+ case couch_jobs:get_job_data(undefined, ?REP_JOBS, RePid) of
+ {error_not, found} ->
+ not_found;
+ #{<<"rep">> := {<<"user">> := Name}} ->
+ ok;
+ #{} ->
+ couch_httpd:verify_is_server_admin(Ctx)
end.
@@ -331,13 +332,6 @@ t_replication_not_found() ->
end).
-expect_rep_user_ctx(Name, Role) ->
- meck:expect(couch_replicator_scheduler, rep_state,
- fun(_Id) ->
- UserCtx = #user_ctx{name = Name, roles = [Role]},
- #rep{user_ctx = UserCtx}
- end).
-
strip_url_creds_test_() ->
{
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 30cb485..6584078 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -31,6 +31,7 @@
}).
-type rep_id() :: binary().
+-type user_name() :: binary() | null.
-type db_doc_id() :: {binary(), binary() | '_'}.
-type seconds() :: non_neg_integer().
-type rep_start_result() ::
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 24ec99b..66319bf 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -23,8 +23,8 @@
-include("couch_replicator_api_wrap.hrl").
-export([
- db_open/2,
- db_open/4,
+ db_open/1,
+ db_open/3,
db_close/1,
get_db_info/1,
get_pending_count/2,
@@ -64,11 +64,11 @@ db_uri(#httpdb{url = Url}) ->
couch_util:url_strip_password(Url).
-db_open(Db, #{} = UserCtxMap) when is_map(Db) orelse is_binary(Db) ->
- db_open(Db, #{} = UserCtxMap, false, []);
+db_open(#{} = Db) ->
+ db_open(Db, false, []);
-db_open(#{} = Db0, #httpdb{} = Db1, #{} =_UserCtxMap, Create, CreateParams) ->
+db_open(#{} = Db0, Create, CreateParams) ->
{ok, Db} = couch_replicator_httpc:setup(db_from_json(Db0)),
try
case Create of
@@ -910,6 +910,14 @@ db_from_json(#{} = DbMap) ->
[{binary_to_list(K), binary_to_list(V)} | Acc]
end, [], Headers0),
IBrowseOptions0 = maps:fold(fun
+ (<<"proxy_protocol">>, V, Acc) ->
+ [{binary_to_atom(K), binary_to_existing_atom(V)} | Acc];
+ (<<"socket_options">>, #{} = SockOpts, Acc) ->
+ SockOptsKVs = maps:fold(fun sock_opts_fold/3, [], SockOpts),
+ [{socket_options, SockOptsKVs} | Acc];
+ (<<"ssl_options">>, #{} = SslOpts, Acc) ->
+ SslOptsKVs = maps:fold(fun ssl_opts_fold/3, [], SslOpts),
+ [{ssl_options, SslOptsKVs} | Acc];
(K, V, Acc) when is_binary(V) ->
[{binary_to_atom(K), binary_to_list(V)} | Acc];
(K, V, Acc) ->
@@ -929,3 +937,30 @@ db_from_json(#{} = DbMap) ->
retries = Retries,
proxy_url = ProxyURL
}.
+
+
+
+% See couch_replicator_docs:ssl_params/1 for ssl parsed options
+% and http://erlang.org/doc/man/ssl.html#type-server_option
+% all latest SSL server options
+%
+ssl_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+ [{binary_to_atom(K), V} | Acc];
+
+ssl_opts_fold(K, null, Acc) ->
+ [{binary_to_atom(K), undefined} | Acc];
+
+ssl_opts_fold(<<"verify">>, V, Acc) ->
+ [{binary_to_atom(K), binary_to_atom(V)};
+
+ssl_opts_fold(K, V, Acc) when is_list(V) ->
+ [{binary_to_atom(K), binary_to_list(V)} | Acc].
+
+
+% See ?VALID_SOCK_OPTS in couch_replicator_docs for accepted socket options
+%
+sock_opts_fold(K, V, Acc) when is_list(V) ->
+ [{binary_to_atom(K), binary_to_atom(V)} | Acc];
+
+sock_opts_fold(K, V, Acc) when is_boolean(V); is_integer(V) ->
+ [{binary_to_atom(K), V} | Acc].
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index a2887a0..74ec014 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -75,16 +75,7 @@
during_doc_update(#doc{} = Doc, Db, _UpdateType) ->
couch_stats:increment_counter([couch_replicator, docs, db_changes]),
- try
- process_change(Db, Doc)
- catch
- _Tag:Error ->
- DocId = Doc#doc.id,
- #{name := DbName} = Db,
- couch_replicator_docs:update_failed(DbName, DocId, Error)
- end,
- ok.
-
+ ok = process_change(Db, Doc).
after_db_create(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
@@ -93,11 +84,7 @@ after_db_create(#{name := DbName}) ->
after_db_delete(#{name := DbName}) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
- gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity).
-
-
-docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
- <<DbName/binary, Id/binary>>.
+ remove_replications_by_dbname(DbName).
process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
@@ -105,86 +92,84 @@ process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
process_change(#{name := DbName} = Db, #doc{deleted = true} = Doc) ->
Id = docs_job_id(DbName, Doc#doc.id),
- case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
- {error, not_found} ->
- ok;
- {ok, #{<<"rid">> := null}} ->
- couch_jobs:remove(Db, ?REP_DOCS, Id),
- ok;
- {ok, #{<<"rid">> := RepId}} ->
- couch_jobs:remove(Db, ?REP_JOBS, RepId),
- couch_jobs:remove(Db, ?REP_DOCS, Id),
- ok
- end.
-
+ ok = remove_replication_by_doc_job_id(Db, Id);
process_change(#{name := DbName} = Db, #doc{} = Doc) ->
#doc{id = DocId, body = {Props} = Body} = Doc,
- Id = docs_job_id(DbName, DocId),
- DocState = get_json_value(<<"_replication_state">>, Props),
- {Rep, RepParseError} = try
+ {Rep, RepError} = try
Rep0 = couch_replicator_docs:parse_rep_doc_without_id(Body),
- Rep1 = Rep0#{db_name = DbName, start_time = erlang:system_time()},
+ Rep1 = Rep0#{
+ <<"db_name">> => DbName,
+ <<"start_time">> => erlang:system_time()
+ },
{Rep1, null}
catch
throw:{bad_rep_doc, Reason} ->
- {null, Reason}
+ {null, couch_replicator_utils:rep_error_to_binary(Reason)}
end,
- RepMap = couch_replicator_docs:rep_to_map(Rep),
- case couch_jobs:get_job_data(Db, ?REP_DOCS, Id) of
+ % We keep track of the doc's state in order to clear it if update_docs
+ % is toggled from true to false
+ DocState = get_json_value(<<"_replication_state">>, Props, null),
+ case couch_jobs:get_job_data(Db, ?REP_DOCS, docs_job_id(DbName, DocId)) of
{error, not_found} ->
- RepDocsJob = #{
- <<"rep_id">> := null,
- <<"db_name">> := DbName,
- <<"doc_id">> := Doc#doc.id,
- <<"rep">> := RepMap,
- <<"rep_parse_error">> := RepParseError
- },
- couch_jobs:add(Db, ?REP_DOCS, RepDocsJob);
- {ok, #{} = Old} ->
- % Normalize old rep and new rep and only update job
- % if changed
- #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError} = Old,
- NOldRep = couch_replicator_util:normalize_rep(OldRep),
- NRep = couch_replicator_util:normalize_rep(Rep),
- RepDocsJob = #{
- <<"rep_id">> := null,
- <<"db_name">> := DbName,
- <<"doc_id">> := Doc#doc.id,
- <<"rep">> := RepMap,
- <<"rep_parse_error">> := RepParseError
- }
+ update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{<<"rep">> := null, <<"rep_parse_error">> := RepError}}
+ when Rep =:= null ->
+ % Same error as before occurred, don't bother updating the job
+ ok;
+ {ok, #{<<"rep">> := null}} when Rep =:= null ->
+ % Error occured but it's a different error. Update the job so user
+ % sees the new error
+ update_replication_job(Db, DbName, DocId, Rep, RepError, DocState);
+ {ok, #{<<"rep">> := OldRep, <<"rep_parse_error">> := OldError}} ->
+ NormOldRep = couch_replicator_util:normalize_rep(OldRep),
+ NormRep = couch_replicator_util:normalize_rep(Rep),
+ case NormOldRep == NormRep of
+ true ->
+ % Document was changed but none of the parameters relevent
+ % for the replication job have changed, so make it a no-op
+ ok;
+ false ->
+ update_replication_job(Db, DbName, DocId, Rep, RepError,
+ DocState)
+ end
end.
-process_change(#{name := DbName} = Db, #doc{} = Doc) ->
- #doc{id = DocId, body = {Props} = Body, deleted = Deleted} = Doc,
- Id = {DbName, DocId},
- case Deleted of
- true ->
- process_deleted_doc(Db, Id);
- false ->
- process_updated_doc(Db, Id, Body)
- end.
- State = get_json_value(<<"_replication_state">>, Props),
- case {Deleted, State} of
- {true, _} ->
- ok = gen_server:call(?MODULE, {removed, Id}, infinity);
- {false, undefined} ->
- ok = process_updated(Id, Body);
- {false, <<"triggered">>} ->
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, Body);
- {false, <<"completed">>} ->
- ok = gen_server:call(?MODULE, {completed, Id}, infinity);
- {false, <<"error">>} ->
- % Handle replications started from older versions of replicator
- % which wrote transient errors to replication docs
- maybe_remove_state_fields(DbName, DocId),
- ok = process_updated(Id, Body);
- {false, <<"failed">>} ->
- ok
- end.
+rep_docs_job_execute(#{} = Job, #{<<"rep">> := null} = JobData) ->
+ #{
+ <<"rep_parse_error">> := Error,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ } = JobData,
+ JobData1 = JobData#{
+ <<"finished_state">> := <<"failed">>,
+ <<"finished_result">> := Error
+ }
+ case couch_jobs:finish(undefined, Job, JobData1) of
+ ok ->
+ couch_replicator_docs:update_failed(DbName, DocId, Error),
+ ok;
+ {error, JobError} ->
+ Msg = "Replication ~s job could not finish. JobError:~p",
+ couch_log:error(Msg, [RepId, JobError]),
+ {error, JobError}
+ end;
+
+rep_docs_job_execute(#{} = Job, #{} = JobData) ->
+ #{<<"rep">> := Rep, <<"doc_state">> := DocState} = JobData,
+ case lists:member(DocState, [<<"triggered">>, <<"error">>]) of
+ true -> maybe_remove_state_fields(DbName, DocId),
+ false -> ok
+ end,
+ % completed jobs should finish right away
+
+ % otherwise start computing the replication id
+
+ Rep1 = update_replication_id(Rep),
+
+ % when done add or update the replicaton job
+ % if jobs has a filter keep checking if filter changes
maybe_remove_state_fields(DbName, DocId) ->
@@ -607,21 +592,57 @@ ejson_doc_state_filter(State, States) when is_list(States), is_atom(State) ->
lists:member(State, States).
--spec cluster_membership_foldl(#rdoc{}, nil) -> nil.
-cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
- case couch_replicator_clustering:owner(DbName, DocId) of
- unstable ->
- nil;
- ThisNode when ThisNode =:= node() ->
- nil;
- OtherNode ->
- Msg = "Replication doc ~p:~p with id ~p usurped by node ~p",
- couch_log:notice(Msg, [DbName, DocId, RepId, OtherNode]),
- removed_doc(Id),
- nil
+-spec update_replication(any(), binary(), binary(), #{} | null,
+ binary() | null, binary() | null) -> ok.
+update_replication_job(Tx, DbName, DocId, Rep, RepParseError, DocState) ->
+ JobId = docs_job_id(DbName, DocId),
+ ok = remove_replication_by_doc_job_id(Db, JobId),
+ RepDocsJob = #{
+ <<"rep_id">> := null,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ <<"rep">> := Rep,
+ <<"rep_parse_error">> := RepParseError,
+ <<"doc_state">> := DocState
+ },
+ ok = couch_jobs:add(Tx, ?REP_DOCS, RepDocsJob).
+
+
+docs_job_id(DbName, Id) when is_binary(DbName), is_binary(Id) ->
+ <<DbName/binary, "|", Id/binary>>.
+
+
+-spec remove_replication_by_doc_job_id(Tx, Id) -> ok.
+remove_replication_by_doc_job_id(Tx, Id) ->
+ case couch_jobs:get_job_data(Tx, ?REP_DOCS, Id) of
+ {error, not_found} ->
+ ok;
+ {ok, #{<<"rep_id">> := null}} ->
+ couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ ok;
+ {ok, #{<<"rep_id">> := RepId}} ->
+ couch_jobs:remove(Tx, ?REP_JOBS, RepId),
+ couch_jobs:remove(Tx, ?REP_DOCS, Id),
+ ok
end.
+-spec remove_replications_by_dbname(DbName) -> ok.
+remove_replications_by_dbname(DbName) ->
+ DbNameSize = byte_size(DbName),
+ Filter = fun
+ (<<DbName:DbNameSize/binary, "|", _, _/binary>>) -> true;
+ (_) -> false
+ end,
+ JobsMap = couch_job:get_jobs(undefined, ?REP_DOCS, Filter),
+ % Batch these into smaller transactions eventually...
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ maps:map(fun(Id, _) ->
+ remove_replication_by_doc_job_id(JTx, Id)
+ end, JobsMap)
+ end).
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -650,8 +671,7 @@ doc_processor_test_() ->
t_failed_change(),
t_change_for_different_node(),
t_change_when_cluster_unstable(),
- t_ejson_docs(),
- t_cluster_membership_foldl()
+ t_ejson_docs()
]
}.
@@ -803,21 +823,6 @@ t_ejson_docs() ->
end).
-% Check that when cluster membership changes records from doc processor and job
-% scheduler get removed
-t_cluster_membership_foldl() ->
- ?_test(begin
- mock_existing_jobs_lookup([test_rep(?R1)]),
- ?assertEqual(ok, process_change(?DB, change())),
- meck:expect(couch_replicator_clustering, owner, 2, different_node),
- ?assert(ets:member(?MODULE, {?DB, ?DOC1})),
- gen_server:cast(?MODULE, {cluster, stable}),
- meck:wait(2, couch_replicator_scheduler, find_jobs_by_doc, 2, 5000),
- ?assertNot(ets:member(?MODULE, {?DB, ?DOC1})),
- ?assert(removed_job(?R1))
- end).
-
-
get_worker_ref_test_() ->
{
setup,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index b427d9a..3d2c2d7 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -18,7 +18,6 @@
parse_rep_db/3,
parse_rep_doc_without_id/1,
parse_rep_doc_without_id/2,
- rep_to_map/1,
before_doc_update/3,
after_doc_read/2,
ensure_rep_db_exists/0,
@@ -62,6 +61,7 @@
keepalive, nodelay, recbuf, send_timeout, send_timout_close, sndbuf,
priority, tos, tclass
]).
+
-define(CONFIG_DEFAULTS, [
{"worker_processes", "4", fun list_to_integer/1},
{"worker_batch_size", "500", fun list_to_integer/1},
@@ -116,14 +116,17 @@ update_triggered(Id, DocId, DbName) ->
ok.
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+-spec update_error(#{}, any()) -> ok.
+update_error(#rep{} = Rep, Error) ->
+ #{
+ <<"id">> := RepId0,
+ <<"db_name">> := DbName,
+ <<"doc_id">> := DocId,
+ } = Rep,
Reason = error_reason(Error),
- BinRepId = case RepId of
- {Base, Ext} ->
- iolist_to_binary([Base, Ext]);
- _Other ->
- null
+ RepId = case RepId0 of
+ Id when is_binary(Id) -> Id;
+ _Other -> null
end,
update_rep_doc(DbName, DocId, [
{<<"_replication_state">>, <<"error">>},
@@ -198,7 +201,7 @@ replication_design_doc_props(DDocId) ->
-spec parse_rep_doc_without_id({[_]}) -> #{}.
parse_rep_doc_without_id(RepDoc) ->
{ok, Rep} = try
- parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+ parse_rep_doc_without_id(RepDoc, rep_user_name(RepDoc))
catch
throw:{error, Reason} ->
throw({bad_rep_doc, Reason});
@@ -208,9 +211,9 @@ parse_rep_doc_without_id(RepDoc) ->
Rep.
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #{}}.
-parse_rep_doc(Doc, UserCtx) ->
- {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
+-spec parse_rep_doc({[_]}, user_name()) -> {ok, #{}}.
+parse_rep_doc({[_]} = Doc, UserName) ->
+ {ok, Rep} = parse_rep_doc_without_id(Doc, UserName),
#{<<"options">> := Options} = Rep,
Cancel = maps:get(<<"cancel">>, Options, false),
Id = maps:get(<<"id">>, Options, nil),
@@ -227,44 +230,43 @@ parse_rep_doc(Doc, UserCtx) ->
end.
--spec parse_rep_doc_without_id({[_]} | #{}, #user_ctx{}) -> {ok, #{}}.
-parse_rep_doc_without_id({[_]} = EJSon, UserCtx) ->
+-spec parse_rep_doc_without_id({[_]} | #{}, user_name()) -> {ok, #{}}.
+parse_rep_doc_without_id({[_]} = EJson, UserName) ->
% Normalize all field names to be binaries and turn into a map
- Map = ?JSON_DECODE(?JSON_ENCODE(EJSon)),
- parse_rep_doc_without_id(Map, UserCtx);
+ Map = ?JSON_DECODE(?JSON_ENCODE(EJson)),
+ parse_rep_doc_without_id(Map, UserName);
-parse_rep_doc_without_id(#{} = Doc, UserCtx) ->
- Proxy = maps:get(<<"proxy">>, Doc, <<>>),
+parse_rep_doc_without_id(#{} = Doc, UserName) ->
+ Proxy = parse_proxy_params(maps:get(<<"proxy">>, Doc, <<>>)),
Opts = make_options(Doc),
Cancel = maps:get(<<"cancel">>, Opts, false),
Id = maps:get(<<"id">>, Opts, nil),
- case Cancel andalso Id =/= nill of
+ case Cancel andalso Id =/= nil of
true ->
- {ok, #{<<"options">> => Opts, user_ctx = UserCtx}};
+ {ok, #{<<"options">> => Opts, <<"user">> => UserName}};
false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
- Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
+ #{<<"source">> := Source0, <<"target">> := Target0} = Doc,
+ Source = parse_rep_db(Source0, Proxy, Opts),
+ Target = parse_rep_db(Target0, Proxy, Opts),
{Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
- {error, Error} ->
- throw({bad_request, Error});
- Result ->
- Result
+ {error, Error} -> throw({bad_request, Error});
+ Result -> Result
end,
- Rep = #rep{
- source = Source,
- target = Target,
- options = Opts,
- user_ctx = UserCtx,
- type = Type,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
+ Rep = #{
+ <<"id">> => null,
+ <<"base_id">> => null,
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"options">> => Opts,
+ <<"user">> => UserName,
+ <<"type">> => Type,
+ <<"view">> => View,
+ <<"doc_id">> => maps:get(<<"_id">>, Doc, null)
},
% Check if can parse filter code, if not throw exception
case couch_replicator_filters:parse(Opts) of
- {error, FilterError} ->
- throw({error, FilterError});
- {ok, _Filter} ->
- ok
+ {error, FilterError} -> throw({error, FilterError});
+ {ok, _Filter} -> ok
end,
{ok, Rep}
end.
@@ -277,7 +279,7 @@ parse_rep_doc_without_id(#{} = Doc, UserCtx) ->
update_rep_id(#{} = Rep) ->
{BaseId, ExtId} = couch_replicator_ids:replication_id(Rep),
RepId = erlang:iolist_to_binary([BaseId, ExtId]),
- Rep#{<<"id">> => RepId, <<"base_id">> = list_toBaseId}.
+ Rep#{<<"id">> => RepId, <<"base_id">> = BaseId}.
update_rep_doc(RepDbName, RepDocId, KVs) ->
@@ -356,71 +358,73 @@ save_rep_doc(DbName, Doc) ->
end.
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
+-spec rep_user_name({[_]}) -> binary() | null.
+rep_user_name({RepDoc}) ->
case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_json_value(<<"name">>, UserCtx, null),
- roles = get_json_value(<<"roles">>, UserCtx, [])
- }
+ undefined -> null;
+ {UserCtx} -> get_json_value(<<"name">>, UserCtx, null)
end.
--spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
-parse_rep_db({Props}, Proxy, Options) ->
- ProxyParams = parse_proxy_params(Proxy),
+-spec parse_rep_db(#{}, #{}, #{}) -> #{}.
+parse_rep_db(#{} = Endpoint, #{} = ProxyParams, #{} = Options) ->
ProxyURL = case ProxyParams of
- [] -> undefined;
- _ -> binary_to_list(Proxy)
+ #{<<"proxy_url">> := PUrl} -> PUrl;
+ _ -> null
end,
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- #httpdb{
- url = Url,
- auth_props = AuthProps,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(1,
- [{socket_options, get_value(socket_options, Options)} |
- ProxyParams ++ ssl_params(Url)]),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options),
- proxy_url = ProxyURL
- };
+
+ Url0 = maps:get(<<"url">>, Endpoint),
+ Url = maybe_add_trailing_slash(Url0),
+
+ AuthProps = maps:get(<<"auth">>, Endpoint, #{}),
+
+ Headers0 = maps:get(<<"headers">>, Endpoint, #{}),
+ DefaultHeaders = couch_replicator_utils:get_default_headers(),
+ % For same keys values in second map override those in the first
+ Headers = maps:merge(DefaultHeaders, Headers0),
+
+ SockOpts = maps:get(<<"socket_options">>, Options, #{}),
+ SockAndProxy = maps:merge(SockOpts, ProxyParams),
+
+ SslParams = ssl_params(Url),
+
+ #{
+ <<"url">> => Url,
+ <<"auth_props">> => AuthProps,
+ <<"headers">> => Headers,
+ <<"ibrowse_options">> => maps:merge(SslParams, SockAndProxy),
+ <<"timeout">> => maps:get(<<"timeout">>, Options),
+ <<"http_connections">> => maps:get(<<"http_connections">>, Options),
+ <<"retries">> => maps:get(<<"retries">>, Options)
+ <<"proxy_url">> => ProxyUrl
+ }.
+
parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
+ parse_rep_db(#{<<"url">> => Url}, Proxy, Options);
-parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
- DbName;
+parse_rep_db(<<DbName/binary>> = LocalDb, _Proxyh, _Options) ->
+ throw({error, <<"Local endpoint not supported: ", DbName/binary>>});
parse_rep_db(undefined, _Proxy, _Options) ->
throw({error, <<"Missing replicator database">>}).
--spec maybe_add_trailing_slash(binary() | list()) -> list().
+-spec maybe_add_trailing_slash(binary()) -> binary().
+maybe_add_trailing_slash(<<>>) ->
+ <<>>;
+
maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:member($?, Url) of
- true ->
- Url; % skip if there are query params
- false ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end
+ case binary:match(Url, <<"?">>) of
+ nomatch ->
+ case binary:last(Url) of
+ $/ -> Url;
+ _ -> <<Url/binary, "/">>;
+ _ ->
+ Url % skip if there are query params
end.
@@ -526,66 +530,100 @@ parse_sock_opts(V) ->
end, #{}, SocketOptions).
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
+-spec parse_proxy_params(binary() | #{}) -> #{}.
+parse_proxy_params(<<>>) ->
+ #{};
+parse_proxy_params(ProxyUrl0) when is_binary(ProxyUrl0)->
+ ProxyUrl = binary_to_list(ProxyUrl0),
#url{
host = Host,
port = Port,
username = User,
password = Passwd,
- protocol = Protocol
+ protocol = Protocol0
} = ibrowse_lib:parse_url(ProxyUrl),
- [
- {proxy_protocol, Protocol},
- {proxy_host, Host},
- {proxy_port, Port}
- ] ++ case is_list(User) andalso is_list(Passwd) of
+ Protocol = atom_to_binary(Protocol, utf8),
+ case lists:member(Protocol, [<<"http">>, <<"https">>, <<"socks5">>]) of
+ true ->
+ atom_to_binary(Protocol, utf8);
false ->
- [];
+ Error = <<"Unsupported proxy protocol", Protocol/binary>>,
+ throw({bad_request, Error})
+ end,
+ ProxyParams = #{
+ <<"proxy_url">> => ProxyUrl,
+ <<"proxy_protocol">> => Protocol,
+ <<"proxy_host">> => list_to_binary(Host),
+ <<"proxy_port">> => Port
+ #},
+ case is_list(User) andalso is_list(Passwd) of
true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end.
+ ProxyParams#{
+ <<"proxy_user">> => list_to_binary(User),
+ <<"proxy_password">> => list_to_binary(Passwd)
+ };
+ false ->
+ ProxyParams
+ end.
--spec ssl_params([_]) -> [_].
+-spec ssl_params(binary()) -> #{}.
ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
+ case ibrowse_lib:parse_url(binary_to_list(Url)) of
#url{protocol = https} ->
Depth = list_to_integer(
config:get("replicator", "ssl_certificate_max_depth", "3")
),
VerifyCerts = config:get("replicator", "verify_ssl_certificates"),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
- SslOpts1 = case CertFile /= undefined andalso KeyFile /= undefined of
+ CertFile = config:get("replicator", "cert_file", null),
+ KeyFile = config:get("replicator", "key_file", null),
+ Password = config:get("replicator", "password", null),
+ VerifySslOptions = ssl_verify_options(VerifyCerts =:= "true"),
+ SslOpts = maps:merge(VerifySslOptions, #{<<"depth">> => Depth}),
+ SslOpts1 = case CertFile /= null andalso KeyFile /= null of
true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
+ CertFileOpts = case Password of
+ null ->
+ #{
+ <<"certfile">> => list_to_binary(CertFile),
+ <<"keyfile">> => list_to_binary(KeyFile)
+ };
_ ->
- [{certfile, CertFile}, {keyfile, KeyFile},
- {password, Password}] ++ SslOpts
- end;
- false -> SslOpts
+ #{
+ <<"certfile">> => list_to_binary(CertFile),
+ <<"keyfile">> => list_to_binary(KeyFile),
+ <<"password">> => list_to_binary(Password)
+ }
+ end,
+ maps:merge(SslOpts, CertFileOpts)
+ false ->
+ SslOpts
end,
- [{is_ssl, true}, {ssl_options, SslOpts1}];
+ #{<<"is_ssl">> => true, <<"ssl_options">> => SslOpts1};
#url{protocol = http} ->
- []
+ #{}
end.
-spec ssl_verify_options(true | false) -> [_].
ssl_verify_options(true) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
+ case config:get("replicator", "ssl_trusted_certificates_file", undefined) of
+ undefined ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => null
+ };
+ CAFile when is_list(CAFile) ->
+ #{
+ <<"verify">> => <<"verify_peer">>,
+ <<"cacertfile">> => list_to_binary(CAFile)
+ }
+ end;
+
ssl_verify_options(false) ->
- [{verify, verify_none}].
+ #{
+ <<"verify">> => <<"verify_none">>
+ }.
-spec before_doc_update(#doc{}, Db::any(), couch_db:update_type()) -> #doc{}.
diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl
index 940b8ad..a03ffe8 100644
--- a/src/couch_replicator/src/couch_replicator_filters.erl
+++ b/src/couch_replicator/src/couch_replicator_filters.erl
@@ -63,11 +63,11 @@ parse(Options) ->
% Fetches body of filter function from source database. Guaranteed to either
% return {ok, Body} or an {error, Reason}. Also assume this function might
% block due to network / socket issues for an undeterminted amount of time.
--spec fetch(binary(), binary(), binary(), #{}) ->
+-spec fetch(binary(), binary(), binary()) ->
{ok, {[_]}} | {error, binary()}.
-fetch(DDocName, FilterName, Source, #{} = UserCtx) ->
+fetch(DDocName, FilterName, Source) ->
{Pid, Ref} = spawn_monitor(fun() ->
- try fetch_internal(DDocName, FilterName, Source, UserCtx) of
+ try fetch_internal(DDocName, FilterName, Source) of
Resp ->
exit({exit_ok, Resp})
catch
@@ -88,28 +88,30 @@ fetch(DDocName, FilterName, Source, #{} = UserCtx) ->
% Get replication type and view (if any) from replication document props
--spec view_type([_], [_]) ->
- {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
-view_type(Props, Options) ->
- case couch_util:get_value(<<"filter">>, Props) of
- <<"_view">> ->
- {QP} = couch_util:get_value(query_params, Options, {[]}),
- ViewParam = couch_util:get_value(<<"view">>, QP),
- case re:split(ViewParam, <<"/">>) of
- [DName, ViewName] ->
- {view, {<< "_design/", DName/binary >>, ViewName}};
- _ ->
- {error, <<"Invalid `view` parameter.">>}
- end;
+-spec view_type(#{}, [_]) ->
+ {binary(), #{}} | {error, binary()}.
+view_type(#{<<"filter">> := <<"_view">>}, Options) ->
+ {QP} = couch_util:get_value(query_params, Options, {[]}),
+ ViewParam = couch_util:get_value(<<"view">>, QP),
+ case re:split(ViewParam, <<"/">>) of
+ [DName, ViewName] ->
+ DDocMap = #{
+ <<"ddoc">> => <<"_design/",DName/binary>>,
+ <<"view">> => ViewName
+ },
+ {<<"view">>, DDocMap};
_ ->
- {db, nil}
- end.
+ {error, <<"Invalid `view` parameter.">>}
+ end;
+
+view_type(#{}, [_] = Options) ->
+ {<<"db">>, #{}}.
% Private functions
-fetch_internal(DDocName, FilterName, Source, #{} = UserCtx) ->
- Db = case (catch couch_replicator_api_wrap:db_open(Source, UserCtx) of
+fetch_internal(DDocName, FilterName, Source) ->
+ Db = case (catch couch_replicator_api_wrap:db_open(Source) of
{ok, Db0} ->
Db0;
DbError ->
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 0d73cbf..da30532 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -40,19 +40,19 @@ replication_id(#{<<"options">> := Options} = Rep) ->
% If a change is made to how replications are identified,
% please add a new clause and increase ?REP_ID_VERSION.
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 4) ->
+replication_id(#{<<"source">> := Src, <<"target">> := Tgt} = Rep, 4) ->
UUID = couch_server:get_uuid(),
- SrcInfo = get_v4_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
- TgtInfo = get_v4_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+ SrcInfo = get_v4_endpoint(Src),
+ TgtInfo = get_v4_endpoint(Tgt),
maybe_append_filters([UUID, SrcInfo, TgtInfo], Rep);
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 3) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 3) ->
UUID = couch_server:get_uuid(),
- Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
- Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Res)),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([UUID, Src, Tgt], Rep);
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 2) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 2) ->
{ok, HostName} = inet:gethostname(),
Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
P when is_number(P) ->
@@ -65,14 +65,14 @@ replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 2) ->
% ... mochiweb_socket_server:get(https, port)
list_to_integer(config:get("httpd", "port", "5984"))
end,
- Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
- Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Port, Src, Tgt], Rep);
-replication_id(#{<<"user_ctx">> := UserCtx} = Rep, 1) ->
+replication_id(#{<<"source">> := Src0, <<"target">> := Tgt0} = Rep, 1) ->
{ok, HostName} = inet:gethostname(),
- Src = get_rep_endpoint(UserCtx, maps:get(<<"source">>, Rep)),
- Tgt = get_rep_endpoint(UserCtx, maps:get(<<"target">>, Rep)),
+ Src = get_rep_endpoint(Src0),
+ Tgt = get_rep_endpoint(Tgt0),
maybe_append_filters([HostName, Src, Tgt], Rep).
@@ -99,7 +99,6 @@ convert({BaseId, Ext} = Id) when is_binary(BaseId), is_binary(Ext) ->
maybe_append_filters(Base, #{} = Rep) ->
#{
<<"source">> := Source,
- <<"user_ctx">> := UserCtx,
<<"options">> := Options
} = Rep,
Base2 = Base ++
@@ -109,7 +108,7 @@ maybe_append_filters(Base, #{} = Rep) ->
{ok, {view, Filter, QueryParams}} ->
[Filter, QueryParams];
{ok, {user, {Doc, Filter}, QueryParams}} ->
- case couch_replicator_filters:fetch(Doc, Filter, Source, UserCtx) of
+ case couch_replicator_filters:fetch(Doc, Filter, Source) of
{ok, Code} ->
[Code, QueryParams];
{error, Error} ->
@@ -138,31 +137,26 @@ maybe_append_options(Options, RepOptions) ->
end, [], Options).
-get_rep_endpoint(_UserCtx, #{<<"url">> = Url0, <<"headers">> = Headers0}) ->
+get_rep_endpoint(#{<<"url">> := Url0, <<"headers">> := Headers0}) ->
Url = binary_to_list(Url0),
+ % We turn headers into a proplist of string() KVs to calculate
+ % the same replication ID as CouchDB 2.x
Headers1 = maps:fold(fun(K, V, Acc) ->
[{binary_to_list(K), binary_to_list(V)} | Acc]
end, [], Header0),
Headers2 = lists:keysort(1, Headers1),
DefaultHeaders = (#httpdb{})#httpdb.headers,
- {remote, Url, Headers2 -- DefaultHeaders};
-get_rep_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
- UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
- {local, DbName, UserCtxRec}.
+ {remote, Url, Headers2 -- DefaultHeaders}.
-get_v4_endpoint(#{} = UserCtx, #{} = HttpDb) ->
- {remote, Url, Headers} = get_rep_endpoint(UserCtx, HttpDb),
+get_v4_endpoint(#{} = HttpDb) ->
+ {remote, Url, Headers} = get_rep_endpoint(HttpDb),
{{UserFromHeaders, _}, HeadersWithoutBasicAuth} =
couch_replicator_utils:remove_basic_auth_from_headers(Headers),
{UserFromUrl, Host, NonDefaultPort, Path} = get_v4_url_info(Url),
User = pick_defined_value([UserFromUrl, UserFromHeaders]),
OAuth = undefined, % Keep this to ensure checkpoints don't change
- {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth};
-get_v4_endpoint(#{} = UserCtx, <<DbName/binary>>) ->
- UserCtxRec = couch_replicator_utils:user_ctx_from_json(UserCtx),
- {local, DbName, UserCtxRec}.
-
+ {remote, User, Host, NonDefaultPort, Path, HeadersWithoutBasicAuth, OAuth}.
pick_defined_value(Values) ->
case [V || V <- Values, V /= undefined] of
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index e6e8a4d..b205c84 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1078,8 +1078,8 @@ scheduler_job_format_status_test() ->
Target = <<"http://u:p@h2/d2">>,
Rep = #rep{
id = {"base", "+ext"},
- source = couch_replicator_docs:parse_rep_db(Source, [], []),
- target = couch_replicator_docs:parse_rep_db(Target, [], []),
+ source = couch_replicator_docs:parse_rep_db(Source, #{}, #{}),
+ target = couch_replicator_docs:parse_rep_db(Target, #{}, #{}),
options = [{create_target, true}],
doc_id = <<"mydoc">>,
db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ff61fcf..c6efde5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -14,11 +14,6 @@
-export([
parse_rep_doc/2,
- open_db/1,
- close_db/1,
- local_db_name/1,
- start_db_compaction_notifier/2,
- stop_db_compaction_notifier/1,
replication_id/2,
sum_stats/2,
is_deleted/1,
@@ -29,8 +24,7 @@
filter_state/3,
remove_basic_auth_from_headers/1,
normalize_rep/1,
- user_ctx_from_json/1,
- user_ctx_to_json/1
+ default_headers_map/0
]).
-export([
@@ -47,49 +41,6 @@
]).
-open_db(#httpdb{} = HttpDb) ->
- HttpDb;
-open_db(Db) ->
- DbName = couch_db:name(Db),
- UserCtx = couch_db:get_user_ctx(Db),
- {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- NewDb.
-
-
-close_db(#httpdb{}) ->
- ok;
-close_db(Db) ->
- couch_db:close(Db).
-
-
-local_db_name(#httpdb{}) ->
- undefined;
-local_db_name(Db) ->
- couch_db:name(Db).
-
-
-start_db_compaction_notifier(#httpdb{}, _) ->
- nil;
-start_db_compaction_notifier(Db, Server) ->
- DbName = couch_db:name(Db),
- {ok, Pid} = couch_event:link_listener(
- ?MODULE, handle_db_event, Server, [{dbname, DbName}]
- ),
- Pid.
-
-
-stop_db_compaction_notifier(nil) ->
- ok;
-stop_db_compaction_notifier(Listener) ->
- couch_event:stop_listener(Listener).
-
-
-handle_db_event(DbName, compacted, Server) ->
- gen_server:cast(Server, {db_compacted, DbName}),
- {ok, Server};
-handle_db_event(_DbName, _Event, Server) ->
- {ok, Server}.
-
rep_error_to_binary(Error) ->
couch_util:to_binary(error_reason(Error)).
@@ -228,30 +179,16 @@ normalize_endpoint(<<DbName/binary>>) ->
DbName;
normalize_endpoint(#{} = Endpoint) ->
- Ks = [<<"url">>,<<"auth_props">>, <<"headers">>, <<"timeout">>,
+ Ks = [<<"url">>, <<"auth_props">>, <<"headers">>, <<"timeout">>,
<<"ibrowse_options">>, <<"retries">>, <<"http_connections">>
],
maps:with(Ks, Endpoint).
-user_ctx_to_json(#user_ctx{name = Name, roles = Roles0} = UserCtx) ->
- {AtomRoles0, Roles} = lists:partition(fun erlang:is_atom/1, Roles0),
- AtomRoles = lists:map(fun(R) -> atom_to_binary(V, utf8) end, AtomRoles0),
- UserCtxMap = #{
- <<"name">> => Name,
- <<"roles">> => Roles,
- <<"atom_roles">> => AtomRoles
- }.
-
-
-user_ctx_from_json(#{} = UserCtxMap) ->
- #{
- <<"name">> := Name,
- <<"roles">> := Roles
- <<"atom_roles">> := AtomRoles0
- },
- AtomRoles = lists:map(fun(R) -> binary_to_atom(V, utf8) end, AtomRoles0),
- #user_ctx{name = Name, roles = lists:sort(Roles ++ AtomRoles)}.
+get_default_headers() ->
+ lists:foldl(fun({K, V}, Acc) ->
+ Acc#{list_to_binary(K) => list_to_binary(V)}
+ end, #{}, (#httpdb{})#httpdb.headers).
-ifdef(TEST).
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ec98fa0..0787303 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -28,18 +28,11 @@
% TODO: maybe make both buffer max sizes configurable
-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets
--define(DOC_BUFFER_LEN, 10). % for local targets, # of documents
-define(MAX_BULK_ATT_SIZE, 64 * 1024).
-define(MAX_BULK_ATTS_PER_DOC, 8).
-define(STATS_DELAY, 10000000). % 10 seconds (in microseconds)
-define(MISSING_DOC_RETRY_MSEC, 2000).
--import(couch_replicator_utils, [
- open_db/1,
- close_db/1,
- start_db_compaction_notifier/2,
- stop_db_compaction_notifier/1
-]).
-import(couch_util, [
to_binary/1,
get_value/3
@@ -62,8 +55,6 @@
pending_fetch = nil,
flush_waiter = nil,
stats = couch_replicator_stats:new(),
- source_db_compaction_notifier = nil,
- target_db_compaction_notifier = nil,
batch = #batch{}
}).
@@ -71,14 +62,7 @@
start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
-
-start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
- Pid = spawn_link(fun() ->
- erlang:put(last_stats_report, os:timestamp()),
- queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
- end),
- {ok, Pid}.
+ ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
@@ -92,12 +76,8 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
cp = Cp,
max_parallel_conns = MaxConns,
loop = LoopPid,
- source = open_db(Source),
- target = open_db(Target),
- source_db_compaction_notifier =
- start_db_compaction_notifier(Source, self()),
- target_db_compaction_notifier =
- start_db_compaction_notifier(Target, self())
+ source = Source,
+ target = Target,
},
{ok, State}.
@@ -141,24 +121,6 @@ handle_call(flush, {Pid, _} = From,
{noreply, State2#state{flush_waiter = From}}.
-handle_cast({db_compacted, DbName} = Msg, #state{} = State) ->
- #state{
- source = Source,
- target = Target
- } = State,
- SourceName = couch_replicator_utils:local_db_name(Source),
- TargetName = couch_replicator_utils:local_db_name(Target),
- case DbName of
- SourceName ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#state{source = NewSource}};
- TargetName ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#state{target = NewTarget}};
- _Else ->
- {stop, {unexpected_async_call, Msg}, State}
- end;
-
handle_cast(Msg, State) ->
{stop, {unexpected_async_call, Msg}, State}.
@@ -214,10 +176,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
terminate(_Reason, State) ->
- close_db(State#state.source),
- close_db(State#state.target),
- stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
- stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+ ok.
+
format_status(_Opt, [_PDict, State]) ->
#state{
@@ -253,20 +213,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
{changes, ChangesManager, Changes, ReportSeq} ->
- Target2 = open_db(Target),
- {IdRevs, Stats0} = find_missing(Changes, Target2),
- case Source of
- #httpdb{} ->
- ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
- remote_process_batch(IdRevs, Parent),
- {ok, Stats} = gen_server:call(Parent, flush, infinity);
- _Db ->
- Source2 = open_db(Source),
- Stats = local_process_batch(
- IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
- close_db(Source2)
- end,
- close_db(Target2),
+ {IdRevs, Stats0} = find_missing(Changes, Target),
+ ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
+ remote_process_batch(IdRevs, Parent),
+ {ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
erlang:put(last_stats_report, os:timestamp()),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
@@ -274,32 +224,6 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
end.
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
- Stats;
-
-local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
- case Target of
- #httpdb{} ->
- couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
- _Db ->
- couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
- end,
- Stats2 = flush_docs(Target, Docs),
- Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
- local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
-
-local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
- {ok, {_, DocList, Stats2, _}} = fetch_doc(
- Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
- {Batch2, Stats3} = lists:foldl(
- fun(Doc, {Batch0, Stats0}) ->
- {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
- {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
- end,
- {Batch, Stats2}, DocList),
- local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
-
-
remote_process_batch([], _Parent) ->
ok;
@@ -319,10 +243,8 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
spawn_doc_reader(Source, Target, FetchParams) ->
Parent = self(),
spawn_link(fun() ->
- Source2 = open_db(Source),
fetch_doc(
- Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
- close_db(Source2)
+ Source, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
end).
@@ -350,29 +272,6 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
end.
-local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
- Stats2 = couch_replicator_stats:increment(docs_read, Stats),
- case batch_doc(Doc) of
- true ->
- {ok, {Target, [Doc | DocList], Stats2, Cp}};
- false ->
- couch_log:debug("Worker flushing doc with attachments", []),
- Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
- close_db(Target2),
- Stats3 = case Success of
- true ->
- couch_replicator_stats:increment(docs_written, Stats2);
- false ->
- couch_replicator_stats:increment(doc_write_failures, Stats2)
- end,
- Stats4 = maybe_report_stats(Cp, Stats3),
- {ok, {Target, DocList, Stats4, Cp}}
- end;
-local_doc_handler(_, Acc) ->
- {ok, Acc}.
-
-
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
{ok, Acc};
@@ -383,9 +282,7 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
% convenient to call it ASAP to avoid ibrowse inactivity timeouts.
Stats = couch_replicator_stats:new([{docs_read, 1}]),
couch_log:debug("Worker flushing doc with attachments", []),
- Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
- close_db(Target2),
+ Success = (flush_doc(Target, Doc) =:= ok),
{Result, Stats2} = case Success of
true ->
{{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
@@ -410,9 +307,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
Parent = self(),
spawn_link(
fun() ->
- Target2 = open_db(Target),
- Stats = flush_docs(Target2, DocList),
- close_db(Target2),
+ Stats = flush_docs(Target, DocList),
ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
end).
@@ -462,17 +357,6 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
Stats = couch_replicator_stats:new(),
{#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
end
- end;
-
-maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
- case SizeAcc + 1 of
- SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
- couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
- Stats = flush_docs(Target, [Doc | DocAcc]),
- {#batch{}, Stats};
- SizeAcc2 ->
- Stats = couch_replicator_stats:new(),
- {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
end.