You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/09 04:02:41 UTC

[couchdb] branch 63012-scheduler updated (c77bb95 -> 9e3a544)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git.

      from  c77bb95   [fixup] Use div instead of / in rate limiter
       new  676fd79   [fixup] use m,f,a in cluster event listener
       new  a5d0300   [fixup] move rate limiter tables to separate module
       new  5ec9110   [fixup] reduce logging of change feed exit event
       new  9e3a544   [fixup] avoid introducing a new metadata field

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "adds" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/src/couch_doc.erl                        |  3 --
 src/couch_replicator/src/couch_multidb_changes.erl |  2 +-
 src/couch_replicator/src/couch_replicator.erl      | 27 +++++++----
 src/couch_replicator/src/couch_replicator.hrl      |  2 +-
 .../src/couch_replicator_clustering.erl            | 11 ++---
 .../src/couch_replicator_db_changes.erl            |  9 +++-
 .../src/couch_replicator_doc_processor.erl         | 17 +++++--
 .../src/couch_replicator_doc_processor_worker.erl  |  5 +-
 src/couch_replicator/src/couch_replicator_docs.erl | 29 +++++-------
 .../src/couch_replicator_js_functions.hrl          | 16 +++++--
 .../src/couch_replicator_rate_limiter.erl          | 32 ++++---------
 .../src/couch_replicator_rate_limiter_tables.erl   | 54 ++++++++++++++++++++++
 .../src/couch_replicator_scheduler_job.erl         |  5 +-
 13 files changed, 137 insertions(+), 75 deletions(-)
 create mode 100644 src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 03/04: [fixup] reduce logging of change feed exit event

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 5ec9110a4d1f4cf0178463c7b7e5a26b9f92e375
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Sat Apr 8 22:10:17 2017 -0400

    [fixup] reduce logging of change feed exit event
---
 src/couch_replicator/src/couch_multidb_changes.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/couch_replicator/src/couch_multidb_changes.erl b/src/couch_replicator/src/couch_multidb_changes.erl
index 9c0b7cf..a100a02 100644
--- a/src/couch_replicator/src/couch_multidb_changes.erl
+++ b/src/couch_replicator/src/couch_multidb_changes.erl
@@ -135,7 +135,7 @@ handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->
     {stop, {scanner_died, Reason}, State};
 
 handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
