You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/06 18:19:11 UTC

[13/26] couch-replicator commit: updated refs/heads/import-rcouch to 589d958

configurable checkpoint interval


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/ee27edf1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/ee27edf1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/ee27edf1

Branch: refs/heads/import-rcouch
Commit: ee27edf10f3ddc2eb8619a154b92c617de4b6b02
Parents: a6d8389
Author: Robert Newson <rn...@apache.org>
Authored: Wed Nov 20 14:59:00 2013 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Nov 20 14:59:00 2013 +0000

----------------------------------------------------------------------
 src/couch_replicator.erl       | 25 ++++++++++++++-----------
 src/couch_replicator_utils.erl |  6 +++++-
 2 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ee27edf1/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index b00efec..91039b6 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -69,7 +69,8 @@
     source_monitor = nil,
     target_monitor = nil,
     source_seq = nil,
-    use_checkpoints = true
+    use_checkpoints = true,
+    checkpoint_interval = 5000
 }).
 
 
@@ -248,7 +249,8 @@ do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
         source_seq = SourceCurSeq,
-        committed_seq = {_, CommittedSeq}
+        committed_seq = {_, CommittedSeq},
+        checkpoint_interval = CheckpointInterval
     } = State = init_state(Rep),
 
     NumWorkers = get_value(worker_processes, Options),
@@ -289,7 +291,8 @@ do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
         {doc_write_failures, 0},
         {source_seq, SourceCurSeq},
         {checkpointed_source_seq, CommittedSeq},
-        {progress, 0}
+        {progress, 0},
+        {checkpoint_interval, CheckpointInterval}
     ]),
     couch_task_status:set_update_frequency(1000),
 
@@ -456,9 +459,11 @@ handle_cast({report_seq, Seq},
     {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
 
 
-code_change(_OldVsn, OldState, _Extra) when tuple_size(OldState) =:= 30 ->
-    {ok, erlang:append_element(OldState, true)};
-code_change(_OldVsn, State, _Extra) ->
+code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 30 ->
+    code_change(OldVsn, erlang:append_element(OldState, true), Extra);
+code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 31 ->
+    code_change(OldVsn, erlang:append_element(OldState, 5000), Extra);
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
     {ok, State}.
 
 
@@ -508,7 +513,7 @@ do_last_checkpoint(#rep_state{seqs_in_progress = [],
 
 
 start_timer(State) ->
-    After = checkpoint_interval(State),
+    After = State#rep_state.checkpoint_interval,
     case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
     {ok, Ref} ->
         Ref;
@@ -567,7 +572,8 @@ init_state(Rep) ->
         source_monitor = db_monitor(Source),
         target_monitor = db_monitor(Target),
         source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-        use_checkpoints = get_value(use_checkpoints, Options, true)
+        use_checkpoints = get_value(use_checkpoints, Options, true),
+        checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
     },
     State#rep_state{timer = start_timer(State)}.
 
@@ -673,9 +679,6 @@ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
     end.
 
 
-checkpoint_interval(_State) ->
-    5000.
-
 do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
     NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
     {ok, NewState};

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/ee27edf1/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 1646c69..240d7d4 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -229,6 +229,7 @@ make_options(Props) ->
     DefTimeout = couch_config:get("replicator", "connection_timeout", "30000"),
     DefRetries = couch_config:get("replicator", "retries_per_request", "10"),
     UseCheckpoints = couch_config:get("replicator", "use_checkpoints", "true"),
+    DefCheckpointInterval = couch_config:get("replicator", "checkpoint_interval", "5000"),
     {ok, DefSocketOptions} = couch_util:parse_term(
         couch_config:get("replicator", "socket_options",
             "[{keepalive, true}, {nodelay, false}]")),
@@ -239,7 +240,8 @@ make_options(Props) ->
         {socket_options, DefSocketOptions},
         {worker_batch_size, list_to_integer(DefBatchSize)},
         {worker_processes, list_to_integer(DefWorkers)},
-        {use_checkpoints, list_to_existing_atom(UseCheckpoints)}
+        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
+        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
     ])).
 
 
@@ -283,6 +285,8 @@ convert_options([{<<"since_seq">>, V} | R]) ->
     [{since_seq, V} | convert_options(R)];
 convert_options([{<<"use_checkpoints">>, V} | R]) ->
     [{use_checkpoints, V} | convert_options(R)];
+convert_options([{<<"checkpoint_interval">>, V} | R]) ->
+    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).