You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/11/22 16:34:00 UTC
[couchdb] 01/01: [wip] Replace the auto-inserted replicator VDU with a BDU
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch improve-replicator-doc-parsing
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit c75ced0f8baf85ec3caad3bf9c92092d4a592c26
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Tue Nov 22 11:31:54 2022 -0500
[wip] Replace the auto-inserted replicator VDU with a BDU
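Replace the VDU with a BDU (before_doc_update) check. The couch replicator
already had a BDU to update the `"owner"` field, so we can plug right into it
and do all the validation there. This way there is only a single validation
and parsing code path. The previously auto-created `_design/_replicator`
design doc is no longer needed and is deleted whenever a replicator db is
discovered.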
---
src/couch_replicator/src/couch_replicator.erl | 6 +-
.../src/couch_replicator_doc_processor.erl | 9 +-
.../src/couch_replicator_doc_processor_worker.erl | 14 +-
src/couch_replicator/src/couch_replicator_docs.erl | 564 +++------------------
src/couch_replicator/src/couch_replicator_ids.erl | 8 +-
.../src/couch_replicator_js_functions.hrl | 183 -------
...licator_docs.erl => couch_replicator_parse.erl} | 357 +------------
.../src/couch_replicator_scheduler_job.erl | 4 +-
.../src/couch_replicator_utils.erl | 8 +-
.../test/eunit/couch_replicator_compact_tests.erl | 2 +-
.../couch_replicator_error_reporting_tests.erl | 2 +-
.../test/eunit/couch_replicator_proxy_tests.erl | 10 +-
...ch_replicator_retain_stats_between_job_runs.erl | 2 +-
.../test/eunit/couch_replicator_test_helper.erl | 2 +-
14 files changed, 110 insertions(+), 1061 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index 39b3903ea..3bb177968 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -25,10 +25,8 @@
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("mem3/include/mem3.hrl").
--define(DESIGN_DOC_CREATION_DELAY_MSEC, 1000).
-define(REPLICATION_STATES, [
% Just added to scheduler
initializing,
@@ -58,7 +56,7 @@
| {error, any()}
| no_return().
replicate(PostBody, Ctx) ->
- {ok, Rep0} = couch_replicator_utils:parse_rep_doc(PostBody, Ctx),
+ {ok, Rep0} = couch_replicator_parse:parse_rep_doc(PostBody, Ctx),
Rep = Rep0#rep{start_time = os:timestamp()},
#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep,
case get_value(cancel, Options, false) of
@@ -138,7 +136,7 @@ replication_states() ->
-spec strip_url_creds(binary() | {[_]}) -> binary().
strip_url_creds(Endpoint) ->
- try couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+ try couch_replicator_parse:parse_rep_db(Endpoint, [], []) of
#httpdb{url = Url} ->
iolist_to_binary(couch_util:url_strip_password(Url))
catch
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 436d7c44d..eb4c02b49 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -44,9 +44,7 @@
notify_cluster_event/2
]).
--include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
--include_lib("mem3/include/mem3.hrl").
-import(couch_replicator_utils, [
get_json_value/2,
@@ -77,9 +75,8 @@
% couch_multidb_changes API callbacks
-db_created(DbName, Server) ->
+db_created(_DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
Server.
db_deleted(DbName, Server) ->
@@ -89,7 +86,7 @@ db_deleted(DbName, Server) ->
db_found(DbName, Server) ->
couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
- couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+ couch_replicator_docs:delete_old_rep_ddoc(DbName),
Server.
db_change(DbName, {ChangeProps} = Change, Server) ->
@@ -169,7 +166,7 @@ process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
% should propagate to db_change function and will be recorded as permanent
% failure in the document. User will have to update the documet to fix the
% problem.
- Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc),
+ Rep0 = couch_replicator_parse:parse_rep_doc_without_id(JsonRepDoc),
Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()},
Filter =
case couch_replicator_filters:parse(Rep#rep.options) of
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 22c5f8584..b1014ffa5 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -162,7 +162,7 @@ doc_processor_worker_test_() ->
t_should_add_job() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(added_job())
end).
@@ -172,7 +172,7 @@ t_already_running_same_docid() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(?DB, ?DOC1),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).
@@ -182,7 +182,7 @@ t_already_running_transient() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(null, null),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertMatch(
{temporary_error, _},
maybe_start_replication(
@@ -200,7 +200,7 @@ t_already_running_other_db_other_doc() ->
?_test(begin
Id = {?DB, ?DOC1},
mock_already_running(<<"otherdb">>, <<"otherdoc">>),
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
?assertMatch(
{permanent_failure, _},
maybe_start_replication(
@@ -217,7 +217,7 @@ t_already_running_other_db_other_doc() ->
t_spawn_worker() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
WRef = make_ref(),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
Pid = spawn_worker(Id, Rep, 0, WRef),
@@ -236,7 +236,7 @@ t_spawn_worker() ->
t_ignore_if_doc_deleted() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
@@ -247,7 +247,7 @@ t_ignore_if_doc_deleted() ->
t_ignore_if_worker_ref_does_not_match() ->
?_test(begin
Id = {?DB, ?DOC1},
- Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
+ Rep = couch_replicator_parse:parse_rep_doc_without_id(change()),
meck:expect(
couch_replicator_doc_processor,
get_worker_ref,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a60f1a1e1..20dbc0d67 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -13,15 +13,14 @@
-module(couch_replicator_docs).
-export([
- parse_rep_doc/1,
- parse_rep_doc/2,
- parse_rep_db/3,
- parse_rep_doc_without_id/1,
- parse_rep_doc_without_id/2,
+ %% parse_rep_doc/1,
+ %% parse_rep_doc/2,
+ %% parse_rep_db/3,
+ %% parse_rep_doc_without_id/1,
+ %% parse_rep_doc_without_id/2,
before_doc_update/3,
after_doc_read/2,
- ensure_rep_ddoc_exists/1,
- ensure_cluster_rep_ddoc_exists/1,
+ delete_old_rep_ddoc/1,
remove_state_fields/2,
update_doc_completed/3,
update_failed/3,
@@ -31,24 +30,10 @@
]).
-include_lib("couch/include/couch_db.hrl").
--include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- get_json_value/2,
- get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
+% The ID of the now-deleted design doc. On every *_replicator db discovery we
+% try to delete it. At some point in the future, remove this logic altogether.
-define(REP_DESIGN_DOC, <<"_design/_replicator">>).
-define(OWNER, <<"owner">>).
-define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
@@ -126,176 +111,32 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
]),
ok.
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
+-spec delete_old_rep_ddoc(binary()) -> ok.
+delete_old_rep_ddoc(RepDb) -> % best-effort cleanup of the legacy VDU design doc
case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
- ok
+ true -> delete_old_rep_ddoc(RepDb, ?REP_DESIGN_DOC); % presumably acts only on the shard owning the ddoc id
+ false -> ok
end.
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
+-spec delete_old_rep_ddoc(binary(), binary()) -> ok.
+delete_old_rep_ddoc(RepDb, DDocId) -> % returns ok; a write conflict is ignored
case open_rep_doc(RepDb, DDocId) of
{not_found, no_db_file} ->
- %% database was deleted.
ok;
{not_found, _Reason} ->
- DocProps = replication_design_doc_props(DDocId),
- DDoc = couch_doc:from_json_obj({DocProps}),
- couch_log:notice("creating replicator ddoc ~p", [RepDb]),
- {ok, _Rev} = save_rep_doc(RepDb, DDoc);
+ ok; % ddoc is absent: nothing to delete
{ok, Doc} ->
- Latest = replication_design_doc_props(DDocId),
- {Props0} = couch_doc:to_json_obj(Doc, []),
- {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
- case compare_ejson({Props}, {Latest}) of
- true ->
- ok;
- false ->
- LatestWithRev = [{<<"_rev">>, Rev} | Latest],
- DDoc = couch_doc:from_json_obj({LatestWithRev}),
- couch_log:notice("updating replicator ddoc ~p", [RepDb]),
- try
- {ok, _} = save_rep_doc(RepDb, DDoc)
- catch
- throw:conflict ->
- %% ignore, we'll retry next time
- ok
- end
+ DeletedDoc = Doc#doc{deleted = true, body = {[]}}, % tombstone built on the opened revision
+ try
+ save_rep_doc(RepDb, DeletedDoc)
+ catch
+ throw:conflict ->
+ % concurrent update of the ddoc: ignore, we'll retry on the next db discovery
+ ok
end
end,
ok.
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
- EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
- EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
- EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
- [
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ].
-
-% Note: parse_rep_doc can handle filtered replications. During parsing of the
-% replication doc it will make possibly remote http requests to the source
-% database. If failure or parsing of filter docs fails, parse_doc throws a
-% {filter_fetch_error, Error} excation. This exception should be considered
-% transient in respect to the contents of the document itself, since it depends
-% on netowrk availability of the source db and other factors.
--spec parse_rep_doc({[_]}) -> #rep{}.
-parse_rep_doc(RepDoc) ->
- {ok, Rep} =
- try
- parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- throw:{filter_fetch_error, Reason} ->
- throw({filter_fetch_error, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
- end,
- Rep.
-
--spec parse_rep_doc_without_id({[_]}) -> #rep{}.
-parse_rep_doc_without_id(RepDoc) ->
- {ok, Rep} =
- try
- parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
- catch
- throw:{error, Reason} ->
- throw({bad_rep_doc, Reason});
- Tag:Err ->
- throw({bad_rep_doc, to_binary({Tag, Err})})
- end,
- Rep.
-
--spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc(Doc, UserCtx) ->
- {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
- Cancel = get_value(cancel, Rep#rep.options, false),
- Id = get_value(id, Rep#rep.options, nil),
- case {Cancel, Id} of
- {true, nil} ->
- % Cancel request with no id, must parse id out of body contents
- {ok, update_rep_id(Rep)};
- {true, Id} ->
- % Cancel request with an id specified, so do not parse id from body
- {ok, Rep};
- {false, _Id} ->
- % Not a cancel request, regular replication doc
- {ok, update_rep_id(Rep)}
- end.
-
--spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
-parse_rep_doc_without_id({Props}, UserCtx) ->
- {SrcProxy, TgtProxy} = parse_proxy_settings(Props),
- Opts = make_options(Props),
- case
- get_value(cancel, Opts, false) andalso
- (get_value(id, Opts, nil) =/= nil)
- of
- true ->
- {ok, #rep{options = Opts, user_ctx = UserCtx}};
- false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), SrcProxy, Opts),
- Target = parse_rep_db(get_value(<<"target">>, Props), TgtProxy, Opts),
- {Type, View} =
- case couch_replicator_filters:view_type(Props, Opts) of
- {error, Error} ->
- throw({bad_request, Error});
- Result ->
- Result
- end,
- Rep = #rep{
- source = Source,
- target = Target,
- options = Opts,
- user_ctx = UserCtx,
- type = Type,
- view = View,
- doc_id = get_value(<<"_id">>, Props, null)
- },
- % Check if can parse filter code, if not throw exception
- case couch_replicator_filters:parse(Opts) of
- {error, FilterError} ->
- throw({error, FilterError});
- {ok, _Filter} ->
- ok
- end,
- {ok, Rep}
- end.
-
-parse_proxy_settings(Props) when is_list(Props) ->
- Proxy = get_value(<<"proxy">>, Props, <<>>),
- SrcProxy = get_value(<<"source_proxy">>, Props, <<>>),
- TgtProxy = get_value(<<"target_proxy">>, Props, <<>>),
-
- case Proxy =/= <<>> of
- true when SrcProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `source_proxy`",
- throw({bad_request, Error});
- true when TgtProxy =/= <<>> ->
- Error = "`proxy` is mutually exclusive with `target_proxy`",
- throw({bad_request, Error});
- true ->
- {Proxy, Proxy};
- false ->
- {SrcProxy, TgtProxy}
- end.
-
% Update a #rep{} record with a replication_id. Calculating the id might involve
% fetching a filter from the source db, and so it could fail intermetently.
% In case of a failure to fetch the filter this function will throw a
@@ -386,316 +227,46 @@ save_rep_doc(DbName, Doc) ->
couch_db:close(Db)
end.
--spec rep_user_ctx({[_]}) -> #user_ctx{}.
-rep_user_ctx({RepDoc}) ->
- case get_json_value(<<"user_ctx">>, RepDoc) of
- undefined ->
- #user_ctx{};
- {UserCtx} ->
- #user_ctx{
- name = get_json_value(<<"name">>, UserCtx, null),
- roles = get_json_value(<<"roles">>, UserCtx, [])
- }
- end.
-
--spec parse_rep_db({[_]} | binary(), [_] | binary(), [_]) -> #httpdb{} | no_return().
-parse_rep_db({Props}, Proxy, Options) ->
- ProxyParams = parse_proxy_params(Proxy),
- ProxyURL =
- case ProxyParams of
- [] -> undefined;
- _ -> binary_to_list(Proxy)
- end,
- Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
- {AuthProps} = get_value(<<"auth">>, Props, {[]}),
- {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
- Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
- DefaultHeaders = (#httpdb{})#httpdb.headers,
- HttpDb = #httpdb{
- url = Url,
- auth_props = AuthProps,
- headers = lists:ukeymerge(1, Headers, DefaultHeaders),
- ibrowse_options = lists:keysort(
- 1,
- [
- {socket_options, get_value(socket_options, Options)}
- | ProxyParams ++ ssl_params(Url)
- ]
- ),
- timeout = get_value(connection_timeout, Options),
- http_connections = get_value(http_connections, Options),
- retries = get_value(retries, Options),
- proxy_url = ProxyURL
- },
- couch_replicator_utils:normalize_basic_auth(HttpDb);
-parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
- parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
-parse_rep_db(<<_/binary>>, _Proxy, _Options) ->
- throw({error, local_endpoints_not_supported});
-parse_rep_db(undefined, _Proxy, _Options) ->
- throw({error, <<"Missing replicator database">>}).
-
--spec maybe_add_trailing_slash(binary() | list()) -> list().
-maybe_add_trailing_slash(Url) when is_binary(Url) ->
- maybe_add_trailing_slash(?b2l(Url));
-maybe_add_trailing_slash(Url) ->
- case lists:member($?, Url) of
- true ->
- % skip if there are query params
- Url;
- false ->
- case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
- end
- end.
-
--spec make_options([_]) -> [_].
-make_options(Props) ->
- Options0 = lists:ukeysort(1, convert_options(Props)),
- Options = check_options(Options0),
- DefWorkers = config:get_integer("replicator", "worker_processes", 4),
- DefBatchSize = config:get_integer("replicator", "worker_batch_size", 500),
- DefConns = config:get_integer("replicator", "http_connections", 20),
- DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
- DefRetries = config:get_integer("replicator", "retries_per_request", 5),
- UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
- UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
- DefCheckpointInterval = config:get_integer(
- "replicator",
- "checkpoint_interval",
- 30000
- ),
- {ok, DefSocketOptions} = couch_util:parse_term(
- config:get(
- "replicator",
- "socket_options",
- "[{keepalive, true}, {nodelay, false}]"
- )
- ),
- lists:ukeymerge(
- 1,
- Options,
- lists:keysort(1, [
- {connection_timeout, DefTimeout},
- {retries, DefRetries},
- {http_connections, DefConns},
- {socket_options, DefSocketOptions},
- {worker_batch_size, DefBatchSize},
- {worker_processes, DefWorkers},
- {use_checkpoints, UseCheckpoints},
- {use_bulk_get, UseBulkGet},
- {checkpoint_interval, DefCheckpointInterval}
- ])
- ).
-
--spec convert_options([_]) -> [_].
-convert_options([]) ->
- [];
-convert_options([{<<"cancel">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `cancel` must be a boolean">>});
-convert_options([{<<"cancel">>, V} | R]) ->
- [{cancel, V} | convert_options(R)];
-convert_options([{IdOpt, V} | R]) when
- IdOpt =:= <<"_local_id">>;
- IdOpt =:= <<"replication_id">>;
- IdOpt =:= <<"id">>
-->
- [{id, couch_replicator_ids:convert(V)} | convert_options(R)];
-convert_options([{<<"create_target">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `create_target` must be a boolean">>});
-convert_options([{<<"create_target">>, V} | R]) ->
- [{create_target, V} | convert_options(R)];
-convert_options([{<<"create_target_params">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `create_target_params` must be a JSON object">>});
-convert_options([{<<"create_target_params">>, V} | R]) ->
- [{create_target_params, V} | convert_options(R)];
-convert_options([{<<"winning_revs_only">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `winning_revs_only` must be a boolean">>});
-convert_options([{<<"winning_revs_only">>, V} | R]) ->
- [{winning_revs_only, V} | convert_options(R)];
-convert_options([{<<"continuous">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `continuous` must be a boolean">>});
-convert_options([{<<"continuous">>, V} | R]) ->
- [{continuous, V} | convert_options(R)];
-convert_options([{<<"filter">>, V} | R]) ->
- [{filter, V} | convert_options(R)];
-convert_options([{<<"query_params">>, V} | R]) ->
- [{query_params, V} | convert_options(R)];
-convert_options([{<<"doc_ids">>, null} | R]) ->
- convert_options(R);
-convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
- throw({bad_request, <<"parameter `doc_ids` must be an array">>});
-convert_options([{<<"doc_ids">>, V} | R]) ->
- % Ensure same behaviour as old replicator: accept a list of percent
- % encoded doc IDs.
- DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
- [{doc_ids, DocIds} | convert_options(R)];
-convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
- throw({bad_request, <<"parameter `selector` must be a JSON object">>});
-convert_options([{<<"selector">>, V} | R]) ->
- [{selector, V} | convert_options(R)];
-convert_options([{<<"worker_processes">>, V} | R]) ->
- [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"worker_batch_size">>, V} | R]) ->
- [{worker_batch_size, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"http_connections">>, V} | R]) ->
- [{http_connections, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"connection_timeout">>, V} | R]) ->
- [{connection_timeout, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"retries_per_request">>, V} | R]) ->
- [{retries, couch_util:to_integer(V)} | convert_options(R)];
-convert_options([{<<"socket_options">>, V} | R]) ->
- {ok, SocketOptions} = couch_util:parse_term(V),
- [{socket_options, SocketOptions} | convert_options(R)];
-convert_options([{<<"since_seq">>, V} | R]) ->
- [{since_seq, V} | convert_options(R)];
-convert_options([{<<"use_checkpoints">>, V} | R]) ->
- [{use_checkpoints, V} | convert_options(R)];
-convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
- throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
-convert_options([{<<"use_bulk_get">>, V} | R]) ->
- [{use_bulk_get, V} | convert_options(R)];
-convert_options([{<<"checkpoint_interval">>, V} | R]) ->
- [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
-% skip unknown option
-convert_options([_ | R]) ->
- convert_options(R).
-
--spec check_options([_]) -> [_].
-check_options(Options) ->
- DocIds = lists:keyfind(doc_ids, 1, Options),
- Filter = lists:keyfind(filter, 1, Options),
- Selector = lists:keyfind(selector, 1, Options),
- case {DocIds, Filter, Selector} of
- {false, false, false} -> Options;
- {false, false, _} -> Options;
- {false, _, false} -> Options;
- {_, false, false} -> Options;
- _ -> throw({bad_request, "`doc_ids`,`filter`,`selector` are mutually exclusive"})
- end.
-
--spec parse_proxy_params(binary() | [_]) -> [_].
-parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
- parse_proxy_params(?b2l(ProxyUrl));
-parse_proxy_params([]) ->
- [];
-parse_proxy_params(ProxyUrl) ->
- #url{
- host = Host,
- port = Port,
- username = User,
- password = Passwd,
- protocol = Protocol
- } = ibrowse_lib:parse_url(ProxyUrl),
- Params =
- [
- {proxy_host, Host},
- {proxy_port, Port}
- ] ++
- case is_list(User) andalso is_list(Passwd) of
- false ->
- [];
- true ->
- [{proxy_user, User}, {proxy_password, Passwd}]
- end,
- case Protocol of
- socks5 ->
- [proxy_to_socks5(Param) || Param <- Params];
- _ ->
- Params
- end.
-
--spec proxy_to_socks5({atom(), string()}) -> {atom(), string()}.
-proxy_to_socks5({proxy_host, Val}) ->
- {socks5_host, Val};
-proxy_to_socks5({proxy_port, Val}) ->
- {socks5_port, Val};
-proxy_to_socks5({proxy_user, Val}) ->
- {socks5_user, Val};
-proxy_to_socks5({proxy_password, Val}) ->
- {socks5_password, Val}.
-
--spec ssl_params([_]) -> [_].
-ssl_params(Url) ->
- case ibrowse_lib:parse_url(Url) of
- #url{protocol = https} ->
- Depth = config:get_integer(
- "replicator",
- "ssl_certificate_max_depth",
- 3
- ),
- VerifyCerts = config:get_boolean(
- "replicator",
- "verify_ssl_certificates",
- false
- ),
- CertFile = config:get("replicator", "cert_file", undefined),
- KeyFile = config:get("replicator", "key_file", undefined),
- Password = config:get("replicator", "password", undefined),
- SslOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts)],
- SslOpts1 =
- case CertFile /= undefined andalso KeyFile /= undefined of
- true ->
- case Password of
- undefined ->
- [{certfile, CertFile}, {keyfile, KeyFile}] ++ SslOpts;
- _ ->
- [
- {certfile, CertFile},
- {keyfile, KeyFile},
- {password, Password}
- ] ++ SslOpts
- end;
- false ->
- SslOpts
- end,
- [{is_ssl, true}, {ssl_options, SslOpts1}];
- #url{protocol = http} ->
- []
- end.
-
--spec ssl_verify_options(true | false) -> [_].
-ssl_verify_options(true) ->
- CAFile = config:get("replicator", "ssl_trusted_certificates_file"),
- [{verify, verify_peer}, {cacertfile, CAFile}];
-ssl_verify_options(false) ->
- [{verify, verify_none}].
-
-spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
Doc;
+before_doc_update(#doc{} = Doc, _Db, ?REPLICATED_CHANGES) ->
+ % Skip replicated-changes updates. NOTE(review): confirm new_edits=false writes from non-admins cannot use this to bypass the owner/validation checks below
+ Doc;
before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
- #user_ctx{
- roles = Roles,
- name = Name
- } = couch_db:get_user_ctx(Db),
- case lists:member(<<"_replicator">>, Roles) of
+ #user_ctx{roles = Roles, name = Name} = couch_db:get_user_ctx(Db),
+ IsReplicator = lists:member(<<"_replicator">>, Roles),
+ Doc1 = case IsReplicator of
+ true -> Doc;
+ false -> before_doc_update_owner(get_value(?OWNER, Body), Name, Db, Doc)
+ end,
+ IsFailed = get_value(<<"_replication_state">>, Body) =:= <<"failed">>,
+ case IsReplicator orelse Doc1#doc.deleted orelse IsFailed of
true ->
- Doc;
+ ok; % replicator writes, deletions and failed docs are not re-validated
false ->
- case couch_util:get_value(?OWNER, Body) of
- undefined ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- Name ->
- Doc;
- Other ->
- case (catch couch_db:check_is_admin(Db)) of
- ok when Other =:= null ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- ok ->
- Doc;
- _ ->
- throw(
- {forbidden,
- <<"Can't update replication documents", " from other users.">>}
- )
- end
+ try % parse without computing the replication id: the id may need a remote filter fetch
+ couch_replicator_parse:parse_rep_doc_without_id(Doc1#doc.body)
+ catch
+ throw:{bad_rep_doc, Error} ->
+ throw({forbidden, Error}) % reject invalid docs as forbidden, like the old JS VDU did
end
+ end,
+ Doc1.
+
+before_doc_update_owner(undefined, Name, _Db, #doc{body = {Body}} = Doc) -> % no owner yet: stamp the writer
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+before_doc_update_owner(Name, Name, _Db, #doc{} = Doc) -> % already owned by the writer
+ Doc;
+before_doc_update_owner(Other, Name, Db, #doc{body = {Body}} = Doc) -> % other (or null) owner: admins only; a null owner is re-stamped
+ case (catch couch_db:check_is_admin(Db)) of
+ ok when Other =:= null ->
+ Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
+ ok ->
+ Doc;
+ _ ->
+ Err = <<"Can't update replication documents from other users.">>,
+ throw({forbidden, Err})
end.
-spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
@@ -707,22 +278,12 @@ after_doc_read(#doc{body = {Body}} = Doc, Db) ->
ok ->
Doc;
_ ->
- case couch_util:get_value(?OWNER, Body) of
+ case get_value(?OWNER, Body) of
Name ->
Doc;
_Other ->
- Source = strip_credentials(
- couch_util:get_value(
- <<"source">>,
- Body
- )
- ),
- Target = strip_credentials(
- couch_util:get_value(
- <<"target">>,
- Body
- )
- ),
+ Source = strip_credentials(get_value(<<"source">>, Body)),
+ Target = strip_credentials(get_value(<<"target">>, Body)),
NewBody0 = ?replace(Body, <<"source">>, Source),
NewBody = ?replace(NewBody0, <<"target">>, Target),
#doc{revs = {Pos, [_ | Revs]}} = Doc,
@@ -765,6 +326,15 @@ error_reason({error, Reason}) ->
error_reason(Reason) ->
to_binary(Reason).
+to_binary(Val) -> % local wrappers replacing the -import attributes removed above
+ couch_util:to_binary(Val).
+
+get_value(Key, Props) ->
+ couch_util:get_value(Key, Props).
+
+get_json_value(Key, Obj) -> % NOTE(review): its caller rep_user_ctx/1 was removed here; confirm it is still used
+ couch_replicator_utils:get_json_value(Key, Obj).
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 939070b95..86fe1f26e 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -196,10 +196,10 @@ winning_revs_generates_new_id(_) ->
{<<"source">>, <<"http://foo.example.bar">>},
{<<"target">>, <<"http://bar.example.foo">>}
],
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
Id1 = replication_id(Rep1),
RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, true}],
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
Id2 = replication_id(Rep2),
?assertNotEqual(Id1, Id2).
@@ -208,10 +208,10 @@ winning_revs_false_same_as_undefined(_) ->
{<<"source">>, <<"http://foo.example.bar">>},
{<<"target">>, <<"http://bar.example.foo">>}
],
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc1}),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc1}),
Id1 = replication_id(Rep1),
RepDoc2 = RepDoc1 ++ [{<<"winning_revs_only">>, false}],
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id({RepDoc2}),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id({RepDoc2}),
Id2 = replication_id(Rep2),
?assertEqual(Id1, Id2).
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
deleted file mode 100644
index 4f4369075..000000000
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ /dev/null
@@ -1,183 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_DB_DOC_VALIDATE_FUN, <<"
- function(newDoc, oldDoc, userCtx) {
- function reportError(error_msg) {
- log('Error writing document `' + newDoc._id +
- '\\' to the replicator database: ' + error_msg);
- throw({forbidden: error_msg});
- }
-
- function validateEndpoint(endpoint, fieldName) {
- if ((typeof endpoint !== 'string') &&
- ((typeof endpoint !== 'object') || (endpoint === null))) {
-
- reportError('The `' + fieldName + '\\' property must exist' +
- ' and be either a string or an object.');
- }
-
- if (typeof endpoint === 'object') {
- if ((typeof endpoint.url !== 'string') || !endpoint.url) {
- reportError('The url property must exist in the `' +
- fieldName + '\\' field and must be a non-empty string.');
- }
-
- if ((typeof endpoint.auth !== 'undefined') &&
- ((typeof endpoint.auth !== 'object') ||
- endpoint.auth === null)) {
-
- reportError('`' + fieldName +
- '.auth\\' must be a non-null object.');
- }
-
- if ((typeof endpoint.headers !== 'undefined') &&
- ((typeof endpoint.headers !== 'object') ||
- endpoint.headers === null)) {
-
- reportError('`' + fieldName +
- '.headers\\' must be a non-null object.');
- }
- }
- }
-
- var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
- var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
-
- if (isReplicator) {
- // Always let replicator update the replication document
- return;
- }
-
- if (newDoc._replication_state === 'failed') {
- // Skip validation in case when we update the document with the
- // failed state. In this case it might be malformed. However,
- // replicator will not pay attention to failed documents so this
- // is safe.
- return;
- }
-
- if (!newDoc._deleted) {
- validateEndpoint(newDoc.source, 'source');
- validateEndpoint(newDoc.target, 'target');
-
- if ((typeof newDoc.create_target !== 'undefined') &&
- (typeof newDoc.create_target !== 'boolean')) {
-
- reportError('The `create_target\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.winning_revs_only !== 'undefined') &&
- (typeof newDoc.winning_revs_only !== 'boolean')) {
-
- reportError('The `winning_revs_only\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.continuous !== 'undefined') &&
- (typeof newDoc.continuous !== 'boolean')) {
-
- reportError('The `continuous\\' field must be a boolean.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- !isArray(newDoc.doc_ids)) {
-
- reportError('The `doc_ids\\' field must be an array of strings.');
- }
-
- if ((typeof newDoc.selector !== 'undefined') &&
- (typeof newDoc.selector !== 'object')) {
-
- reportError('The `selector\\' field must be an object.');
- }
-
- if ((typeof newDoc.filter !== 'undefined') &&
- ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
-
- reportError('The `filter\\' field must be a non-empty string.');
- }
-
- if ((typeof newDoc.doc_ids !== 'undefined') &&
- (typeof newDoc.selector !== 'undefined')) {
-
- reportError('`doc_ids\\' field is incompatible with `selector\\'.');
- }
-
- if ( ((typeof newDoc.doc_ids !== 'undefined') ||
- (typeof newDoc.selector !== 'undefined')) &&
- (typeof newDoc.filter !== 'undefined') ) {
-
- reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
- }
-
- if ((typeof newDoc.query_params !== 'undefined') &&
- ((typeof newDoc.query_params !== 'object') ||
- newDoc.query_params === null)) {
-
- reportError('The `query_params\\' field must be an object.');
- }
-
- if (newDoc.user_ctx) {
- var user_ctx = newDoc.user_ctx;
-
- if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
- reportError('The `user_ctx\\' property must be a ' +
- 'non-null object.');
- }
-
- if (!(user_ctx.name === null ||
- (typeof user_ctx.name === 'undefined') ||
- ((typeof user_ctx.name === 'string') &&
- user_ctx.name.length > 0))) {
-
- reportError('The `user_ctx.name\\' property must be a ' +
- 'non-empty string or null.');
- }
-
- if (!isAdmin && (user_ctx.name !== userCtx.name)) {
- reportError('The given `user_ctx.name\\' is not valid');
- }
-
- if (user_ctx.roles && !isArray(user_ctx.roles)) {
- reportError('The `user_ctx.roles\\' property must be ' +
- 'an array of strings.');
- }
-
- if (!isAdmin && user_ctx.roles) {
- for (var i = 0; i < user_ctx.roles.length; i++) {
- var role = user_ctx.roles[i];
-
- if (typeof role !== 'string' || role.length === 0) {
- reportError('Roles must be non-empty strings.');
- }
- if (userCtx.roles.indexOf(role) === -1) {
- reportError('Invalid role (`' + role +
- '\\') in the `user_ctx\\'');
- }
- }
- }
- } else {
- if (!isAdmin) {
- reportError('The `user_ctx\\' property is missing (it is ' +
- 'optional for admins only).');
- }
- }
- } else {
- if (!isAdmin) {
- if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
- reportError('Replication documents can only be deleted by ' +
- 'admins or by the users who created them.');
- }
- }
- }
- }
-">>).
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_parse.erl
similarity index 65%
copy from src/couch_replicator/src/couch_replicator_docs.erl
copy to src/couch_replicator/src/couch_replicator_parse.erl
index a60f1a1e1..a584d0306 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_parse.erl
@@ -10,183 +10,20 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(couch_replicator_docs).
+-module(couch_replicator_parse).
-export([
parse_rep_doc/1,
parse_rep_doc/2,
parse_rep_db/3,
parse_rep_doc_without_id/1,
- parse_rep_doc_without_id/2,
- before_doc_update/3,
- after_doc_read/2,
- ensure_rep_ddoc_exists/1,
- ensure_cluster_rep_ddoc_exists/1,
- remove_state_fields/2,
- update_doc_completed/3,
- update_failed/3,
- update_rep_id/1,
- update_triggered/2,
- update_error/2
+ parse_rep_doc_without_id/2
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
--include_lib("mem3/include/mem3.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
--include("couch_replicator_js_functions.hrl").
-
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- get_json_value/2,
- get_json_value/3
-]).
-
--define(REP_DB_NAME, <<"_replicator">>).
--define(REP_DESIGN_DOC, <<"_design/_replicator">>).
--define(OWNER, <<"owner">>).
--define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
--define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
-
-remove_state_fields(DbName, DocId) ->
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, undefined},
- {<<"_replication_state_time">>, undefined},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_id">>, undefined},
- {<<"_replication_stats">>, undefined}
- ]).
-
--spec update_doc_completed(binary(), binary(), [_]) -> any().
-update_doc_completed(DbName, DocId, Stats) ->
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"completed">>},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_stats">>, {Stats}}
- ]),
- couch_stats:increment_counter([
- couch_replicator,
- docs,
- completed_state_updates
- ]).
-
--spec update_failed(binary(), binary(), any()) -> any().
-update_failed(DbName, DocId, Error) ->
- Reason = error_reason(Error),
- couch_log:error(
- "Error processing replication doc `~s` from `~s`: ~s",
- [DocId, DbName, Reason]
- ),
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"failed">>},
- {<<"_replication_stats">>, undefined},
- {<<"_replication_state_reason">>, Reason}
- ]),
- couch_stats:increment_counter([
- couch_replicator,
- docs,
- failed_state_updates
- ]).
-
--spec update_triggered(#rep{}, rep_id()) -> ok.
-update_triggered(Rep, {Base, Ext}) ->
- #rep{
- db_name = DbName,
- doc_id = DocId
- } = Rep,
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"triggered">>},
- {<<"_replication_state_reason">>, undefined},
- {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
- {<<"_replication_stats">>, undefined}
- ]),
- ok.
-
--spec update_error(#rep{}, any()) -> ok.
-update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
- Reason = error_reason(Error),
- BinRepId =
- case RepId of
- {Base, Ext} ->
- iolist_to_binary([Base, Ext]);
- _Other ->
- null
- end,
- update_rep_doc(DbName, DocId, [
- {<<"_replication_state">>, <<"error">>},
- {<<"_replication_state_reason">>, Reason},
- {<<"_replication_stats">>, undefined},
- {<<"_replication_id">>, BinRepId}
- ]),
- ok.
-
--spec ensure_rep_ddoc_exists(binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb) ->
- case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
- true ->
- ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
- false ->
- ok
- end.
-
--spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
-ensure_rep_ddoc_exists(RepDb, DDocId) ->
- case open_rep_doc(RepDb, DDocId) of
- {not_found, no_db_file} ->
- %% database was deleted.
- ok;
- {not_found, _Reason} ->
- DocProps = replication_design_doc_props(DDocId),
- DDoc = couch_doc:from_json_obj({DocProps}),
- couch_log:notice("creating replicator ddoc ~p", [RepDb]),
- {ok, _Rev} = save_rep_doc(RepDb, DDoc);
- {ok, Doc} ->
- Latest = replication_design_doc_props(DDocId),
- {Props0} = couch_doc:to_json_obj(Doc, []),
- {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
- case compare_ejson({Props}, {Latest}) of
- true ->
- ok;
- false ->
- LatestWithRev = [{<<"_rev">>, Rev} | Latest],
- DDoc = couch_doc:from_json_obj({LatestWithRev}),
- couch_log:notice("updating replicator ddoc ~p", [RepDb]),
- try
- {ok, _} = save_rep_doc(RepDb, DDoc)
- catch
- throw:conflict ->
- %% ignore, we'll retry next time
- ok
- end
- end
- end,
- ok.
-
--spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
-ensure_cluster_rep_ddoc_exists(RepDb) ->
- DDocId = ?REP_DESIGN_DOC,
- [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
- ensure_rep_ddoc_exists(DbShard, DDocId).
-
--spec compare_ejson({[_]}, {[_]}) -> boolean().
-compare_ejson(EJson1, EJson2) ->
- EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
- EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
- EjsonSorted1 == EjsonSorted2.
-
--spec replication_design_doc_props(binary()) -> [_].
-replication_design_doc_props(DDocId) ->
- [
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
- ].
% Note: parse_rep_doc can handle filtered replications. During parsing of the
% replication doc it will make possibly remote http requests to the source
@@ -304,88 +141,6 @@ update_rep_id(Rep) ->
RepId = couch_replicator_ids:replication_id(Rep),
Rep#rep{id = RepId}.
-update_rep_doc(RepDbName, RepDocId, KVs) ->
- update_rep_doc(RepDbName, RepDocId, KVs, 1).
-
-update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
- try
- case open_rep_doc(RepDbName, RepDocId) of
- {ok, LastRepDoc} ->
- update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
- _ ->
- ok
- end
- catch
- throw:conflict ->
- Msg = "Conflict when updating replication doc `~s`. Retrying.",
- couch_log:error(Msg, [RepDocId]),
- ok = timer:sleep(couch_rand:uniform(erlang:min(128, Wait)) * 100),
- update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
- end;
-update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
- NewRepDocBody = lists:foldl(
- fun
- ({K, undefined}, Body) ->
- lists:keydelete(K, 1, Body);
- ({<<"_replication_state">> = K, State} = KV, Body) ->
- case get_json_value(K, Body) of
- State ->
- Body;
- _ ->
- Body1 = lists:keystore(K, 1, Body, KV),
- Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
- lists:keystore(
- <<"_replication_state_time">>,
- 1,
- Body1,
- {<<"_replication_state_time">>, Timestamp}
- )
- end;
- ({K, _V} = KV, Body) ->
- lists:keystore(K, 1, Body, KV)
- end,
- RepDocBody,
- KVs
- ),
- case NewRepDocBody of
- RepDocBody ->
- ok;
- _ ->
- % Might not succeed - when the replication doc is deleted right
- % before this update (not an error, ignore).
- save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
- end.
-
-open_rep_doc(DbName, DocId) ->
- ioq:maybe_set_io_priority({system, DbName}),
- case couch_db:open_int(DbName, [?CTX, sys_db]) of
- {ok, Db} ->
- try
- couch_db:open_doc(Db, DocId, [ejson_body])
- after
- couch_db:close(Db)
- end;
- Else ->
- Else
- end.
-
-save_rep_doc(DbName, Doc) ->
- ioq:maybe_set_io_priority({system, DbName}),
- {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
- try
- couch_db:update_doc(Db, Doc, [])
- catch
- % User can accidentally write a VDU which prevents _replicator from
- % updating replication documents. Avoid crashing replicator and thus
- % preventing all other replication jobs on the node from running.
- throw:{forbidden, Reason} ->
- Msg = "~p VDU function preventing doc update to ~s ~s ~p",
- couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
- {ok, forbidden}
- after
- couch_db:close(Db)
- end.
-
-spec rep_user_ctx({[_]}) -> #user_ctx{}.
rep_user_ctx({RepDoc}) ->
case get_json_value(<<"user_ctx">>, RepDoc) of
@@ -666,104 +421,20 @@ ssl_verify_options(true) ->
ssl_verify_options(false) ->
[{verify, verify_none}].
--spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
-before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
- Doc;
-before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
- #user_ctx{
- roles = Roles,
- name = Name
- } = couch_db:get_user_ctx(Db),
- case lists:member(<<"_replicator">>, Roles) of
- true ->
- Doc;
- false ->
- case couch_util:get_value(?OWNER, Body) of
- undefined ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- Name ->
- Doc;
- Other ->
- case (catch couch_db:check_is_admin(Db)) of
- ok when Other =:= null ->
- Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
- ok ->
- Doc;
- _ ->
- throw(
- {forbidden,
- <<"Can't update replication documents", " from other users.">>}
- )
- end
- end
- end.
+% Replaces the removed -import of couch_util:get_value/2.
+get_value(Name, PropList) -> couch_util:get_value(Name, PropList).
--spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
-after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
- Doc;
-after_doc_read(#doc{body = {Body}} = Doc, Db) ->
- #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
- case (catch couch_db:check_is_admin(Db)) of
- ok ->
- Doc;
- _ ->
- case couch_util:get_value(?OWNER, Body) of
- Name ->
- Doc;
- _Other ->
- Source = strip_credentials(
- couch_util:get_value(
- <<"source">>,
- Body
- )
- ),
- Target = strip_credentials(
- couch_util:get_value(
- <<"target">>,
- Body
- )
- ),
- NewBody0 = ?replace(Body, <<"source">>, Source),
- NewBody = ?replace(NewBody0, <<"target">>, Target),
- #doc{revs = {Pos, [_ | Revs]}} = Doc,
- NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
- NewRevId = couch_db:new_revid(NewDoc),
- NewDoc#doc{revs = {Pos, [NewRevId | Revs]}}
- end
- end.
+% Replaces the removed -import of couch_util:get_value/3.
+get_value(Name, PropList, Default) -> couch_util:get_value(Name, PropList, Default).
--spec strip_credentials
- (undefined) -> undefined;
- (binary()) -> binary();
- ({[_]}) -> {[_]}.
-strip_credentials(undefined) ->
- undefined;
-strip_credentials(Url) when is_binary(Url) ->
- re:replace(
- Url,
- "http(s)?://(?:[^:]+):[^@]+@(.*)$",
- "http\\1://\\2",
- [{return, binary}]
- );
-strip_credentials({Props0}) ->
- Props1 = lists:keydelete(<<"headers">>, 1, Props0),
- % Strip "auth" just like headers, for replication plugins it can be a place
- % to stash credential that are not necessarily in headers
- Props2 = lists:keydelete(<<"auth">>, 1, Props1),
- {Props2}.
-
-error_reason({shutdown, Error}) ->
- error_reason(Error);
-error_reason({bad_rep_doc, Reason}) ->
- to_binary(Reason);
-error_reason({error, {Error, Reason}}) when
- is_atom(Error), is_binary(Reason)
-->
- to_binary(io_lib:format("~s: ~s", [Error, Reason]));
-error_reason({error, Reason}) ->
- to_binary(Reason);
-error_reason(Reason) ->
- to_binary(Reason).
+% Replaces the removed -import of couch_util:to_binary/1. NOTE(review): error_reason/1,
+% a former caller, was removed from this module; confirm to_binary/1 is still used here.
+to_binary(Term) -> couch_util:to_binary(Term).
+
+% Replaces the removed -import of couch_replicator_utils:get_json_value/2.
+get_json_value(Name, EJson) -> couch_replicator_utils:get_json_value(Name, EJson).
+
+% Forwards to couch_replicator_utils:get_json_value/3, the function the removed
+% -import(couch_replicator_utils, ...) previously supplied (module name is plural).
+get_json_value(Key, Obj, Default) ->
+ couch_replicator_utils:get_json_value(Key, Obj, Default).
-ifdef(TEST).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1ba933a5e..06d3052be 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -1149,8 +1149,8 @@ scheduler_job_format_status_test() ->
Target = <<"http://u:p@h2/d2">>,
Rep = #rep{
id = {"base", "+ext"},
- source = couch_replicator_docs:parse_rep_db(Source, [], []),
- target = couch_replicator_docs:parse_rep_db(Target, [], []),
+ source = couch_replicator_parse:parse_rep_db(Source, [], []),
+ target = couch_replicator_parse:parse_rep_db(Target, [], []),
options = [{create_target, true}],
doc_id = <<"mydoc">>,
db_name = <<"mydb">>
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index e4a2cd12f..36700d9d5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -13,7 +13,6 @@
-module(couch_replicator_utils).
-export([
- parse_rep_doc/2,
replication_id/2,
sum_stats/2,
is_deleted/1,
@@ -95,9 +94,6 @@ replication_id(Rep, Version) ->
sum_stats(S1, S2) ->
couch_replicator_stats:sum_stats(S1, S2).
-parse_rep_doc(Props, UserCtx) ->
- couch_replicator_docs:parse_rep_doc(Props, UserCtx).
-
-spec iso8601(erlang:timestamp()) -> binary().
iso8601({_Mega, _Sec, _Micro} = Timestamp) ->
{{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time(Timestamp),
@@ -351,7 +347,7 @@ normalize_rep_test_() ->
{<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
{<<"other_field">>, <<"some_value">>}
]},
- Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+ Rep1 = couch_replicator_parse:parse_rep_doc_without_id(EJson1),
EJson2 =
{[
{<<"other_field">>, <<"unrelated">>},
@@ -360,7 +356,7 @@ normalize_rep_test_() ->
{<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
{<<"other_field2">>, <<"unrelated2">>}
]},
- Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+ Rep2 = couch_replicator_parse:parse_rep_doc_without_id(EJson2),
?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
end)
}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 2d5ef96b1..df8074f1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -304,7 +304,7 @@ replicate(Source, Target) ->
{<<"target">>, db_url(Target)},
{<<"continuous">>, true}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 6bdb4ecb2..7ba6bc69d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -226,7 +226,7 @@ replicate(Source, Target) ->
% Low connection timeout so _changes feed gets restarted quicker
{<<"connection_timeout">>, 3000}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
{ok, Rep#rep.id}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index 3468cda73..758c44f2b 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -50,7 +50,7 @@ parse_rep_doc_without_proxy(_) ->
{<<"source">>, <<"http://unproxied.com">>},
{<<"target">>, <<"http://otherunproxied.com">>}
]},
- Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined).
@@ -62,7 +62,7 @@ parse_rep_doc_with_proxy(_) ->
{<<"target">>, <<"http://otherunproxied.com">>},
{<<"proxy">>, ProxyURL}
]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)).
@@ -76,7 +76,7 @@ parse_rep_source_target_proxy(_) ->
{<<"source_proxy">>, SrcProxyURL},
{<<"target_proxy">>, TgtProxyURL}
]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
?assertEqual(
(Rep#rep.source)#httpdb.proxy_url,
binary_to_list(SrcProxyURL)
@@ -96,7 +96,7 @@ mutually_exclusive_proxy_and_source_proxy(_) ->
]},
?assertThrow(
{bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc)
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)
).
mutually_exclusive_proxy_and_target_proxy(_) ->
@@ -109,5 +109,5 @@ mutually_exclusive_proxy_and_target_proxy(_) ->
]},
?assertThrow(
{bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc)
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)
).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 04c665af5..f413e5cf4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -214,7 +214,7 @@ replicate(Source, Target) ->
{<<"target">>, db_url(Target)},
{<<"continuous">>, true}
]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index f30bdb1cd..f862527f4 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -173,7 +173,7 @@ replicate(Source, Target) ->
).
replicate({[_ | _]} = RepObject) ->
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = get_pid(Rep#rep.id),