You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/03/14 19:26:17 UTC

[22/50] couch-replicator commit: updated refs/heads/63012-scheduler to 27a5eae

Implement Compatibility Mode

Compatibility mode enables writing of `triggered` and `error` states to the
replication documents. It is designed for backward compatibility with the
previous replicator behavior.

Compatibility mode only affects writing of states to the documents; the
scheduling and backoff, as well as the new _scheduler/docs and
_scheduler/jobs HTTP endpoints, are not affected by the mode change.

Implementation notes:

 * The main idea is to avoid an update feedback cycle, that is, when
 a document state update triggers a replication job update, which,
 in turn, generates another state update and so on... To avoid this situation a
 semantic short circuit is used: discard all document updates which do not
 change replication parameters. This includes changes to the _replication_* fields
 used during state updates, or custom user fields. To accomplish this shortcut,
 introduce the idea of a "normalized" #rep{} record. That is a replication record
 which only has fields set that make it possible to compare 2 instances of rep
 records.

* To make normalization work, some options and other list-based rep fields have
to be sorted. Since options are already sorted, don't sort them again, however,
doc_ids filter was not sorted, so make sure to sort just that.

* Since the scheduling replicator might force documents to wait a while in the
pending queue before running them, make sure to update the `triggered` state not
only when jobs started successfully but also when they are added to the
replicator the first time. This preserves the rough semantics of "triggered means
the replicator has noticed my document and turned it into a replication job".


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/f41fd677
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/f41fd677
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/f41fd677

Branch: refs/heads/63012-scheduler
Commit: f41fd677beffbb37e87134983c936a0bcc8cc9df
Parents: 2f95128
Author: Nick Vatamaniuc <va...@apache.org>
Authored: Tue Nov 8 20:58:58 2016 -0500
Committer: Nick Vatamaniuc <va...@apache.org>
Committed: Tue Nov 8 20:58:58 2016 -0500

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl      |  46 ++++++++-
 src/couch_replicator_doc_processor.erl | 144 ++++++++++++++++++++++++----
 src/couch_replicator_docs.erl          |  63 ++++++++++--
 src/couch_replicator_scheduler.erl     |   6 ++
 src/couch_replicator_scheduler_job.erl |   8 +-
 5 files changed, 240 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b87b020..7828684 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -38,7 +38,8 @@
     open_doc/3,
     open_doc_revs/6,
     changes_since/5,
