You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this plain-text copy.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/02/02 07:20:49 UTC

svn commit: r1066340 - in /couchdb/trunk: etc/couchdb/default.ini.tpl.in src/couchdb/couch_rep_db_listener.erl

Author: fdmanana
Date: Wed Feb  2 06:20:49 2011
New Revision: 1066340

URL: http://svn.apache.org/viewvc?rev=1066340&view=rev
Log:
Replicator DB: make number of replication retry attempts configurable

Closes COUCHDB-1051

Modified:
    couchdb/trunk/etc/couchdb/default.ini.tpl.in
    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Modified: couchdb/trunk/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/default.ini.tpl.in?rev=1066340&r1=1066339&r2=1066340&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/trunk/etc/couchdb/default.ini.tpl.in Wed Feb  2 06:20:49 2011
@@ -136,6 +136,7 @@ compressible_types = text/*, application
 
 [replicator]
 db = _replicator
+max_replication_retry_count = 10
 max_http_sessions = 20
 max_http_pipeline_size = 50
 ; set to true to validate peer certificates

Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1066340&r1=1066339&r2=1066340&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Wed Feb  2 06:20:49 2011
@@ -20,14 +20,14 @@
 
 -define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
 -define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
--define(MAX_RETRIES, 10).
 -define(INITIAL_WAIT, 5).
 
 -record(state, {
     changes_feed_loop = nil,
     db_notifier = nil,
     rep_db_name = nil,
-    rep_start_pids = []
+    rep_start_pids = [],
+    max_retries
 }).
 
 -import(couch_util, [
@@ -41,40 +41,42 @@ start_link() ->
 
 init(_) ->
     process_flag(trap_exit, true),
-    ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
-    ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
+    _ = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, protected]),
+    _ = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
     Server = self(),
     ok = couch_config:register(
         fun("replicator", "db", NewName) ->
-            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
+            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
+        ("replicator", "max_replication_retry_count", NewMaxRetries1) ->
+            NewMaxRetries = list_to_integer(NewMaxRetries1),
+            ok = gen_server:cast(Server, {set_max_retries, NewMaxRetries})
         end
     ),
     {Loop, RepDbName} = changes_feed_loop(),
     {ok, #state{
         changes_feed_loop = Loop,
         rep_db_name = RepDbName,
-        db_notifier = db_update_notifier()}
-    }.
+        db_notifier = db_update_notifier(),
+        max_retries = list_to_integer(
+            couch_config:get("replicator", "max_replication_retry_count", "10"))
+    }}.
 
 
 handle_call({rep_db_update, Change}, _From, State) ->
     {reply, ok, process_update(State, Change)};
 
 handle_call({triggered, {BaseId, _}}, _From, State) ->
-    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
-    [{BaseId, {DocId, true}}] ->
-        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}});
-    _ ->
-        ok
-    end,
+    [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId),
+    true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, false}}),
     {reply, ok, State};
 
 handle_call({restart_failure, {Props} = RepDoc, Error}, _From, State) ->
     DocId = get_value(<<"_id">>, Props),
-    [{DocId, {BaseId, _} = RepId}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
+    [{DocId, {{BaseId, _} = RepId, MaxRetries}}] = ets:lookup(
+        ?DOC_ID_TO_REP_ID, DocId),
     ?LOG_ERROR("Failed to start replication `~s` after ~p attempts using "
         "the document `~s`. Last error reason was: ~p",
-        [pp_rep_id(RepId), ?MAX_RETRIES, DocId, Error]),
+        [pp_rep_id(RepId), MaxRetries, DocId, Error]),
     couch_rep:update_rep_doc(
         RepDoc,
         [{<<"_replication_state">>, <<"error">>},
@@ -103,6 +105,9 @@ handle_cast({rep_db_created, NewName},
 handle_cast({rep_db_created, _NewName}, State) ->
     {noreply, restart(State)};
 
+handle_cast({set_max_retries, MaxRetries}, State) ->
+    {noreply, State#state{max_retries = MaxRetries}};
+
 handle_cast(Msg, State) ->
     ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
     {stop, {error, {unexpected_cast, Msg}}, State}.
@@ -259,16 +264,17 @@ rep_user_ctx({RepDoc}) ->
     end.
 
 
-maybe_start_replication(State, DocId, JsonRepDoc) ->
+maybe_start_replication(#state{max_retries = MaxRetries} = State,
+        DocId, JsonRepDoc) ->
     UserCtx = rep_user_ctx(JsonRepDoc),
     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
     case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
     [] ->
         true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}),
-        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, {RepId, MaxRetries}}),
         Server = self(),
         Pid = spawn_link(fun() ->
-            start_replication(Server, JsonRepDoc, RepId, UserCtx)
+            start_replication(Server, JsonRepDoc, RepId, UserCtx, MaxRetries)
         end),
         State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
     [{BaseId, {DocId, _}}] ->
@@ -295,7 +301,7 @@ maybe_tag_rep_doc({Props} = JsonRepDoc, 
     end.
 
 
-start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx) ->
+start_replication(Server, {RepProps} = RepDoc, RepId, UserCtx, MaxRetries) ->
     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
     Pid when is_pid(Pid) ->
         ?LOG_INFO("Document `~s` triggered replication `~s`",
@@ -304,7 +310,7 @@ start_replication(Server, {RepProps} = R
         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     Error ->
         keep_retrying(
-            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, ?MAX_RETRIES)
+            Server, RepId, RepDoc, UserCtx, Error, ?INITIAL_WAIT, MaxRetries)
     end.
 
 
@@ -319,9 +325,10 @@ keep_retrying(Server, RepId, RepDoc, Use
     Pid when is_pid(Pid) ->
         ok = gen_server:call(Server, {triggered, RepId}, infinity),
         {RepProps} = RepDoc,
+        DocId = get_value(<<"_id">>, RepProps),
+        [{DocId, {RepId, MaxRetries}}] = ets:lookup(?DOC_ID_TO_REP_ID, DocId),
         ?LOG_INFO("Document `~s` triggered replication `~s` after ~p attempts",
-            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId),
-                ?MAX_RETRIES - RetriesLeft + 1]),
+            [DocId, pp_rep_id(RepId), MaxRetries - RetriesLeft + 1]),
         couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
     NewError ->
         keep_retrying(
@@ -351,7 +358,7 @@ replication_complete(DocId) ->
 
 stop_replication(DocId) ->
     case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
-    [{DocId, {BaseId, _} = RepId}] ->
+    [{DocId, {{BaseId, _} = RepId, _MaxRetries}}] ->
         couch_rep:end_replication(RepId),
         true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
         true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
@@ -365,7 +372,7 @@ stop_all_replications() ->
     ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
         "was deleted or changed", []),
     ets:foldl(
-        fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
+        fun({_, {RepId, _}}, _) -> couch_rep:end_replication(RepId) end,
         ok,
         ?DOC_ID_TO_REP_ID
     ),