Posted to commits@couchdb.apache.org by da...@apache.org on 2010/06/18 00:35:49 UTC
svn commit: r955773 - in /couchdb/branches/new_replicator/src/couchdb:
couch_api_wrap.erl couch_api_wrap.hrl couch_doc.erl couch_httpd_rep.erl
couch_replicate.erl couch_replicate.hrl
Author: damien
Date: Thu Jun 17 22:35:49 2010
New Revision: 955773
URL: http://svn.apache.org/viewvc?rev=955773&view=rev
Log:
Reorganized and commented the new replicator code for easier understanding.
Added:
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.hrl
- copied unchanged from r955001, couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
Removed:
couchdb/branches/new_replicator/src/couchdb/couch_replicate.hrl
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_doc.erl
couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Thu Jun 17 22:35:49 2010
@@ -12,13 +12,29 @@
-module(couch_api_wrap).
+% This module wraps the native Erlang API and allows operations to be
+% performed on remote and local databases through the same interface.
+%
+% Notes:
+% Many options and APIs aren't yet supported here; they are added as needed.
+%
+% This file needs a lot of work to "robustify" handling of the common
+% failures, and to convert the JSON errors back to Erlang-style errors.
+%
+% Also, we open a new connection for every HTTP call. This avoids the problem
+% where, when requests are pipelined over a single connection, an earlier
+% request that fails and disconnects causes network errors for the other
+% requests. This should eventually be optimized so each process has its own
+% connection that's kept alive between requests.
+%
-include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
-include("../ibrowse/ibrowse.hrl").
-export([
db_open/2,
+ db_close/1,
get_db_info/1,
open_doc/3,
update_doc/3,
@@ -34,6 +50,12 @@ db_open(#httpdb{}=Db, _Options) ->
db_open(DbName, Options) ->
couch_db:open(DbName,Options).
+db_close(#httpdb{}) ->
+ ok;
+db_close(DbName) ->
+ couch_db:close(DbName).
+
+
get_db_info(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
Headers2 = oauth_header(Url, [], get, OAuth) ++ Headers,
case ibrowse:send_req(Url, Headers2, get, [], [
@@ -67,9 +89,11 @@ open_doc(#httpdb{url=Url,oauth=OAuth,hea
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
+
update_doc(Db, Doc, Options) ->
update_doc(Db,Doc,Options,interactive_edit).
+
ensure_full_commit(#httpdb{url=Url,oauth=OAuth,headers=Headers}) ->
Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
#url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
@@ -85,6 +109,7 @@ ensure_full_commit(#httpdb{url=Url,oauth
ensure_full_commit(Db) ->
couch_db:ensure_full_commit(Db).
+
get_missing_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, IdRevs) ->
Json = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs],
Headers2 = oauth_header(Url, [], post, OAuth) ++ Headers,
@@ -109,22 +134,6 @@ get_missing_revs(Db, IdRevs) ->
couch_db:get_missing_revs(Db, IdRevs).
-options_to_query_args([], Acc) ->
- lists:reverse(Acc);
-options_to_query_args([delay_commit|Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,[]}|Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
- options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
- couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
-
-query_args_to_string([], []) ->
- "";
-query_args_to_string([], Acc) ->
- "?" ++ string:join(lists:reverse(Acc), "&");
-query_args_to_string([{K,V}|Rest], Acc) ->
- query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
open_doc_revs(#httpdb{url=Url,oauth=OAuth,headers=Headers}, Id, Revs,
Options, Fun, Acc) ->
@@ -163,6 +172,107 @@ open_doc_revs(Db, Id, Revs, Options, Fun
{ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
{ok, lists:foldl(Fun, Acc, Results)}.
+
+
+update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
+ QArgs = if Type == replicated_changes ->
+ [{"new_edits", "false"}]; true -> [] end ++
+ options_to_query_args(Options, []),
+
+ Boundary = couch_uuids:random(),
+ JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+ JsonBytes, Doc#doc.atts, false),
+ Self = self(),
+ Headers2 = case lists:member(delay_commit, Options) of
+ true -> [{"X-Couch-Full-Commit", "false"}];
+ false -> []
+ end ++ [{"Content-Type", ?b2l(ContentType)}] ++
+ oauth_header(Url, QArgs, put, OAuth) ++ Headers,
+ Ref = make_ref(),
+ % this streams the doc data to the ibrowse requester
+ DocStreamer = spawn_link(fun() ->
+ couch_doc:doc_to_multi_part_stream(Boundary,
+ JsonBytes, Doc#doc.atts,
+ fun(Data) ->
+ receive {get_data, Ref, Pid} ->
+ Pid ! {data, Ref, Data}
+ end
+ end,
+ false),
+ unlink(Self)
+ end),
+ #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+ {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+ case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs, []),
+ [{"Content-Length",Len}|Headers2], put,
+ {fun(0) ->
+ eof;
+ (LenLeft) when LenLeft > 0 ->
+ DocStreamer ! {get_data, Ref, self()},
+ receive {data, Ref, Data} ->
+ {ok, Data, LenLeft - iolist_size(Data)}
+ end
+ end, Len}, [], infinity) of
+ {ok, [$2,$0, _], _RespHeaders, Body} ->
+ catch ibrowse:stop_worker_process(Worker),
+ {Props} = ?JSON_DECODE(Body),
+ {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
+ end;
+update_doc(Db,Doc,Options,Type) ->
+ couch_db:update_doc(Db,Doc,Options,Type).
+
+changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
+ StartSeq, UserFun, Acc) ->
+ Url2 = Url ++ "_changes",
+ QArgs = [{"style", atom_to_list(Style)},
+ {"since", integer_to_list(StartSeq)}],
+ Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,
+ #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
+ {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
+ {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, ""),
+ Headers2, get, [], [
+ {response_format,binary},
+ {stream_to, {self(), once}}], infinity),
+ DataFun = fun() ->
+ receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
+ stream_data_self(ReqId)
+ end
+ end,
+ EventFun = fun(Ev) ->
+ changes_ev1(Ev, UserFun, Acc)
+ end,
+ try
+ json_stream_parse:events(DataFun, EventFun)
+ after
+ catch ibrowse:stop_worker_process(Worker)
+ end;
+changes_since(Db, Style, StartSeq, UserFun, Acc) ->
+ couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
+
+
+% internal functions
+
+options_to_query_args([], Acc) ->
+ lists:reverse(Acc);
+options_to_query_args([delay_commit|Rest], Acc) ->
+ options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,[]}|Rest], Acc) ->
+ options_to_query_args(Rest, Acc);
+options_to_query_args([{atts_since,PossibleAncestors}|Rest], Acc) ->
+ % NOTE: we should limit the number of PossibleAncestors sent, since a
+ % large number can exceed the max URL length. Limiting the number only
+ % results in attachments being copied in full from source to target,
+ % instead of incrementally.
+ options_to_query_args(Rest, [{"atts_since",?JSON_ENCODE(
+ couch_doc:revs_to_strs(PossibleAncestors))} | Acc]).
+
+query_args_to_string([], []) ->
+ "";
+query_args_to_string([], Acc) ->
+ "?" ++ string:join(lists:reverse(Acc), "&");
+query_args_to_string([{K,V}|Rest], Acc) ->
+ query_args_to_string(Rest, [(K ++ "=" ++ V) | Acc]).
receive_docs(Streamer, UserFun, UserAcc) ->
Streamer ! {get_headers, self()},
@@ -242,82 +352,6 @@ mp_parse_mixed(body_end) ->
mp_parse_mixed(Next)
end.
-update_doc(#httpdb{url=Url,headers=Headers,oauth=OAuth},Doc,Options,Type) ->
- QArgs = if Type == replicated_changes ->
- [{"new_edits", "false"}]; true -> [] end ++
- options_to_query_args(Options, []),
-
- Boundary = couch_uuids:random(),
- JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs,attachments,follows|Options])),
- {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
- JsonBytes, Doc#doc.atts, false),
- Self = self(),
- Headers2 = case lists:member(delay_commit, Options) of
- true -> [{"X-Couch-Full-Commit", "false"}];
- false -> []
- end ++ [{"Content-Type", ?b2l(ContentType)}] ++
- oauth_header(Url, QArgs, put, OAuth) ++ Headers,
- Ref = make_ref(),
- % this streams the doc data to the ibrowse requester
- DocStreamer = spawn_link(fun() ->
- couch_doc:doc_to_multi_part_stream(Boundary,
- JsonBytes, Doc#doc.atts,
- fun(Data) ->
- receive {get_data, Ref, Pid} ->
- Pid ! {data, Ref, Data}
- end
- end,
- false),
- unlink(Self)
- end),
- #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
- {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
- case ibrowse:send_req_direct(Worker, Url ++ couch_util:url_encode(Doc#doc.id) ++ query_args_to_string(QArgs, []),
- [{"Content-Length",Len}|Headers2], put,
- {fun(0) ->
- eof;
- (LenLeft) when LenLeft > 0 ->
- DocStreamer ! {get_data, Ref, self()},
- receive {data, Ref, Data} ->
- {ok, Data, LenLeft - iolist_size(Data)}
- end
- end, Len}, [], infinity) of
- {ok, [$2,$0, _], _RespHeaders, Body} ->
- catch ibrowse:stop_worker_process(Worker),
- {Props} = ?JSON_DECODE(Body),
- {ok, couch_doc:parse_rev(couch_util:get_value(<<"rev">>, Props))}
- end;
-update_doc(Db,Doc,Options,Type) ->
- couch_db:update_doc(Db,Doc,Options,Type).
-
-changes_since(#httpdb{url=Url,headers=Headers,oauth=OAuth}, Style,
- StartSeq, UserFun, Acc) ->
- Url2 = Url ++ "_changes",
- QArgs = [{"style", atom_to_list(Style)},
- {"since", integer_to_list(StartSeq)}],
- Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,
- #url{host=Host,port=Port}=ibrowse_lib:parse_url(Url),
- {ok, Worker} = ibrowse:spawn_link_worker_process(Host,Port),
- {ibrowse_req_id, ReqId} = ibrowse:send_req_direct(Worker, Url2 ++ query_args_to_string(QArgs, ""),
- Headers2, get, [], [
- {response_format,binary},
- {stream_to, {self(), once}}], infinity),
- DataFun = fun() ->
- receive {ibrowse_async_headers, ReqId, "200", _Headers} ->
- stream_data_self(ReqId)
- end
- end,
- EventFun = fun(Ev) ->
- changes_ev1(Ev, UserFun, Acc)
- end,
- try
- json_stream_parse:events(DataFun, EventFun)
- after
- catch ibrowse:stop_worker_process(Worker)
- end;
-changes_since(Db, Style, StartSeq, UserFun, Acc) ->
- couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
-
stream_data_self(ReqId) ->
ibrowse:stream_next(ReqId),
receive {ibrowse_async_response, ReqId, Data} ->
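The module comment above notes that the per-request connections should
eventually give way to a connection per process that is kept alive between
requests. A minimal sketch of that direction, reusing only ibrowse calls
already used in this file; the get_worker/1 helper and its process-dictionary
cache are illustrative assumptions, not part of this commit:

% Sketch only: cache one ibrowse worker per calling process and reuse it,
% instead of spawning a fresh worker for every request as the code above does.
get_worker(Url) ->
    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
    case get({ibrowse_worker, Host, Port}) of
        undefined ->
            {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
            put({ibrowse_worker, Host, Port}, Worker),
            Worker;
        Worker ->
            Worker
    end.

A caller would pass the cached Worker to ibrowse:send_req_direct/7 exactly as
the code above does, and call ibrowse:stop_worker_process/1 only when the
owning process shuts down, instead of after every request.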
Modified: couchdb/branches/new_replicator/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_doc.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_doc.erl Thu Jun 17 22:35:49 2010
@@ -309,8 +309,8 @@ att_foldl(#att{data=DataFun,att_len=Len}
att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc);
-att_foldl_decode(#att{data=Fun,att_len=Len, encoding=identity}, Fun, Acc) ->
- fold_streamed_data(Fun, Len, Fun, Acc).
+att_foldl_decode(#att{data=Fun2,att_len=Len, encoding=identity}, Fun, Acc) ->
+ fold_streamed_data(Fun2, Len, Fun, Acc).
att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
Bin;
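The couch_doc.erl change above fixes a clause head in which the variable Fun
was bound twice, to the attachment's data fun and to the fold fun; in Erlang a
repeated variable in a head is an equality constraint, not shadowing, so the
clause only matched when both funs were the same term. A small self-contained
sketch of the pitfall; the module, record, and demo values are illustrative
only:

-module(shadow_sketch).
-export([demo/0]).

-record(att, {data}).

% A repeated variable in a clause head must match the same term twice.
same(#att{data = F}, F) -> both_equal.

% Distinct variables accept distinct funs, as in the fixed att_foldl_decode/3.
distinct(#att{data = Fun2}, Fun) -> {Fun2, Fun}.

demo() ->
    F = fun(X) -> X end,
    G = fun(X) -> X + 1 end,
    both_equal = same(#att{data = F}, F),
    {F, G} = distinct(#att{data = F}, G),
    % same(#att{data = F}, G) would fail with a function_clause error
    ok.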
Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Thu Jun 17 22:35:49 2010
@@ -13,7 +13,7 @@
-module(couch_httpd_rep).
-include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
-import(couch_httpd,
[send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -24,6 +24,23 @@
-export([handle_req/1]).
+
+handle_req(#httpd{method='POST'}=Req) ->
+ {PostBody} = couch_httpd:json_body_obj(Req),
+ SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
+ TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
+ Options = convert_options(PostBody),
+ try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+ {ok, {HistoryResults}} ->
+ send_json(Req, {[{ok, true} | HistoryResults]})
+ catch
+ throw:{db_not_found, Msg} ->
+ send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
+ end;
+handle_req(Req) ->
+ send_method_not_allowed(Req, "POST").
+
+
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
@@ -83,17 +100,3 @@ convert_options([_|R])-> % skip unknown
convert_options(R).
-handle_req(#httpd{method='POST'}=Req) ->
- {PostBody} = couch_httpd:json_body_obj(Req),
- SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
- TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
- Options = convert_options(PostBody),
- try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
- {ok, {HistoryResults}} ->
- send_json(Req, {[{ok, true} | HistoryResults]})
- catch
- throw:{db_not_found, Msg} ->
- send_json(Req, 404, {[{error, db_not_found}, {reason, Msg}]})
- end;
-handle_req(Req) ->
- send_method_not_allowed(Req, "POST").
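For context, the handle_req/1 clause moved above expects a JSON object with
source and target fields, plus optional flags handled by convert_options/1.
A hedged example of the decoded EJSON shape it would receive from
couch_httpd:json_body_obj/1; the database names and the create_target flag are
illustrative values, not taken from this commit:

% Illustrative only: the {PostBody} structure that handle_req/1 destructures.
{[{<<"source">>, <<"http://example.net:5984/sourcedb/">>},
  {<<"target">>, <<"targetdb">>},
  {<<"create_target">>, true}]}

parse_rep_db/1 presumably maps each of source and target to either a local
database name or an #httpdb{} record for a remote URL, matching the two
db_open/2 clauses in couch_api_wrap above, and couch_replicate:start/4 is then
called with the converted options.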
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=955773&r1=955772&r2=955773&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Thu Jun 17 22:35:49 2010
@@ -15,7 +15,7 @@
-export([start/4]).
-include("couch_db.hrl").
--include("couch_replicate.hrl").
+-include("couch_api_wrap.hrl").
-record(rep_state, {
@@ -49,37 +49,40 @@ start(Src, Tgt, Options, UserCtx) ->
_Continuous = proplists:get_value(continuous, Options, false),
_CreateTarget = proplists:get_value(create_target, Options, false),
+ % Initialize the replication state, looking for existing rep records
+ % for incremental replication.
#rep_state{source=Source,target=Target,start_seq=StartSeq} = State =
init_state(Src, Tgt, Options, UserCtx),
+ % Create the work queues
{ok, ChangesQueue} = couch_work_queue:new(100000, 500),
{ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
+ % This starts the _changes reader process. It adds the changes from
+ % the source db to the ChangesQueue.
spawn_changes_reader(self(), StartSeq, Source, ChangesQueue),
+
+ % This starts the missing revs finder. It takes changes from the
+ % ChangesQueue and checks whether they already exist on the target;
+ % if not, it adds them to the MissingRevsQueue.
spawn_missing_revs_finder(self(), Target, ChangesQueue, MissingRevsQueue),
+
+ % This starts the doc copy process. It gets the documents from the
+ % MissingRevsQueue, copying them from the source to the target database.
spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+ % This is the checkpoint loop. It updates the replication record in the
+ % database every X seconds, so that if the replication is interrupted,
+ % it can restart near where it left off.
{ok, State2, _Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
#stats{}),
+ couch_api_wrap:db_close(Source),
+ couch_api_wrap:db_close(Target),
{ok, State2#rep_state.checkpoint_history}.
-
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
- spawn_link(
- fun()->
- couch_api_wrap:changes_since(Source, all_docs, StartSeq,
- fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
- Cp ! {seq_start, {Seq, length(Revs)}},
- Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
- ok = couch_work_queue:queue(ChangesQueue, DocInfo),
- {ok, ok}
- end, ok),
- couch_work_queue:close(ChangesQueue)
- end).
init_state(Src,Tgt,Options,UserCtx)->
-
{ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}]),
@@ -118,15 +121,134 @@ init_state(Src,Tgt,Options,UserCtx)->
self(), timed_checkpoint)}.
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
+ spawn_link(
+ fun()->
+ couch_api_wrap:changes_since(Source, all_docs, StartSeq,
+ fun(#doc_info{high_seq=Seq,revs=Revs}=DocInfo, _)->
+ Cp ! {seq_start, {Seq, length(Revs)}},
+ Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
+ ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+ {ok, ok}
+ end, ok),
+ couch_work_queue:close(ChangesQueue)
+ end).
+
+
+spawn_missing_revs_finder(StatsProcess,
+ Target, ChangesQueue, MissingRevsQueue) ->
+ % Note: we could spawn more missing revs processes here. Before that's
+ % possible, the work_queue code needs to be modified to work with
+ % multiple dequeueing processes.
+ spawn_link(fun() ->
+ missing_revs_finder_loop(StatsProcess,
+ Target, ChangesQueue, MissingRevsQueue)
+ end).
+
+
+missing_revs_finder_loop(Cp,
+ Target, ChangesQueue, MissingRevsQueue) ->
+ case couch_work_queue:dequeue(ChangesQueue) of
+ closed ->
+ couch_work_queue:close(MissingRevsQueue);
+ {ok, DocInfos} ->
+ IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
+ #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
+ {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+ % Missing contains the ids and revs missing from the target, plus any
+ % possible ancestors that already exist on the target. This enables
+ % incremental attachment replication, so the source only needs to send
+ % attachments modified since the common ancestor on the target.
+
+ % Signal to the checkpointer that any revs already on the target are
+ % now complete.
+ IdRevsSeqDict = dict:from_list(
+ [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
+ #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
+ NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
+ % Signal the completion of the revs that aren't missing
+ lists:foreach(fun({_Id, {Revs, Seq}})->
+ Cp ! {seq_changes_done, {Seq, length(Revs)}}
+ end, dict:to_list(NonMissingIdRevsSeqDict)),
+
+ % Expand out each doc and seq into its own work item
+ lists:foreach(fun({Id, Revs, PAs})->
+ % PA means "possible ancestor"
+ Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
+ {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
+ ok = couch_work_queue:queue(MissingRevsQueue,
+ {Id, Revs, PAs, Seq})
+ end, Missing),
+ missing_revs_finder_loop(Cp, Target, ChangesQueue,
+ MissingRevsQueue)
+ end.
+
+
+remove_missing(IdRevsSeqDict, []) ->
+ IdRevsSeqDict;
+remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
+ {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
+ case AllChangedRevs -- MissingRevs of
+ [] ->
+ remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
+ NotMissingRevs ->
+ IdRevsSeqDict2 =
+ dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
+ remove_missing(IdRevsSeqDict2, Rest)
+ end.
+
+
+spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
+ % Note: we could spawn many doc copy processes here. Before that's
+ % possible, the work_queue code needs to be modified to work with
+ % multiple dequeueing processes.
+ spawn_link(fun() ->
+ doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+ end).
+
+
+doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
+ case couch_work_queue:dequeue(MissingRevsQueue,1) of
+ closed ->
+ Cp ! done;
+ {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
+ couch_api_wrap:open_doc_revs(Source, Id, Revs,
+ [{atts_since,PossibleAncestors}],
+ fun({ok, Doc}, _) ->
+ % we are called for every rev read on the source
+ Cp ! {add_stat, {#stats.docs_read, 1}},
+ % now write the doc to the target.
+ case couch_api_wrap:update_doc(Target, Doc, [],
+ replicated_changes) of
+ {ok, _} ->
+ Cp ! {add_stat, {#stats.docs_written, 1}};
+ _Error ->
+ Cp ! {add_stat, {#stats.doc_write_failures, 1}}
+ end;
+ (_, _) ->
+ ok
+ end, []),
+ Cp ! {seq_changes_done, {Seq, length(Revs)}},
+ doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
+ end.
+
checkpoint_loop(State, SeqsInProgress, Stats) ->
+ % SeqsInProgress contains the number of revs for each seq found by the
+ % changes process.
receive
{seq_start, {Seq, NumChanges}} ->
+ % Add this seq to the SeqsInProgress
SeqsInProgress2 = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
checkpoint_loop(State, SeqsInProgress2, Stats);
{seq_changes_done, {Seq, NumChangesDone}} ->
+ % Decrement the # of changes for this seq by NumChangesDone
TotalChanges = gb_trees:get(Seq, SeqsInProgress),
case TotalChanges - NumChangesDone of
0 ->
+ % This seq is completely processed. Check to see if it was the
+ % smallest seq in progress. If so, we've made progress that can
+ % be checkpointed.
State2 =
case gb_trees:smallest(SeqsInProgress) of
{Seq, _} ->
@@ -137,16 +259,19 @@ checkpoint_loop(State, SeqsInProgress, S
checkpoint_loop(State2,
gb_trees:delete(Seq,SeqsInProgress), Stats);
NewTotalChanges when NewTotalChanges > 0 ->
+ % Still some changes that need work done. Put the new count back.
SeqsInProgress2 =
gb_trees:update(Seq, NewTotalChanges, SeqsInProgress),
checkpoint_loop(State, SeqsInProgress2, Stats)
end;
{add_stat, {StatPos, Val}} ->
+ % Increment the stat at the pos.
Stat = element(StatPos, Stats),
Stats2 = setelement(StatPos, Stats, Stat + Val),
checkpoint_loop(State, SeqsInProgress, Stats2);
done ->
- io:format("checkpoint_loop done~n"),
+ % This means all the worker processes have completed their work.
+ % Assert that all the seqs have been processed
0 = gb_trees:size(SeqsInProgress),
State2 = do_checkpoint(State, Stats),
erlang:cancel_timer(State2#rep_state.timer),
@@ -155,6 +280,7 @@ checkpoint_loop(State, SeqsInProgress, S
end,
{ok, State2, Stats};
timed_checkpoint ->
+ % every checkpoint interval while processing
State2 = do_checkpoint(State, Stats),
Timer = erlang:start_timer(checkpoint_interval(State),
self(), timed_checkpoint),
@@ -250,89 +376,6 @@ commit_to_both(Source, Target) ->
{SourceStartTime, TargetStartTime}.
-spawn_missing_revs_finder(StatsProcess,
- Target, ChangesQueue, MissingRevsQueue) ->
- spawn_link(fun() ->
- missing_revs_finder_loop(StatsProcess,
- Target, ChangesQueue, MissingRevsQueue)
- end).
-
-
-remove_missing(IdRevsSeqDict, []) ->
- IdRevsSeqDict;
-remove_missing(IdRevsSeqDict, [{MissingId, MissingRevs, _}|Rest]) ->
- {AllChangedRevs, Seq} = dict:fetch(MissingId, IdRevsSeqDict),
- case AllChangedRevs -- MissingRevs of
- [] ->
- remove_missing(dict:erase(MissingId, IdRevsSeqDict), Rest);
- NotMissingRevs ->
- IdRevsSeqDict2 =
- dict:store(MissingId, {NotMissingRevs, Seq}, IdRevsSeqDict),
- remove_missing(IdRevsSeqDict2, Rest)
- end.
-
-
-missing_revs_finder_loop(Cp,
- Target, ChangesQueue, MissingRevsQueue) ->
- case couch_work_queue:dequeue(ChangesQueue) of
- closed ->
- io:format("missing_revs_finder_loop done~n"),
- couch_work_queue:close(MissingRevsQueue);
- {ok, DocInfos} ->
- IdRevs = [{Id, [Rev || #rev_info{rev=Rev} <- RevsInfo]} ||
- #doc_info{id=Id,revs=RevsInfo} <- DocInfos],
- {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
- IdRevsSeqDict = dict:from_list(
- [{Id, {[Rev || #rev_info{rev=Rev} <- RevsInfo], Seq}} ||
- #doc_info{id=Id,revs=RevsInfo,high_seq=Seq} <- DocInfos]),
- NonMissingIdRevsSeqDict = remove_missing(IdRevsSeqDict, Missing),
- % signal the completion of these that aren't missing
- lists:foreach(fun({_Id, {Revs, Seq}})->
- Cp ! {seq_changes_done, {Seq, length(Revs)}}
- end, dict:to_list(NonMissingIdRevsSeqDict)),
- % Expand out each into it's own work item
- lists:foreach(fun({Id, Revs, PAs})->
- % PA means "possible ancestor"
- Cp ! {add_stat, {#stats.missing_found, length(Revs)}},
- {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
- ok = couch_work_queue:queue(MissingRevsQueue,
- {Id, Revs, PAs, Seq})
- end, Missing),
- missing_revs_finder_loop(Cp, Target, ChangesQueue,
- MissingRevsQueue)
- end.
-
-
-spawn_doc_copy(Cp, Source, Target, MissingRevsQueue) ->
- spawn_link(fun() ->
- doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
- end).
-
-
-doc_copy_loop(Cp, Source, Target, MissingRevsQueue) ->
- case couch_work_queue:dequeue(MissingRevsQueue,1) of
- closed ->
- io:format("doc_copy_loop done~n"),
- Cp ! done;
- {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
- couch_api_wrap:open_doc_revs(Source, Id, Revs,
- [{atts_since,PossibleAncestors}],
- fun({ok, Doc}, _) ->
- Cp ! {add_stat, {#stats.docs_read, 1}},
- case couch_api_wrap:update_doc(Target, Doc, [],
- replicated_changes) of
- {ok, _} ->
- Cp ! {add_stat, {#stats.docs_written, 1}};
- _Error ->
- Cp ! {add_stat, {#stats.doc_write_failures, 1}}
- end;
- (_, _) ->
- io:format("doc error!!!!!!~n"),
- ok
- end, []),
- Cp ! {seq_changes_done, {Seq, length(Revs)}},
- doc_copy_loop(Cp, Source, Target, MissingRevsQueue)
- end.
make_replication_id(Source, Target, UserCtx, Options) ->
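The checkpoint_loop/3 comments above describe tracking, per source sequence
number, how many changes are still being worked on, so that a checkpoint only
records progress up to just below the smallest sequence still in flight. A
self-contained sketch of just that gb_trees bookkeeping; the module name and
the simplified return values are assumptions for illustration:

-module(seq_tracker_sketch).
-export([new/0, seq_start/3, seq_done/3]).

% SeqsInProgress maps Seq -> number of changes still being processed.
new() -> gb_trees:empty().

% A seq enters with the number of changes the changes reader found for it.
seq_start(Seq, NumChanges, Seqs) ->
    gb_trees:insert(Seq, NumChanges, Seqs).

% Returns {NewSeqs, CheckpointableSeq} where the second element is the seq
% that just finished if it was the smallest one still in progress, or 'none'.
seq_done(Seq, NumDone, Seqs) ->
    case gb_trees:get(Seq, Seqs) - NumDone of
        0 ->
            Seqs2 = gb_trees:delete(Seq, Seqs),
            case gb_trees:smallest(Seqs) of
                {Seq, _} -> {Seqs2, Seq};
                _ -> {Seqs2, none}
            end;
        Left when Left > 0 ->
            {gb_trees:update(Seq, Left, Seqs), none}
    end.

In the real code the equivalent of a checkpointable seq is folded into
#rep_state{} and written out by do_checkpoint/2, either when the checkpoint
timer fires or when the workers send 'done'.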