-    db_uri/1
+    db_uri/1,
+    normalize_db/1
     ]).
 
 -import(couch_replicator_httpc, [
@@ -989,3 +990,46 @@ header_value(Key, Headers, Default) ->
         _ ->
             Default
     end.
+
+
+% Normalize an #httpdb{} or #db{} record such that it can be used for
+% comparisons. This means remove things like pids and also sort options / props.
+normalize_db(#httpdb{} = HttpDb) ->
+    #httpdb{
+       url = HttpDb#httpdb.url,
+       oauth = HttpDb#httpdb.oauth,
+       headers = lists:keysort(1, HttpDb#httpdb.headers),
+       timeout = HttpDb#httpdb.timeout,
+       ibrowse_options = lists:keysort(1, HttpDb#httpdb.ibrowse_options),
+       retries = HttpDb#httpdb.retries,
+       http_connections = HttpDb#httpdb.http_connections
+    };
+
+normalize_db(<<DbName/binary>>) ->
+    DbName.
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+normalize_http_db_test() ->
+    HttpDb =  #httpdb{
+       url = "http://host/db",
+       oauth = #oauth{},
+       headers = [{"k2","v2"}, {"k1","v1"}],
+       timeout = 30000,
+       ibrowse_options = [{k2, v2}, {k1, v1}],
+       retries = 10,
+       http_connections = 20
+    },
+    Expected = HttpDb#httpdb{
+        headers = [{"k1","v1"}, {"k2","v2"}],
+        ibrowse_options = [{k1, v1}, {k2, v2}]
+    },
+    ?assertEqual(Expected, normalize_db(HttpDb)),
+    ?assertEqual(<<"local">>, normalize_db(<<"local">>)).
+
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_doc_processor.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_doc_processor.erl b/src/couch_replicator_doc_processor.erl
index 733406f..dddde0f 100644
--- a/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator_doc_processor.erl
@@ -15,6 +15,7 @@
 
 -export([start_link/0]).
 -export([docs/1, doc/2]).
+-export([compat_mode/0]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -31,6 +32,7 @@
     get_json_value/3
 ]).
 
+-define(DEFAULT_COMPATIBILITY, false).
 -define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
 -define(TS_DAY_SEC, 86400).
 
@@ -104,14 +106,14 @@ process_change(DbName, {Change}) ->
         undefined ->
             ok = process_updated(Id, JsonRepDoc);
         <<"triggered">> ->
-            couch_replicator_docs:remove_state_fields(DbName, DocId),
+            maybe_remove_state_fields(DbName, DocId),
             ok = process_updated(Id, JsonRepDoc);
         <<"completed">> ->
             ok = gen_server:call(?MODULE, {completed, Id}, infinity);
         <<"error">> ->
             % Handle replications started from older versions of replicator
             % which wrote transient errors to replication docs
-            couch_replicator_docs:remove_state_fields(DbName, DocId),
+            maybe_remove_state_fields(DbName, DocId),
             ok = process_updated(Id, JsonRepDoc);
         <<"failed">> ->
             ok
@@ -122,6 +124,15 @@ process_change(DbName, {Change}) ->
     ok.
 
 
+maybe_remove_state_fields(DbName, DocId) ->
+    case compat_mode() of
+        true ->
+            ok;
+        false ->
+            couch_replicator_docs:remove_state_fields(DbName, DocId)
+    end.
+
+
 process_updated({DbName, _DocId} = Id, JsonRepDoc) ->
     % Parsing replication doc (but not calculating the id) could throw an
     % exception which would indicate this document is malformed. This exception
@@ -206,21 +217,68 @@ code_change(_OldVsn, State, _Extra) ->
 % same document.
 -spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok.
 updated_doc(Id, Rep, Filter) ->
-    removed_doc(Id),
-    Row = #rdoc{
-        id = Id,
-        state = initializing,
-        rep = Rep,
-        rid = nil,
-        filter = Filter,
-        info = nil,
-        errcnt = 0,
-        worker = nil,
-        last_updated = os:timestamp()
-    },
-    true = ets:insert(?MODULE, Row),
-    ok = maybe_start_worker(Id),
-    ok.
+    case compare_replication_records(current_rep(Id), Rep) of
+        false ->
+            removed_doc(Id),
+            Row = #rdoc{
+                id = Id,
+                state = initializing,
+                rep = Rep,
+                rid = nil,
+                filter = Filter,
+                info = nil,
+                errcnt = 0,
+                worker = nil,
+                last_updated = os:timestamp()
+            },
+            true = ets:insert(?MODULE, Row),
+            ok = maybe_start_worker(Id);
+        true ->
+            ok
+    end.
+
+
+-spec compare_replication_records(#rep{}, #rep{}) -> boolean().
+compare_replication_records(Rep1, Rep2) ->
+    normalize_rep(Rep1) == normalize_rep(Rep2).
+
+
+% Return current #rep{} record if any. If replication hasn't been submitted
+% to the scheduler yet, #rep{} record will be in the document processor's
+% ETS table, otherwise query scheduler for the #rep{} record.
+-spec current_rep({binary(), binary()}) -> #rep{} | nil.
+current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [] ->
+            nil;
+        [#rdoc{state = scheduled, rep = nil, rid = JobId}] ->
+            % When replication is scheduled, #rep{} record which can be quite
+            % large compared to other bits in #rdoc is removed in order to avoid
+            % having to keep 2 copies of it. So have to fetch it from the
+            % scheduler.
+            couch_replicator_scheduler:rep_state(JobId);
+        [#rdoc{rep = Rep}] ->
+            Rep
+    end.
+
+
+% Normalize a #rep{} record such that it doesn't contain time dependent fields
+% or pids (like httpc pools), and options / props are sorted. This function is
+% used during comparisons.
+-spec normalize_rep(#rep{} | nil) -> #rep{} | nil.
+normalize_rep(nil) ->
+    nil;
+
+normalize_rep(#rep{} = Rep)->
+    #rep{
+       source = couch_replicator_api_wrap:normalize_db(Rep#rep.source),
+       target = couch_replicator_api_wrap:normalize_db(Rep#rep.target),
+       options = Rep#rep.options,  % already sorted in make_options/1
+       type = Rep#rep.type,
+       view = Rep#rep.view,
+       doc_id = Rep#rep.doc_id,
+       db_name = Rep#rep.db_name
+    }.
 
 
 -spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok.
@@ -255,6 +313,7 @@ worker_returned(Ref, Id, {ok, RepId}) ->
                 Row0#rdoc{rep=nil, rid=RepId, info=nil}
         end,
         true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_triggered(Row#rdoc.rep, RepId),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -273,6 +332,7 @@ worker_returned(Ref, Id, {temporary_error, Reason}) ->
             last_updated = os:timestamp()
         },
         true = ets:insert(?MODULE, NewRow),
+        ok = maybe_update_doc_error(NewRow#rdoc.rep, Reason),
         ok = maybe_start_worker(Id);
     _ ->
         ok  % doc could have been deleted, ignore
@@ -288,6 +348,24 @@ worker_returned(Ref, Id, {permanent_failure, _Reason}) ->
     end,
     ok.
 
+-spec maybe_update_doc_error(#rep{}, any()) -> ok.
+maybe_update_doc_error(Rep, Reason) ->
+    case compat_mode() of
+        true ->
+            couch_replicator_docs:update_error(Rep, Reason);
+        false ->
+            ok
+    end.
+
+-spec maybe_update_doc_triggered(#rep{}, rep_id()) -> ok.
+maybe_update_doc_triggered(Rep, RepId) ->
+    case compat_mode() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end.
+
 
 -spec error_backoff(non_neg_integer()) -> seconds().
 error_backoff(ErrCnt) ->
@@ -347,6 +425,10 @@ get_worker_wait(#rdoc{state = error, errcnt = ErrCnt}) ->
 get_worker_wait(#rdoc{state = initializing}) ->
     0.
 
+-spec compat_mode() -> boolean().
+compat_mode() ->
+    config:get_boolean("replicator", "compatibility_mode", ?DEFAULT_COMPATIBILITY).
+
 
 % _scheduler/docs HTTP endpoint helpers
 
@@ -630,6 +712,33 @@ t_ejson_docs() ->
     end).
 
 
+normalize_rep_test_() ->
+    {
+        setup,
+        fun() -> meck:expect(config, get, fun(_, _, Default) -> Default end) end,
+        fun(_) -> meck:unload() end,
+        ?_test(begin
+            EJson1 = {[
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"target">>, <<"local">>},
+                {<<"doc_ids">>, [<<"a">>, <<"c">>, <<"b">>]},
+                {<<"other_field">>, <<"some_value">>}
+            ]},
+            Rep1 = couch_replicator_docs:parse_rep_doc_without_id(EJson1),
+            EJson2 = {[
+                {<<"other_field">>, <<"unrelated">>},
+                {<<"target">>, <<"local">>},
+                {<<"source">>, <<"http://host.com/source_db">>},
+                {<<"doc_ids">>, [<<"c">>, <<"a">>, <<"b">>]},
+                {<<"other_field2">>, <<"unrelated2">>}
+            ]},
+            Rep2 = couch_replicator_docs:parse_rep_doc_without_id(EJson2),
+            ?assertEqual(normalize_rep(Rep1), normalize_rep(Rep2))
+        end)
+    }.
+
+
+
 % Test helper functions
 
 
@@ -639,6 +748,7 @@ setup() ->
     meck:expect(couch_log, warning, 2, ok),
     meck:expect(couch_log, error, 2, ok),
     meck:expect(config, get, fun(_, _, Default) -> Default end),
+    meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_docs.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_docs.erl b/src/couch_replicator_docs.erl
index 75889e9..fb417e4 100644
--- a/src/couch_replicator_docs.erl
+++ b/src/couch_replicator_docs.erl
@@ -23,6 +23,8 @@
     update_failed/4,
     update_rep_id/1
 ]).
+-export([update_triggered/2, update_error/2]).
+
 
 -define(REP_DB_NAME, <<"_replicator">>).
 -define(REP_DESIGN_DOC, <<"_design/_replicator">>).
@@ -57,6 +59,8 @@ remove_state_fields(DbName, DocId) ->
         {<<"_replication_state">>, undefined},
         {<<"_replication_state_time">>, undefined},
         {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_start_time">>, undefined},
+        {<<"_replication_id">>, undefined},
         {<<"_replication_stats">>, undefined}]).
 
 -spec update_doc_completed(binary(), binary(), [_], erlang:timestamp()) -> any().
@@ -72,19 +76,49 @@ update_doc_completed(DbName, DocId, Stats, StartTime) ->
 
 -spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
 update_failed(DbName, DocId, Error, StartTime) ->
-    Reason = case Error of
-        {bad_rep_doc, Reas} ->
-            Reas;
-        _ ->
-            to_binary(Error)
-    end,
+    Reason = error_reason(Error),
     couch_log:error("Error processing replication doc `~s`: ~s", [DocId, Reason]),
     StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"failed">>},
         {<<"_replication_start_time">>, StartTimeBin},
+        {<<"_replication_stats">>, undefined},
         {<<"_replication_state_reason">>, Reason}]),
-   couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
+    couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
+
+
+-spec update_triggered(#rep{}, rep_id()) -> ok.
+update_triggered(Rep, {Base, Ext}) ->
+    #rep{
+        db_name = DbName,
+        doc_id = DocId,
+        start_time = StartTime
+    } = Rep,
+    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"triggered">>},
+            {<<"_replication_state_reason">>, undefined},
+            {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
+            {<<"_replication_start_time">>, StartTimeBin},
+            {<<"_replication_stats">>, undefined}]),
+    ok.
+
+
+-spec update_error(#rep{}, any()) -> ok.
+update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+    Reason = error_reason(Error),
+    BinRepId = case RepId of
+        {Base, Ext} ->
+            iolist_to_binary([Base, Ext]);
+        _Other ->
+            null
+    end,
+    update_rep_doc(DbName, DocId, [
+            {<<"_replication_state">>, <<"error">>},
+            {<<"_replication_state_reason">>, Reason},
+            {<<"_replication_stats">>, undefined},
+            {<<"_replication_id">>, BinRepId}]),
+    ok.
 
 
 -spec ensure_rep_db_exists() -> {ok, #db{}}.
@@ -458,7 +492,7 @@ convert_options([{<<"doc_ids">>, V} | _R]) when not is_list(V) ->
 convert_options([{<<"doc_ids">>, V} | R]) ->
     % Ensure same behaviour as old replicator: accept a list of percent
     % encoded doc IDs.
-    DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
+    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
     [{doc_ids, DocIds} | convert_options(R)];
 convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
     throw({bad_request, <<"parameter `selector` must be a JSON object">>});
@@ -626,6 +660,19 @@ strip_credentials({Props}) ->
     {lists:keydelete(<<"oauth">>, 1, Props)}.
 
 
+error_reason({shutdown, Error}) ->
+    error_reason(Error);
+error_reason({bad_rep_doc, Reason}) ->
+    to_binary(Reason);
+error_reason({error, {Error, Reason}})
+  when is_atom(Error), is_binary(Reason) ->
+    to_binary(io_lib:format("~s: ~s", [Error, Reason]));
+error_reason({error, Reason}) ->
+    to_binary(Reason);
+error_reason(Reason) ->
+    to_binary(Reason).
+
+
 
 -ifdef(TEST).
 

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_scheduler.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler.erl b/src/couch_replicator_scheduler.erl
index 0aa4ef6..a772ecf 100644
--- a/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator_scheduler.erl
@@ -309,6 +309,12 @@ handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
 
 handle_crashed_job(Job, Reason, State) ->
     ok = update_state_crashed(Job, Reason, State),
+    case couch_replicator_doc_processor:compat_mode() of
+        true ->
+            couch_replicator_docs:update_error(Job#job.rep, Reason);
+        false ->
+            ok
+    end,
     case ets:info(?MODULE, size) < State#state.max_jobs of
         true ->
             % Starting pending jobs is an O(TotalJobsCount) operation. Only do

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/f41fd677/src/couch_replicator_scheduler_job.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_scheduler_job.erl b/src/couch_replicator_scheduler_job.erl
index 9d14a9d..32e6930 100644
--- a/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator_scheduler_job.erl
@@ -486,7 +486,13 @@ format_status(_Opt, [_PDict, State]) ->
 -spec doc_update_triggered(#rep{}) -> ok.
 doc_update_triggered(#rep{db_name = null}) ->
     ok;
-doc_update_triggered(#rep{id = RepId, doc_id = DocId}) ->
+doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+    case couch_replicator_doc_processor:compat_mode() of
+        true ->
+            couch_replicator_docs:update_triggered(Rep, RepId);
+        false ->
+            ok
+    end,
     couch_log:notice("Document `~s` triggered replication `~s`",
         [DocId, pp_rep_id(RepId)]),
     ok.