-    couch_log:info("~p change feed exited ~p",[State#state.suffix, From]),
+    couch_log:debug("~p change feed exited ~p",[State#state.suffix, From]),
     case lists:keytake(From, 2, Pids) of
         {value, {DbName, From}, NewPids} ->
             if Reason == normal -> ok; true ->

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 02/04: [fixup] move rate limiter tables to separate module

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a5d03007e2ab143f3fd208b6a1b7a7a85c543abc
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Sat Apr 8 02:44:51 2017 -0400

    [fixup] move rate limiter tables to separate module
---
 .../src/couch_replicator_rate_limiter.erl          | 32 ++++---------
 .../src/couch_replicator_rate_limiter_tables.erl   | 54 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 23 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter.erl b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
index f01f7f9..011372b 100644
--- a/src/couch_replicator/src/couch_replicator_rate_limiter.erl
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter.erl
@@ -61,7 +61,6 @@
 
 % Definitions
 
--define(SHARDS_N, 16).
 
 % Main parameters of the algorithm. The factor is the multiplicative part and
 % base interval is the additive.
@@ -128,8 +127,7 @@ success(Key) ->
 % gen_server callbacks
 
 init([]) ->
-    Opts = [named_table, public, {keypos, #rec.id}, {read_concurrency, true}],
-    [ets:new(list_to_atom(TableName), Opts) || TableName <- table_names()],
+    couch_replicator_rate_limiter_tables:create(#rec.id),
     {ok, #state{timer = new_timer()}}.
 
 
@@ -147,7 +145,8 @@ handle_cast(_, State) ->
 
 handle_info(cleanup, #state{timer = Timer}) ->
     timer:cancel(Timer),
-    TIds = [list_to_existing_atom(TableName) || TableName <- table_names()],
+    TableNames = couch_replicator_rate_limiter_tables:table_names(),
+    TIds = [list_to_existing_atom(TableName) || TableName <- TableNames],
     [cleanup_table(TId, now_msec() - ?MAX_INTERVAL) || TId <- TIds],
     {noreply, #state{timer = new_timer()}}.
 
@@ -172,7 +171,8 @@ update_success(Key, Interval, Timestamp, Now) ->
     NewInterval = DecayedInterval - AdditiveFactor,
     if
         NewInterval =< 0 ->
-            ets:delete(term_to_table(Key), Key),
+            Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+            ets:delete(Table, Key),
             0;
         NewInterval =< ?BASE_INTERVAL ->
             insert(Key, ?BASE_INTERVAL, Now);
@@ -196,13 +196,15 @@ update_failure(Key, Interval, _Timestamp, Now) ->
 -spec insert(any(), interval(), msec()) -> interval().
 insert(Key, Interval, Timestamp) ->
     Entry = #rec{id = Key, backoff = Interval, ts = Timestamp},
-    ets:insert(term_to_table(Key), Entry),
+    Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+    ets:insert(Table, Entry),
     Interval.
 
 
 -spec interval_and_timestamp(key()) -> {interval(), msec()}.
 interval_and_timestamp(Key) ->
-    case ets:lookup(term_to_table(Key), Key) of
+    Table = couch_replicator_rate_limiter_tables:term_to_table(Key),
+    case ets:lookup(Table, Key) of
         [] ->
             {0, 0};
         [#rec{backoff = Interval, ts = Timestamp}] ->
@@ -239,22 +241,6 @@ additive_factor(_Interval) ->
     ?BASE_INTERVAL.
 
 
--spec table_name(non_neg_integer()) -> string().
-table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N ->
-    atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id).
-
-
--spec table_names() -> [string()].
-table_names() ->
-    [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)].
-
-
--spec term_to_table(any()) -> atom().
-term_to_table(Term) ->
-    PHash = erlang:phash2(Term),
-    list_to_existing_atom(table_name(PHash rem ?SHARDS_N)).
-
-
 -spec new_timer() -> timer:tref().
 new_timer() ->
     {ok, Timer} = timer:send_after(?MAX_INTERVAL * 2, cleanup),
diff --git a/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
new file mode 100644
index 0000000..aa087d6
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_rate_limiter_tables.erl
@@ -0,0 +1,54 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Create and address the ETS tables used by the replicator rate limiter.
+% Rate limiter entries are spread across `SHARDS_N` named, public ETS tables
+% instead of being kept in a single table.
+%
+% `create/1` creates all the tables, using the given key position for their
+% records. `table_names/0` returns the table names as strings, and
+% `term_to_table/1` picks the table for a term by hashing it with
+% `erlang:phash2/1` and taking the result modulo `SHARDS_N`.
+%
+% Table names are built from this module's name followed by "_" and the shard
+% number.
+
+-module(couch_replicator_rate_limiter_tables).
+
+-export([create/1]).
+-export([table_names/0]).
+-export([term_to_table/1]).
+
+-define(SHARDS_N, 16).
+
+
+-spec create(non_neg_integer()) -> ok.
+create(KeyPos) ->
+    Opts = [named_table, public, {keypos, KeyPos}, {read_concurrency, true}],
+    [ets:new(list_to_atom(TableName), Opts) || TableName <- table_names()],
+    ok.
+
+-spec table_names() -> [string()].
+table_names() ->
+    [table_name(N) || N <- lists:seq(0, ?SHARDS_N - 1)].
+
+
+-spec term_to_table(any()) -> atom().
+term_to_table(Term) ->
+    PHash = erlang:phash2(Term),
+    list_to_existing_atom(table_name(PHash rem ?SHARDS_N)).
+
+
+-spec table_name(non_neg_integer()) -> string().
+table_name(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS_N ->
+    atom_to_list(?MODULE) ++ "_" ++ integer_to_list(Id).

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 04/04: [fixup] avoid introducing a new metadata field

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9e3a544951c506f5081f63336910c6f1156d718e
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Sun Apr 9 00:02:01 2017 -0400

    [fixup] avoid introducing a new metadata field
---
 src/couch/src/couch_doc.erl                        |  3 ---
 src/couch_replicator/src/couch_replicator.erl      | 27 ++++++++++++++------
 src/couch_replicator/src/couch_replicator.hrl      |  2 +-
 .../src/couch_replicator_doc_processor.erl         |  5 ++--
 .../src/couch_replicator_doc_processor_worker.erl  |  5 ++--
 src/couch_replicator/src/couch_replicator_docs.erl | 29 ++++++++--------------
 .../src/couch_replicator_js_functions.hrl          | 16 +++++++++---
 .../src/couch_replicator_scheduler_job.erl         |  5 ++--
 8 files changed, 50 insertions(+), 42 deletions(-)

diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 5e6d2b7..381ad4b 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -270,9 +270,6 @@ transfer_fields([{<<"_replication_state">>, _} = Field | Rest],
 transfer_fields([{<<"_replication_state_time">>, _} = Field | Rest],
     #doc{body=Fields} = Doc) ->
     transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
-transfer_fields([{<<"_replication_start_time">>, _} = Field | Rest],
-    #doc{body=Fields} = Doc) ->
-    transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
 transfer_fields([{<<"_replication_state_reason">>, _} = Field | Rest],
     #doc{body=Fields} = Doc) ->
     transfer_fields(Rest, Doc#doc{body=[Field|Fields]});
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
index f2b0d02..345cda3 100644
--- a/src/couch_replicator/src/couch_replicator.erl
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -266,7 +266,8 @@ job(JobId0) when is_binary(JobId0) ->
 
 -spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
 doc(RepDb, DocId, UserCtx) ->
-    {Res, _Bad} = rpc:multicall(couch_replicator_doc_processor, doc, [RepDb, DocId]),
+    {Res, _Bad} = rpc:multicall(couch_replicator_doc_processor, doc,
+        [RepDb, DocId]),
     case [DocInfo || {ok, DocInfo} <- Res] of
         [DocInfo| _] ->
             {ok, DocInfo};
@@ -283,17 +284,27 @@ doc_from_db(RepDb, DocId, UserCtx) ->
             Source = get_value(<<"source">>, Props),
             Target = get_value(<<"target">>, Props),
             State = get_value(<<"_replication_state">>, Props, null),
-            StartTime = get_value(<<"_replication_start_time">>, Props, null),
             StateTime = get_value(<<"_replication_state_time">>, Props, null),
-            {StateInfo, ErrorCount} = case State of
+            {StateInfo, ErrorCount, StartTime} = case State of
                 <<"completed">> ->
-                    Info = get_value(<<"_replication_stats">>, Props, null),
-                    {Info, 0};
+                    {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
+                    case lists:keytake(<<"start_time">>, 1, InfoP) of
+                        {value, {_, Time}, InfoP1} ->
+                            {{InfoP1}, 0, Time};
+                        false ->
+                            case lists:keytake(start_time, 1, InfoP) of
+                                {value, {_, Time}, InfoP1} ->
+                                    {{InfoP1}, 0, Time};
+                            false ->
+                                    {{InfoP}, 0, null}
+                            end
+                    end;
                 <<"failed">> ->
-                    Info = get_value(<<"_replication_state_reason">>, Props, null),
-                    {Info, 1};
+                    Info = get_value(<<"_replication_state_reason">>, Props,
+                        null),
+                    {Info, 1, StateTime};
                 _OtherState ->
-                    {null, 0}
+                    {null, 0, null}
             end,
             {ok, {[
                 {doc_id, DocId},
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index b8669e8..38b5a37 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -22,7 +22,7 @@
     view = nil :: any() | '_',
     doc_id :: any() | '_',
     db_name = null :: null | binary() | '_',
-    start_time :: erlang:timestamp() | '_'
+    start_time = {0, 0, 0}:: erlang:timestamp() | '_'
 }).
 
 -type rep_id() :: {string(), string()}.
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index d22ac26..9063436 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -84,8 +84,7 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
     _Tag:Error ->
         {RepProps} = get_json_value(doc, ChangeProps),
         DocId = get_json_value(<<"_id">>, RepProps),
-        Timestamp = os:timestamp(),
-        couch_replicator_docs:update_failed(DbName, DocId, Error, Timestamp)
+        couch_replicator_docs:update_failed(DbName, DocId, Error)
     end,
     Server.
 
@@ -830,7 +829,7 @@ setup() ->
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
-    meck:expect(couch_replicator_docs, update_failed, 4, ok),
+    meck:expect(couch_replicator_docs, update_failed, 3, ok),
     {ok, Pid} = start_link(),
     Pid.
 
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
index 30a6988..4f6dab0 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor_worker.erl
@@ -80,8 +80,7 @@ maybe_start_replication(Id, RepWithoutId, WRef) ->
         {temporary_error, Reason};
     {permanent_failure, Reason} ->
         {DbName, DocId} = Id,
-        StartTime = Rep#rep.start_time,
-        couch_replicator_docs:update_failed(DbName, DocId, Reason, StartTime),
+        couch_replicator_docs:update_failed(DbName, DocId, Reason),
         {permanent_failure, Reason}
     end.
 
@@ -235,7 +234,7 @@ setup() ->
     meck:expect(couch_replicator_scheduler, add_job, 1, ok),
     meck:expect(config, get, fun(_, _, Default) -> Default end),
     meck:expect(couch_server, get_uuid, 0, this_is_snek),
-    meck:expect(couch_replicator_docs, update_failed, 4, ok),
+    meck:expect(couch_replicator_docs, update_failed, 3, ok),
     meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
     meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
     ok.
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 5bdfe92..f945c4a 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -19,8 +19,8 @@
 -export([ensure_cluster_rep_ddoc_exists/1]).
 -export([
     remove_state_fields/2,
-    update_doc_completed/4,
-    update_failed/4,
+    update_doc_completed/3,
+    update_failed/3,
     update_rep_id/1
 ]).
 -export([update_triggered/2, update_error/2]).
@@ -59,30 +59,26 @@ remove_state_fields(DbName, DocId) ->
         {<<"_replication_state">>, undefined},
         {<<"_replication_state_time">>, undefined},
         {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_start_time">>, undefined},
         {<<"_replication_id">>, undefined},
         {<<"_replication_stats">>, undefined}]).
 
--spec update_doc_completed(binary(), binary(), [_], erlang:timestamp()) -> any().
-update_doc_completed(DbName, DocId, Stats, StartTime) ->
-    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
+-spec update_doc_completed(binary(), binary(), [_]) -> any().
+update_doc_completed(DbName, DocId, Stats) ->
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"completed">>},
         {<<"_replication_state_reason">>, undefined},
-        {<<"_replication_start_time">>,  StartTimeBin},
         {<<"_replication_stats">>, {Stats}}]),
-    couch_stats:increment_counter([couch_replicator, docs, completed_state_updates]).
+    couch_stats:increment_counter([couch_replicator, docs,
+        completed_state_updates]).
 
 
--spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
-update_failed(DbName, DocId, Error, StartTime) ->
+-spec update_failed(binary(), binary(), any()) -> any().
+update_failed(DbName, DocId, Error) ->
     Reason = error_reason(Error),
     couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
         [DocId, DbName, Reason]),
-    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"failed">>},
-        {<<"_replication_start_time">>, StartTimeBin},
         {<<"_replication_stats">>, undefined},
         {<<"_replication_state_reason">>, Reason}]),
     couch_stats:increment_counter([couch_replicator, docs, failed_state_updates]).
@@ -92,15 +88,12 @@ update_failed(DbName, DocId, Error, StartTime) ->
 update_triggered(Rep, {Base, Ext}) ->
     #rep{
         db_name = DbName,
-        doc_id = DocId,
-        start_time = StartTime
+        doc_id = DocId
     } = Rep,
-    StartTimeBin = couch_replicator_utils:iso8601(StartTime),
     update_rep_doc(DbName, DocId, [
         {<<"_replication_state">>, <<"triggered">>},
         {<<"_replication_state_reason">>, undefined},
         {<<"_replication_id">>, iolist_to_binary([Base, Ext])},
-        {<<"_replication_start_time">>, StartTimeBin},
         {<<"_replication_stats">>, undefined}]),
     ok.
 
@@ -199,14 +192,14 @@ replication_design_doc_props(DDocId) ->
                 {<<"map">>, ?REP_DB_TERMINAL_STATE_VIEW_MAP_FUN},
                 {<<"reduce">>, <<"_count">>}
             ]},
-    DocProps = [
+    [
         {<<"_id">>, DDocId},
         {<<"language">>, <<"javascript">>},
         {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN},
         {<<"views">>, {[
             {<<"terminal_states">>, TerminalViewEJson}
         ]}}
-   ].
+    ].
 
 
 % Note: parse_rep_doc can handle filtered replications. During parsing of the
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
index dbad050..0c78b90 100644
--- a/src/couch_replicator/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -178,17 +178,25 @@
         if (state === 'failed') {
             source = doc.source;
             target = doc.target;
-            start_time = doc._replication_start_time;
             last_updated = doc._replication_state_time;
             state_reason = doc._replication_state_reason;
-            emit('failed', [source, target, start_time, last_updated, state_reason]);
+            emit('failed', [source, target, last_updated, last_updated, state_reason]);
         } else if (state === 'completed') {
             source = doc.source;
             target = doc.target;
-            start_time = doc._replication_start_time;
             last_updated = doc._replication_state_time;
             stats = doc._replication_stats;
-            emit('completed', [source, target, start_time, last_updated, stats]);
+            start_time = stats.start_time;
+            info = {
+                'changes_pending': stats['changes_pending'],
+                'checkpointed_source_seq': stats['checkpointed_source_seq'],
+                'doc_write_failures': stats['doc_write_failures'],
+                'docs_read': stats['docs_read'],
+                'docs_written': stats['docs_written'],
+                'missing_revisions_found': stats['missing_revisions_found'],
+                'revisions_checked': stats['revisions_checked']
+            }
+            emit('completed', [source, target, start_time, last_updated, info]);
         }
     }
 ">>).
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 1c9faaf..4d80af0 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -493,8 +493,9 @@ doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
 doc_update_completed(#rep{db_name = null}, _Stats) ->
     ok;
 doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
-    start_time = StartTime}, Stats) ->
-    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats, StartTime),
+    start_time = StartTime}, Stats0) ->
+    Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+    couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
     couch_log:notice("Replication `~s` completed (triggered by `~s`)",
         [pp_rep_id(RepId), DocId]),
     ok.

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.

[couchdb] 01/04: [fixup] use m,f,a in cluster event listener

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 676fd79017780d871a4f91bf3d6fe5b2a16009ee
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Sat Apr 8 02:09:47 2017 -0400

    [fixup] use m,f,a in cluster event listener
---
 src/couch_replicator/src/couch_replicator_clustering.erl    | 11 +++++------
 src/couch_replicator/src/couch_replicator_db_changes.erl    |  9 ++++++++-
 src/couch_replicator/src/couch_replicator_doc_processor.erl | 12 ++++++++++--
 3 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 4a6abd7..285be37 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -30,7 +30,7 @@
 
 % public API
 -export([start_link/0, owner/2, is_stable/0]).
--export([link_cluster_event_listener/1]).
+-export([link_cluster_event_listener/3]).
 
 % gen_server callbacks
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2,
@@ -81,12 +81,11 @@ is_stable() ->
     gen_server:call(?MODULE, is_stable).
 
 
-% Convenience function for gen_servers to subscribe to {cluster, stable} and
-% {cluster, unstable} events from couch_replicator clustering module.
--spec link_cluster_event_listener(pid()) -> pid().
-link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
+-spec link_cluster_event_listener(atom(), atom(), list()) -> pid().
+link_cluster_event_listener(Mod, Fun, Args)
+        when is_atom(Mod), is_atom(Fun), is_list(Args) ->
     CallbackFun =
-        fun(Event = {cluster, _}) -> gen_server:cast(GenServer, Event);
+        fun(Event = {cluster, _}) -> erlang:apply(Mod, Fun, Args ++ [Event]);
            (_) -> ok
         end,
     {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun),
diff --git a/src/couch_replicator/src/couch_replicator_db_changes.erl b/src/couch_replicator/src/couch_replicator_db_changes.erl
index 924c24f..dd4be64 100644
--- a/src/couch_replicator/src/couch_replicator_db_changes.erl
+++ b/src/couch_replicator/src/couch_replicator_db_changes.erl
@@ -18,6 +18,7 @@
 -export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
 -export([code_change/3, terminate/2]).
 
+-export([notify_cluster_event/2]).
 
 -record(state, {
    event_listener :: pid(),
@@ -25,6 +26,11 @@
 }).
 
 
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
 -spec start_link() ->
     {ok, pid()} | ignore | {error, any()}.
 start_link() ->
@@ -32,7 +38,8 @@ start_link() ->
 
 
 init([]) ->
-    EvtPid = couch_replicator_clustering:link_cluster_event_listener(self()),
+    EvtPid = couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+        notify_cluster_event, [self()]),
     State = #state{event_listener = EvtPid, mdb_changes = nil},
     case couch_replicator_clustering:is_stable() of
         true ->
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 9c2e2b3..d22ac26 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -17,6 +17,7 @@
 -export([docs/1, doc/2]).
 -export([update_docs/0]).
 -export([get_worker_ref/1]).
+-export([notify_cluster_event/2]).
 
 % multidb changes callback
 -export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -101,6 +102,12 @@ get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
     end.
 
 
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
 % Private helpers for multidb changes API, these updates into the doc
 % processor gen_server
 
@@ -179,7 +186,8 @@ start_link() ->
 
 init([]) ->
     ?MODULE = ets:new(?MODULE, [ordered_set, named_table, {keypos, #rdoc.id}]),
-    couch_replicator_clustering:link_cluster_event_listener(self()),
+    couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+        notify_cluster_event, [self()]),
     {ok, nil}.
 
 
@@ -818,7 +826,7 @@ setup() ->
     meck:expect(config, get, fun(_, _, Default) -> Default end),
     meck:expect(config, listen_for_changes, 2, ok),
     meck:expect(couch_replicator_clustering, owner, 2, node()),
-    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 1, ok),
+    meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3, ok),
     meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.