You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2011/12/05 10:33:30 UTC
[4/8] git commit: create couch_replicator application.
create couch_replicator application.
First step: move the files into the couch_replicator application and add them
to the build.
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/ad526790
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/ad526790
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/ad526790
Branch: refs/heads/master
Commit: ad5267908ba034f25ef816408f555d7a5ac6cbb2
Parents: 0c6f529
Author: benoitc <be...@apache.org>
Authored: Sun Dec 4 12:38:23 2011 +0100
Committer: benoitc <be...@apache.org>
Committed: Mon Dec 5 10:32:53 2011 +0100
----------------------------------------------------------------------
.gitignore | 2 +
configure.ac | 1 +
src/Makefile.am | 1 +
src/couch_replicator/Makefile.am | 73 ++
src/couch_replicator/src/couch_api_wrap.erl | 775 ++++++++++++
src/couch_replicator/src/couch_api_wrap.hrl | 36 +
src/couch_replicator/src/couch_api_wrap_httpc.erl | 286 +++++
src/couch_replicator/src/couch_httpc_pool.erl | 138 +++
.../src/couch_httpd_replicator.erl | 66 +
src/couch_replicator/src/couch_rep_sup.erl | 31 +
.../src/couch_replication_manager.erl | 626 ++++++++++
.../src/couch_replication_notifier.erl | 57 +
src/couch_replicator/src/couch_replicator.app.src | 33 +
src/couch_replicator/src/couch_replicator.erl | 942 +++++++++++++++
src/couch_replicator/src/couch_replicator.hrl | 30 +
.../src/couch_replicator_js_functions.hrl | 151 +++
.../src/couch_replicator_utils.erl | 382 ++++++
.../src/couch_replicator_worker.erl | 515 ++++++++
src/couch_replicator/test/001-httpc-pool.t | 250 ++++
.../test/002-replication-compact.t | 486 ++++++++
.../test/003-replication-large-atts.t | 267 ++++
.../test/004-replication-many-leaves.t | 216 ++++
src/couchdb/Makefile.am | 26 +-
src/couchdb/couch.app.tpl.in | 1 -
src/couchdb/couch_api_wrap.erl | 775 ------------
src/couchdb/couch_api_wrap.hrl | 36 -
src/couchdb/couch_api_wrap_httpc.erl | 286 -----
src/couchdb/couch_httpc_pool.erl | 138 ---
src/couchdb/couch_httpd_replicator.erl | 66 -
src/couchdb/couch_js_functions.hrl | 141 ---
src/couchdb/couch_rep_sup.erl | 31 -
src/couchdb/couch_replication_manager.erl | 626 ----------
src/couchdb/couch_replication_notifier.erl | 57 -
src/couchdb/couch_replicator.erl | 942 ---------------
src/couchdb/couch_replicator.hrl | 30 -
src/couchdb/couch_replicator_utils.erl | 382 ------
src/couchdb/couch_replicator_worker.erl | 515 --------
test/etap/001-load.t | 9 -
test/etap/230-httpc-pool.t | 250 ----
test/etap/240-replication-compact.t | 486 --------
test/etap/241-replication-large-atts.t | 267 ----
test/etap/242-replication-many-leaves.t | 216 ----
test/etap/Makefile.am | 6 +-
43 files changed, 5368 insertions(+), 5282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e6526e9..f0ebdef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@
*~
*.orig
*.rej
+*.sw*
erl_crash.dump
configure
autom4te.cache
@@ -52,6 +53,7 @@ etc/couchdb/default.ini
etc/launchd/org.apache.couchdb.plist
etc/logrotate.d/couchdb
src/couch_mrview/ebin/
+src/couch_replicator/ebin/
src/couchdb/.libs/*
src/couchdb/couch.app
src/couchdb/couchjs
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index a470f94..016cf41 100644
--- a/configure.ac
+++ b/configure.ac
@@ -549,6 +549,7 @@ AC_CONFIG_FILES([share/Makefile])
AC_CONFIG_FILES([src/Makefile])
AC_CONFIG_FILES([src/couch_index/Makefile])
AC_CONFIG_FILES([src/couch_mrview/Makefile])
+AC_CONFIG_FILES([src/couch_replicator/Makefile])
AC_CONFIG_FILES([src/couchdb/couch.app.tpl])
AC_CONFIG_FILES([src/couchdb/Makefile])
AC_CONFIG_FILES([src/couchdb/priv/Makefile])
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index f10f2b8..fbd514c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -13,6 +13,7 @@
SUBDIRS = \
couch_index \
couch_mrview \
+ couch_replicator \
couchdb \
ejson \
erlang-oauth \
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couch_replicator/Makefile.am b/src/couch_replicator/Makefile.am
new file mode 100644
index 0000000..5a4a0e6
--- /dev/null
+++ b/src/couch_replicator/Makefile.am
@@ -0,0 +1,73 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+couch_replicatorlibdir = $(localerlanglibdir)/couch_replicator-0.1
+couch_replicatorincludedir = $(couch_replicatorlibdir)/include
+couch_replicatorebindir = $(couch_replicatorlibdir)/ebin
+
+couch_replicatorinclude_DATA = $(include_files)
+couch_replicatorebin_DATA = $(compiled_files)
+
+include_files = \
+ src/couch_api_wrap.hrl \
+ src/couch_replicator.hrl \
+ src/couch_replicator_js_functions.hrl
+
+source_files = \
+ src/couch_api_wrap_httpc.erl \
+ src/couch_api_wrap.erl \
+ src/couch_httpc_pool.erl \
+ src/couch_httpd_replicator.erl \
+ src/couch_rep_sup.erl \
+ src/couch_replication_manager.erl \
+ src/couch_replication_notifier.erl \
+ src/couch_replicator_utils.erl \
+ src/couch_replicator_worker.erl \
+ src/couch_replicator.app.src \
+ src/couch_replicator.erl
+
+test_files = \
+ test/001-httpc-pool.t \
+ test/002-replication-compact.t \
+ test/003-replication-large-atts.t \
+ test/004-replication-many-leaves.t
+
+compiled_files = \
+ ebin/couch_api_wrap_httpc.beam \
+ ebin/couch_api_wrap.beam \
+ ebin/couch_httpc_pool.beam \
+ ebin/couch_httpd_replicator.beam \
+ ebin/couch_rep_sup.beam \
+ ebin/couch_replication_manager.beam \
+ ebin/couch_replication_notifier.beam \
+ ebin/couch_replicator_utils.beam \
+ ebin/couch_replicator_worker.beam \
+ ebin/couch_replicator.app \
+ ebin/couch_replicator.beam
+
+EXTRA_DIST = $(include_files) $(source_files) $(test_files)
+CLEANFILES = $(compiled_files)
+
+check:
+ $(abs_top_builddir)/test/etap/run $(abs_top_srcdir)/src/couch_replicator/test
+
+ebin/%.app: src/%.app.src
+ @mkdir -p ebin/
+ sed -e "s|%version%|@version@|g" \
+ < $< > $@
+
+ebin/%.beam: src/%.erl $(include_files)
+ @mkdir -p ebin/
+ $(ERLC) -Wall -I$(top_srcdir)/src -I$(top_srcdir)/src/couchdb \
+ -o ebin/ $(ERLC_FLAGS) ${TEST} $<;
+
+
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap.erl b/src/couch_replicator/src/couch_api_wrap.erl
new file mode 100644
index 0000000..2c57008
--- /dev/null
+++ b/src/couch_replicator/src/couch_api_wrap.erl
@@ -0,0 +1,775 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap).
+
+% This module wraps the native erlang API, and allows for performing
+% operations on a remote vs. local databases via the same API.
+%
+% Notes:
+% Many options and apis aren't yet supported here, they are added as needed.
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+
+-export([
+ db_open/2,
+ db_open/3,
+ db_close/1,
+ get_db_info/1,
+ update_doc/3,
+ update_doc/4,
+ update_docs/3,
+ update_docs/4,
+ ensure_full_commit/1,
+ get_missing_revs/2,
+ open_doc/3,
+ open_doc_revs/6,
+ changes_since/5,
+ db_uri/1
+ ]).
+
+-import(couch_api_wrap_httpc, [
+ send_req/3
+ ]).
+
+-import(couch_util, [
+ encode_doc_id/1,
+ get_value/2,
+ get_value/3
+ ]).
+
+
+% Returns an identifier for a database handle in list (string) form.
+% - #httpdb{}: the result of couch_util:url_strip_password/1 on its URL.
+% - #db{}: resolved through the record's name field.
+% - anything else: handed to ?b2l (macro defined outside this file;
+% presumably binary-to-list, so a binary database name is expected).
+db_uri(#httpdb{url = Url}) ->
+ couch_util:url_strip_password(Url);
+
+db_uri(#db{name = Name}) ->
+ db_uri(Name);
+
+db_uri(DbName) ->
+ ?b2l(DbName).
+
+
+% db_open/2: opens a database without trying to create it first.
+db_open(Db, Options) ->
+ db_open(Db, Options, false).
+
+% db_open/3: opens a remote (#httpdb{}) or local (name) database.
+% When Create is true the database is created before it is opened.
+% Returns {ok, Db}. Throws {unauthorized, Name} or {db_not_found, Name}.
+db_open(#httpdb{} = Db1, _Options, Create) ->
+ % setup/1 attaches an HTTP connection pool to the handle.
+ {ok, Db} = couch_api_wrap_httpc:setup(Db1),
+ case Create of
+ false ->
+ ok;
+ true ->
+ % Best effort: the response to the PUT is ignored, whatever its status.
+ send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
+ end,
+ % A HEAD request decides whether the database is usable.
+ send_req(Db, [{method, head}],
+ fun(200, _, _) ->
+ {ok, Db};
+ (401, _, _) ->
+ throw({unauthorized, ?l2b(db_uri(Db))});
+ (_, _, _) ->
+ throw({db_not_found, ?l2b(db_uri(Db))})
+ end);
+db_open(DbName, Options, Create) ->
+ try
+ case Create of
+ false ->
+ ok;
+ true ->
+ % Only server admins may create; the result of create/2 is not checked.
+ ok = couch_httpd:verify_is_server_admin(
+ get_value(user_ctx, Options)),
+ couch_db:create(DbName, Options)
+ end,
+ case couch_db:open(DbName, Options) of
+ {not_found, _Reason} ->
+ throw({db_not_found, DbName});
+ {ok, _Db} = Success ->
+ Success
+ end
+ catch
+ % Replace whatever detail came with the throw by the database name.
+ throw:{unauthorized, _} ->
+ throw({unauthorized, DbName})
+ end.
+
+% Releases a database handle.
+% Remote: unlinks from the HTTP connection pool and stops it (must return ok).
+% Local: closes through couch_db:close/1; the old-style catch turns any
+% exception into a return value, so this clause never raises.
+db_close(#httpdb{httpc_pool = Pool}) ->
+ unlink(Pool),
+ ok = couch_httpc_pool:stop(Pool);
+db_close(DbName) ->
+ catch couch_db:close(DbName).
+
+
+% Returns {ok, Props} with the database information as a property list.
+% Remote: Props is the member list of the JSON object returned with a 200
+% reply to a request on the database URL (any other reply is unhandled).
+% Local: the database is reopened with the handle's own user context, its
+% info is read and every key is converted with couch_util:to_binary/1.
+get_db_info(#httpdb{} = Db) ->
+    send_req(Db, [],
+        fun(200, _, {Props}) ->
+            {ok, Props}
+        end);
+get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    try
+        {ok, Info} = couch_db:get_db_info(Db),
+        {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}
+    after
+        % Always give the freshly opened handle back, even when reading or
+        % converting the info fails.
+        couch_db:close(Db)
+    end.
+
+
+% Asks the database to commit pending updates.
+% Remote: POSTs to _ensure_full_commit. A 201 reply yields
+% {ok, InstanceStartTime} taken from the "instance_start_time" member; any
+% other status with a JSON object body yields {error, Error} from its "error"
+% member.
+% Local: delegates to couch_db:ensure_full_commit/1.
+ensure_full_commit(#httpdb{} = Db) ->
+ send_req(
+ Db,
+ [{method, post}, {path, "_ensure_full_commit"},
+ {headers, [{"Content-Type", "application/json"}]}],
+ fun(201, _, {Props}) ->
+ {ok, get_value(<<"instance_start_time">>, Props)};
+ (_, _, {Props}) ->
+ {error, get_value(<<"error">>, Props)}
+ end);
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+
+% Finds out which of the given revisions the target database lacks.
+% IdRevs is a list of {DocId, Revs} pairs.
+% Remote: the pairs are posted to _revs_diff and each member of the 200 reply
+% is turned into {DocId, MissingRevs, PossibleAncestors}, with the ancestors
+% defaulting to [] when the reply does not list any.
+% Local: delegates to couch_db:get_missing_revs/2.
+get_missing_revs(#httpdb{} = Db, IdRevs) ->
+    RevsDiffBody = {[
+        {DocId, couch_doc:revs_to_strs(DocRevs)} || {DocId, DocRevs} <- IdRevs
+    ]},
+    ReqParams = [
+        {method, post},
+        {path, "_revs_diff"},
+        {body, ?JSON_ENCODE(RevsDiffBody)}
+    ],
+    % Converts one {DocId, {Result}} member of the reply to native terms.
+    ToNative = fun({DocId, {Result}}) ->
+        Missing = couch_doc:parse_revs(get_value(<<"missing">>, Result)),
+        Ancestors = couch_doc:parse_revs(
+            get_value(<<"possible_ancestors">>, Result, [])),
+        {DocId, Missing, Ancestors}
+    end,
+    OnResponse = fun(200, _, {Props}) ->
+        {ok, lists:map(ToNative, Props)}
+    end,
+    send_req(Db, ReqParams, OnResponse);
+get_missing_revs(Db, IdRevs) ->
+    couch_db:get_missing_revs(Db, IdRevs).
+
+
+
+% Folds Fun over the requested revisions of document Id, threading Acc.
+% Remote: a linked Streamer process issues a GET that accepts multipart/mixed
+% and parses the 200 reply with mp_parse_mixed/1. The calling process then
+% pulls the documents out of the Streamer through receive_docs_loop/6.
+% Local: the revisions come from couch_db:open_doc_revs/4; only the second
+% element of each Fun result is kept as the new accumulator.
+open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
+ Path = encode_doc_id(Id),
+ QArgs = options_to_query_args(
+ HttpDb, Path, [revs, {open_revs, Revs} | Options]),
+ Self = self(),
+ Streamer = spawn_link(fun() ->
+ send_req(
+ HttpDb,
+ [{path, Path}, {qs, QArgs},
+ {ibrowse_options, [{stream_to, {self(), once}}]},
+ {headers, [{"Accept", "multipart/mixed"}]}],
+ fun(200, Headers, StreamDataFun) ->
+ % Announces a fresh reference to the parent every time a 200 response
+ % starts (presumably also after a retry inside send_req - confirm).
+ remote_open_doc_revs_streamer_start(Self),
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ get_value("Content-Type", Headers),
+ StreamDataFun,
+ fun mp_parse_mixed/1)
+ end),
+ unlink(Self)
+ end),
+ % Wait for the first announcement before asking the Streamer for data.
+ receive
+ {started_open_doc_revs, Ref} ->
+ receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc)
+ end;
+open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
+ {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
+ {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
+
+
+% Fetches a single document.
+% Remote: a 200 reply is converted with couch_doc:from_json_obj/1 and
+% returned as {ok, Doc}; any other reply with a JSON object body becomes
+% {error, Error}, where Error is its "error" member.
+% Local: {ok, Doc} is passed through and a not_found result is reported as
+% {error, <<"not_found">>}.
+open_doc(#httpdb{} = Db, Id, Options) ->
+    ReqParams = [
+        {path, encode_doc_id(Id)},
+        {qs, options_to_query_args(Options, [])}
+    ],
+    OnResponse = fun
+        (200, _, Body) ->
+            {ok, couch_doc:from_json_obj(Body)};
+        (_, _, {Props}) ->
+            {error, get_value(<<"error">>, Props)}
+    end,
+    send_req(Db, ReqParams, OnResponse);
+open_doc(Db, Id, Options) ->
+    LookupResult = couch_db:open_doc(Db, Id, Options),
+    case LookupResult of
+    {not_found, _Reason} ->
+        {error, <<"not_found">>};
+    {ok, _} ->
+        LookupResult
+    end.
+
+
+% update_doc/3: same as update_doc/4 with the interactive_edit update type.
+update_doc(Db, Doc, Options) ->
+ update_doc(Db, Doc, Options, interactive_edit).
+
+% update_doc/4: writes one document.
+% Remote: the document is PUT as a multipart stream produced by stream_doc/1.
+% - 200/201: {ok, Rev} parsed from the "rev" member of the reply
+% - 409: throws conflict
+% - 401 unauthorized / 403 forbidden / 412 missing_stub: throws
+% {unauthorized | forbidden | missing_stub, Reason}
+% - anything else: {error, Error}
+% Local: delegates to couch_db:update_doc/4.
+update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
+ % replicated_changes maps to new_edits=false on the query string.
+ QArgs = case Type of
+ replicated_changes ->
+ [{"new_edits", "false"}];
+ _ ->
+ []
+ end ++ options_to_query_args(Options, []),
+ % A random UUID serves as the multipart boundary.
+ Boundary = couch_uuids:random(),
+ JsonBytes = ?JSON_ENCODE(
+ couch_doc:to_json_obj(
+ Doc, [revs, attachments, follows, att_encoding_info | Options])),
+ {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
+ JsonBytes, Doc#doc.atts, true),
+ Headers = case lists:member(delay_commit, Options) of
+ true ->
+ [{"X-Couch-Full-Commit", "false"}];
+ false ->
+ []
+ end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
+ % The body is a {Fun, InitialState} pair; stream_doc/1 is called repeatedly
+ % with the state it returned last (by the HTTP client, presumably).
+ Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
+ send_req(
+ HttpDb,
+ [{method, put}, {path, encode_doc_id(DocId)},
+ {qs, QArgs}, {headers, Headers}, {body, Body}],
+ fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
+ {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
+ (409, _, _) ->
+ throw(conflict);
+ (Code, _, {Props}) ->
+ case {Code, get_value(<<"error">>, Props)} of
+ {401, <<"unauthorized">>} ->
+ throw({unauthorized, get_value(<<"reason">>, Props)});
+ {403, <<"forbidden">>} ->
+ throw({forbidden, get_value(<<"reason">>, Props)});
+ {412, <<"missing_stub">>} ->
+ throw({missing_stub, get_value(<<"reason">>, Props)});
+ {_, Error} ->
+ {error, Error}
+ end
+ end);
+update_doc(Db, Doc, Options, Type) ->
+ couch_db:update_doc(Db, Doc, Options, Type).
+
+
+% update_docs/3: same as update_docs/4 with the interactive_edit update type.
+update_docs(Db, DocList, Options) ->
+ update_docs(Db, DocList, Options, interactive_edit).
+
+% update_docs/4: writes a list of documents in one bulk operation.
+% Returns {ok, Errors}, where Errors is built by bulk_results_to_errors/3.
+% An empty DocList short-circuits to {ok, []}.
+% In the remote case DocList may mix #doc{} records with iodata items, which
+% are sent through unchanged.
+update_docs(_Db, [], _Options, _UpdateType) ->
+ {ok, []};
+update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
+ FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+ Prefix = case UpdateType of
+ replicated_changes ->
+ <<"{\"new_edits\":false,\"docs\":[">>;
+ interactive_edit ->
+ <<"{\"docs\":[">>
+ end,
+ Suffix = <<"]}">>,
+ % Note: nginx and other servers don't like PUT/POST requests without
+ % a Content-Length header, so we can't do a chunked transfer encoding
+ % and JSON encode each doc only before sending it through the socket.
+ % The fold starts from the size of the prefix and suffix plus one byte for
+ % each comma between documents, then adds the size of every document.
+ {Docs, Len} = lists:mapfoldl(
+ fun(#doc{} = Doc, Acc) ->
+ Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+ {Json, Acc + iolist_size(Json)};
+ (Doc, Acc) ->
+ {Doc, Acc + iolist_size(Doc)}
+ end,
+ byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
+ DocList),
+ % Body generator: its state is the list of parts still to be sent, starting
+ % with the atom prefix. The last document is emitted without a comma.
+ BodyFun = fun(eof) ->
+ eof;
+ ([]) ->
+ {ok, Suffix, eof};
+ ([prefix | Rest]) ->
+ {ok, Prefix, Rest};
+ ([Doc]) ->
+ {ok, Doc, []};
+ ([Doc | RestDocs]) ->
+ {ok, [Doc, ","], RestDocs}
+ end,
+ Headers = [
+ {"Content-Length", Len},
+ {"Content-Type", "application/json"},
+ {"X-Couch-Full-Commit", FullCommit}
+ ],
+ send_req(
+ HttpDb,
+ [{method, post}, {path, "_bulk_docs"},
+ {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
+ % 201 and 417 replies are handled the same way; other codes are unhandled.
+ fun(201, _, Results) when is_list(Results) ->
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (417, _, Results) when is_list(Results) ->
+ {ok, bulk_results_to_errors(DocList, Results, remote)}
+ end);
+update_docs(Db, DocList, Options, UpdateType) ->
+ Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
+ {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
+
+
+% Feeds the changes of a database since StartSeq to UserFun, one #doc_info{}
+% per call.
+% Options read here: continuous (default false), doc_ids, filter and
+% query_params.
+% Remote: requests _changes as a normal or continuous feed. When doc_ids is
+% given, the ids are POSTed with the "_doc_ids" filter; if the server answers
+% 405 the request is repeated as a plain GET and filtered locally.
+% Local: runs the feed produced by couch_changes:handle_changes/3.
+changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
+ UserFun, Options) ->
+ BaseQArgs = case get_value(continuous, Options, false) of
+ false ->
+ [{"feed", "normal"}];
+ true ->
+ [{"feed", "continuous"}, {"heartbeat", "10000"}]
+ end ++ [
+ {"style", atom_to_list(Style)}, {"since", couch_util:to_list(StartSeq)}
+ ],
+ DocIds = get_value(doc_ids, Options),
+ {QArgs, Method, Body, Headers} = case DocIds of
+ undefined ->
+ QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
+ {QArgs1, get, [], Headers1};
+ _ when is_list(DocIds) ->
+ Headers2 = [{"Content-Type", "application/json"} | Headers1],
+ JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
+ {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
+ end,
+ send_req(
+ HttpDb,
+ [{method, Method}, {path, "_changes"}, {qs, QArgs},
+ {headers, Headers}, {body, Body},
+ {ibrowse_options, [{stream_to, {self(), once}}]}],
+ fun(200, _, DataStreamFun) ->
+ parse_changes_feed(Options, UserFun, DataStreamFun);
+ (405, _, _) when is_list(DocIds) ->
+ % CouchDB versions < 1.1.0 support neither the builtin _changes feed
+ % filter "_doc_ids" nor POST
+ send_req(HttpDb, [{method, get}, {path, "_changes"},
+ {qs, BaseQArgs}, {headers, Headers1},
+ {ibrowse_options, [{stream_to, {self(), once}}]}],
+ fun(200, _, DataStreamFun2) ->
+ % Emulate the filter: forward only the requested document ids.
+ UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
+ case lists:member(Id, DocIds) of
+ true ->
+ UserFun(DocInfo);
+ false ->
+ ok
+ end
+ end,
+ parse_changes_feed(Options, UserFun2, DataStreamFun2)
+ end)
+ end);
+changes_since(Db, Style, StartSeq, UserFun, Options) ->
+ % doc_ids takes precedence over a named filter.
+ Filter = case get_value(doc_ids, Options) of
+ undefined ->
+ ?b2l(get_value(filter, Options, <<>>));
+ _DocIds ->
+ "_doc_ids"
+ end,
+ Args = #changes_args{
+ style = Style,
+ since = StartSeq,
+ filter = Filter,
+ feed = case get_value(continuous, Options, false) of
+ true ->
+ "continuous";
+ false ->
+ "normal"
+ end,
+ timeout = infinity
+ },
+ QueryParams = get_value(query_params, Options, {[]}),
+ Req = changes_json_req(Db, Filter, QueryParams, Options),
+ ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+ % Only {change, Change, _} events reach UserFun; other events return ok.
+ ChangesFeedFun(fun({change, Change, _}, _) ->
+ UserFun(json_to_doc_info(Change));
+ (_, _) ->
+ ok
+ end).
+
+
+% internal functions
+
+% Extends the _changes query string with the filter found in Options.
+% Without a filter option BaseQS is returned untouched. Otherwise the result
+% starts with the "filter" pair, followed by BaseQS with the query_params
+% entries added in front; a parameter whose name is already present is left
+% out, so existing values are never overridden.
+maybe_add_changes_filter_q_args(BaseQS, Options) ->
+    case get_value(filter, Options) of
+    undefined ->
+        BaseQS;
+    FilterName ->
+        {Params} = get_value(query_params, Options, {[]}),
+        AddParam = fun({Key, Value}, QS) ->
+            KeyStr = couch_util:to_list(Key),
+            AlreadySet = lists:keymember(KeyStr, 1, QS),
+            if
+                AlreadySet ->
+                    QS;
+                true ->
+                    [{KeyStr, couch_util:to_list(Value)} | QS]
+            end
+        end,
+        [{"filter", ?b2l(FilterName)} | lists:foldl(AddParam, BaseQS, Params)]
+    end.
+
+% Parses a streamed _changes response and calls UserFun for every change.
+% A continuous feed goes to continuous_changes/2. A normal feed is a single
+% JSON object, walked by the changes_ev* event handlers; those expect a
+% two-argument callback, so UserFun is wrapped and the accumulator is unused.
+parse_changes_feed(Options, UserFun, DataStreamFun) ->
+ case get_value(continuous, Options, false) of
+ true ->
+ continuous_changes(DataStreamFun, UserFun);
+ false ->
+ EventFun = fun(Ev) ->
+ changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+ end,
+ json_stream_parse:events(DataStreamFun, EventFun)
+ end.
+
+% Builds the EJSON request object passed to couch_changes:handle_changes/3
+% for a local changes feed.
+% - no filter (""): an empty object
+% - "_doc_ids": an object holding only the doc_ids option
+% - any other filter: a simulated GET request to DbName/_changes
+changes_json_req(_Db, "", _QueryParams, _Options) ->
+ {[]};
+changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
+ {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}.
+
+
+% Converts document open options to query string pairs, with special care
+% for atts_since: its revision list is cut by atts_since_arg/3 so that the
+% request line does not reach ?MAX_URL_LEN.
+% A missing or empty atts_since adds nothing to the query string.
+options_to_query_args(HttpDb, Path, Options) ->
+ case lists:keytake(atts_since, 1, Options) of
+ false ->
+ options_to_query_args(Options, []);
+ {value, {atts_since, []}, Options2} ->
+ options_to_query_args(Options2, []);
+ {value, {atts_since, PAs}, Options2} ->
+ QueryArgs1 = options_to_query_args(Options2, []),
+ FullUrl = couch_api_wrap_httpc:full_url(
+ HttpDb, [{path, Path}, {qs, QueryArgs1}]),
+ % Initial length: the request line built so far plus the parameter name
+ % and the percent-encoded square brackets.
+ RevList = atts_since_arg(
+ length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
+ length("&atts_since=") + 6, % +6 = % encoded [ and ]
+ PAs, []),
+ [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
+ end.
+
+
+% Translates a list of options to query string pairs, keeping their order.
+% ejson_body and delay_commit produce no query argument. Only the options
+% listed below are supported; any other option has no matching clause.
+options_to_query_args([], Acc) ->
+ lists:reverse(Acc);
+options_to_query_args([ejson_body | Rest], Acc) ->
+ options_to_query_args(Rest, Acc);
+options_to_query_args([delay_commit | Rest], Acc) ->
+ options_to_query_args(Rest, Acc);
+options_to_query_args([revs | Rest], Acc) ->
+ options_to_query_args(Rest, [{"revs", "true"} | Acc]);
+options_to_query_args([{open_revs, all} | Rest], Acc) ->
+ options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
+options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
+ % An explicit revision list is sent as a JSON array of revision strings.
+ JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
+ options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
+
+
+% Length that the request line must stay below when atts_since is added.
+-define(MAX_URL_LEN, 7000).
+
+% Selects the leading revisions of a possible-ancestors list that still fit
+% into the URL. UrlLen is the length of the request line so far. Each
+% revision is converted with couch_doc:rev_to_str/1 (expected to return a
+% binary) and costs its byte size plus the percent-encoded quotes (and comma).
+% Collection stops at the first revision that would reach ?MAX_URL_LEN.
+% Returns the accepted revision strings in their original order.
+atts_since_arg(_UrlLen, [], Acc) ->
+    lists:reverse(Acc);
+atts_since_arg(UrlLen, [PA | Rest], Acc) ->
+    RevStr = couch_doc:rev_to_str(PA),
+    NewUrlLen = case Rest of
+    [] ->
+        % plus 2 double quotes (% encoded)
+        UrlLen + byte_size(RevStr) + 6;
+    _ ->
+        % plus 2 double quotes and a comma (% encoded)
+        UrlLen + byte_size(RevStr) + 9
+    end,
+    case NewUrlLen >= ?MAX_URL_LEN of
+    true ->
+        lists:reverse(Acc);
+    false ->
+        atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
+    end.
+
+
+% TODO: A less verbose, more elegant and automatic restart strategy for
+% the exported open_doc_revs/6 function. The restart should be
+% transparent to the caller like any other Couch API function exported
+% by this module.
+%
+% Runs receive_docs/4 and starts over with the new reference whenever the
+% error {restart_open_doc_revs, NewRef} is raised (see
+% restart_remote_open_doc_revs/2). The accumulator passed on a restart is
+% the one this call started with.
+receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
+ try
+ % Left only for debugging purposes via an interactive or remote shell
+ erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
+ receive_docs(Streamer, Fun, Ref, Acc)
+ catch
+ error:{restart_open_doc_revs, NewRef} ->
+ receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
+ end.
+
+% Pulls the parts of a multipart/mixed response from Streamer one by one and
+% feeds each to UserFun, until the Streamer reports {done, Ref}.
+% Each part is handled according to its content type:
+% - multipart/related: a document with attachments; if UserFun answers skip,
+% the attachment parser is aborted
+% - application/json: a plain document
+% - application/json with error=true: a missing revision, reported to
+% UserFun as {{not_found, missing}, Rev}
+% Returns {ok, UserAcc}. A {started_open_doc_revs, NewRef} message at this
+% point triggers a restart through restart_remote_open_doc_revs/2.
+receive_docs(Streamer, UserFun, Ref, UserAcc) ->
+ Streamer ! {get_headers, Ref, self()},
+ receive
+ {started_open_doc_revs, NewRef} ->
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {headers, Ref, Headers} ->
+ case get_value("content-type", Headers) of
+ {"multipart/related", _} = ContentType ->
+ case doc_from_multi_part_stream(
+ ContentType,
+ fun() -> receive_doc_data(Streamer, Ref) end,
+ Ref) of
+ {ok, Doc, Parser} ->
+ case UserFun({ok, Doc}, UserAcc) of
+ {ok, UserAcc2} ->
+ ok;
+ {skip, UserAcc2} ->
+ couch_doc:abort_multi_part_stream(Parser)
+ end,
+ receive_docs(Streamer, UserFun, Ref, UserAcc2)
+ end;
+ {"application/json", []} ->
+ Doc = couch_doc:from_json_obj(
+ ?JSON_DECODE(receive_all(Streamer, Ref, []))),
+ {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
+ receive_docs(Streamer, UserFun, Ref, UserAcc2);
+ {"application/json", [{"error","true"}]} ->
+ {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
+ Rev = get_value(<<"missing">>, ErrorProps),
+ Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
+ {_, UserAcc2} = UserFun(Result, UserAcc),
+ receive_docs(Streamer, UserFun, Ref, UserAcc2)
+ end;
+ {done, Ref} ->
+ {ok, UserAcc}
+ end.
+
+
+% Drops every message still queued for the old reference Ref, then raises
+% the error {restart_open_doc_revs, NewRef}, which receive_docs_loop/6
+% catches. This function never returns normally.
+restart_remote_open_doc_revs(Ref, NewRef) ->
+ receive
+ {body_bytes, Ref, _} ->
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {body_done, Ref} ->
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {done, Ref} ->
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {headers, Ref, _} ->
+ restart_remote_open_doc_revs(Ref, NewRef)
+ after 0 ->
+ % Nothing left for the old reference in the mailbox.
+ erlang:error({restart_open_doc_revs, NewRef})
+ end.
+
+
+% Runs in the streamer process when a response starts. It discards requests
+% from Parent that are already queued, then sends Parent a
+% {started_open_doc_revs, Ref} message carrying a fresh reference.
+remote_open_doc_revs_streamer_start(Parent) ->
+ receive
+ {get_headers, _Ref, Parent} ->
+ remote_open_doc_revs_streamer_start(Parent);
+ {next_bytes, _Ref, Parent} ->
+ remote_open_doc_revs_streamer_start(Parent)
+ after 0 ->
+ Parent ! {started_open_doc_revs, make_ref()}
+ end.
+
+
+% Collects the whole body of the current part from Streamer.
+% It asks for the next chunk until {body_done, Ref} arrives and returns the
+% chunks as a list in arrival order. A {started_open_doc_revs, NewRef}
+% message triggers a restart through restart_remote_open_doc_revs/2.
+receive_all(Streamer, Ref, Acc) ->
+ Streamer ! {next_bytes, Ref, self()},
+ receive
+ {started_open_doc_revs, NewRef} ->
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {body_bytes, Ref, Bytes} ->
+ receive_all(Streamer, Ref, [Bytes | Acc]);
+ {body_done, Ref} ->
+ lists:reverse(Acc)
+ end.
+
+
+% Callback handed to couch_httpd:parse_multipart_request/3 by the streamer
+% process. Every parser event blocks until the consumer asks for it:
+% - headers and eof answer a {get_headers, Ref, From} request
+% - body and body_end answer a {next_bytes, Ref, From} request
+% Every clause except eof returns this function again as the next callback.
+mp_parse_mixed(eof) ->
+ receive {get_headers, Ref, From} ->
+ From ! {done, Ref}
+ end;
+mp_parse_mixed({headers, H}) ->
+ receive {get_headers, Ref, From} ->
+ From ! {headers, Ref, H}
+ end,
+ fun mp_parse_mixed/1;
+mp_parse_mixed({body, Bytes}) ->
+ receive {next_bytes, Ref, From} ->
+ From ! {body_bytes, Ref, Bytes}
+ end,
+ fun mp_parse_mixed/1;
+mp_parse_mixed(body_end) ->
+ receive {next_bytes, Ref, From} ->
+ From ! {body_done, Ref};
+ % The consumer already moved on to the next part: requeue its request so
+ % that the following headers (or eof) event can answer it.
+ {get_headers, Ref, From} ->
+ self() ! {get_headers, Ref, From}
+ end,
+ fun mp_parse_mixed/1.
+
+
+% Data function for the multipart/related parser. It asks Streamer for the
+% next chunk and returns {Bytes, NextFun}; the end of the body is reported
+% as an empty binary. Note that there is no clause for
+% {started_open_doc_revs, _} here, unlike in receive_all/3.
+receive_doc_data(Streamer, Ref) ->
+ Streamer ! {next_bytes, Ref, self()},
+ receive
+ {body_bytes, Ref, Bytes} ->
+ {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
+ {body_done, Ref} ->
+ {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
+ end.
+
+% Reads a document and its attachments from a multipart/related stream.
+% A linked Parser process runs the multipart parser with
+% couch_doc:mp_parse_doc/2. The JSON part is requested from it right away,
+% while the data of every attachment marked follows is replaced by a fun
+% that fetches the next chunk on demand.
+% Returns {ok, Doc, Parser}. When a {started_open_doc_revs, NewRef} message
+% arrives, the Parser is killed and a restart is triggered.
+doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
+ Self = self(),
+ Parser = spawn_link(fun() ->
+ {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
+ ContentType, DataFun,
+ fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
+ unlink(Self)
+ end),
+ Parser ! {get_doc_bytes, Ref, self()},
+ receive
+ {started_open_doc_revs, NewRef} ->
+ % Unlink first so that killing the Parser does not take this process down.
+ unlink(Parser),
+ exit(Parser, kill),
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {doc_bytes, Ref, DocBytes} ->
+ Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
+ ReadAttachmentDataFun = fun() ->
+ Parser ! {get_bytes, Ref, self()},
+ receive
+ {started_open_doc_revs, NewRef} ->
+ unlink(Parser),
+ exit(Parser, kill),
+ % Discard a reply the Parser may have sent before it was killed.
+ receive {bytes, Ref, _} -> ok after 0 -> ok end,
+ restart_remote_open_doc_revs(Ref, NewRef);
+ {bytes, Ref, Bytes} ->
+ Bytes
+ end
+ end,
+ Atts2 = lists:map(
+ fun(#att{data = follows} = A) ->
+ A#att{data = ReadAttachmentDataFun};
+ (A) ->
+ A
+ end, Doc#doc.atts),
+ {ok, Doc#doc{atts = Atts2}, Parser}
+ end.
+
+
+% Event handlers for json_stream_parse:events/2 that walk a normal (not
+% continuous) _changes response. Each handler returns the fun that consumes
+% the next event.
+
+% The response must start with a JSON object.
+changes_ev1(object_start, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+% Skip events until the "results" key shows up.
+changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
+changes_ev2(_, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
+
+% The value of "results" must be an array.
+changes_ev3(array_start, UserFun, UserAcc) ->
+ fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
+
+% Collect every object of the array, convert it to a #doc_info{} and pass it
+% to UserFun; whatever UserFun returns becomes the next accumulator.
+changes_ev_loop(object_start, UserFun, UserAcc) ->
+ fun(Ev) ->
+ json_stream_parse:collect_object(Ev,
+ fun(Obj) ->
+ UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
+ fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
+ end)
+ end;
+changes_ev_loop(array_end, _UserFun, _UserAcc) ->
+ fun(_Ev) -> changes_ev_done() end.
+
+% Ignore everything that follows the results array.
+changes_ev_done() ->
+ fun(_Ev) -> changes_ev_done() end.
+
+% Parses a continuous _changes feed, one JSON object per iteration. The
+% unparsed remainder returned by json_stream_parse:events/2 is put back in
+% front of the data function for the next round. The recursion is
+% unconditional, so the function has no normal return value.
+continuous_changes(DataFun, UserFun) ->
+ {DataFun2, _, Rest} = json_stream_parse:events(
+ DataFun,
+ fun(Ev) -> parse_changes_line(Ev, UserFun) end),
+ continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+% Event handler for one line of a continuous feed. The line must be a JSON
+% object, which is collected, converted to a #doc_info{} and passed to
+% UserFun.
+parse_changes_line(object_start, UserFun) ->
+ fun(Ev) ->
+ json_stream_parse:collect_object(Ev,
+ fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
+ end.
+
+% Converts one decoded row of a _changes feed to a #doc_info{} record.
+% Every entry of the "changes" member becomes a #rev_info{}, whose deleted
+% flag is set only when the entry carries "deleted": true.
+json_to_doc_info({Props}) ->
+    ToRevInfo = fun({Change}) ->
+        ParsedRev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
+        IsDeleted = get_value(<<"deleted">>, Change) =:= true,
+        #rev_info{rev = ParsedRev, deleted = IsDeleted}
+    end,
+    RevInfos = lists:map(ToRevInfo, get_value(<<"changes">>, Props)),
+    DocId = get_value(<<"id">>, Props),
+    Seq = get_value(<<"seq">>, Props),
+    #doc_info{id = DocId, high_seq = Seq, revs = RevInfos}.
+
+
+% Converts the outcome of a bulk update to a list of EJSON error objects,
+% one for every document that was not written. Each object holds the id,
+% rev, error and reason members. The third argument tells what Results
+% looks like:
+% - interactive_edit with {ok, Results}: one result per document, in the
+%   order of Docs; {ok, _} results are dropped
+% - replicated_changes with {ok, Results}: handled like an aborted update
+% - interactive_edit with {aborted, Results}: a list of {{Id, Rev}, Error}
+% - remote: the decoded JSON reply of _bulk_docs; rows without an error
+%   member are dropped
+bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
+    lists:reverse(lists:foldl(
+        fun({_, {ok, _}}, Acc) ->
+            Acc;
+        ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
+            % The error name needs a variable of its own: matching it against
+            % the already bound Error would demand that error_info/1 returns
+            % its argument unchanged and fail with badmatch otherwise.
+            {_, ErrorName, Reason} = couch_httpd:error_info(Error),
+            [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
+                {error, ErrorName}, {reason, Reason}]} | Acc ]
+        end,
+        [], lists:zip(Docs, Results)));
+
+bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
+    bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
+
+bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
+    lists:map(
+        fun({{Id, Rev}, Err}) ->
+            {_, Error, Reason} = couch_httpd:error_info(Err),
+            {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
+        end,
+        Results);
+
+bulk_results_to_errors(_Docs, Results, remote) ->
+    lists:reverse(lists:foldl(
+        fun({Props}, Acc) ->
+            % Each member is looked up by its binary key first and by the
+            % atom key as a fallback.
+            case get_value(<<"error">>, Props, get_value(error, Props)) of
+            undefined ->
+                Acc;
+            Error ->
+                Id = get_value(<<"id">>, Props, get_value(id, Props)),
+                Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
+                Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
+                [ {[{id, Id}, {rev, rev_to_str(Rev)},
+                    {error, Error}, {reason, Reason}]} | Acc ]
+            end
+        end,
+        [], Results)).
+
+
+% Converts a revision given as a two-element tuple with
+% couch_doc:rev_to_str/1; any other value is returned as it is.
+rev_to_str(Rev) when is_tuple(Rev), tuple_size(Rev) =:= 2 ->
+    couch_doc:rev_to_str(Rev);
+rev_to_str(AlreadyConverted) ->
+    AlreadyConverted.
+
+
+% Body generator for the multipart PUT issued by update_doc/4.
+% It is first called with {JsonBytes, Atts, Boundary, Len} and afterwards
+% with the state it returned last, {BytesLeft, Boundary}.
+% The data comes from a linked helper process, which is remembered in the
+% process dictionary under {doc_streamer, Boundary}.
+stream_doc({JsonBytes, Atts, Boundary, Len}) ->
+ % A helper already registered for this boundary is killed first
+ % (presumably left over from an earlier attempt that is being retried).
+ case erlang:erase({doc_streamer, Boundary}) of
+ Pid when is_pid(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill);
+ _ ->
+ ok
+ end,
+ Self = self(),
+ DocStreamer = spawn_link(fun() ->
+ couch_doc:doc_to_multi_part_stream(
+ Boundary, JsonBytes, Atts,
+ % The write fun blocks until a chunk is requested, then hands it over.
+ fun(Data) ->
+ receive {get_data, Ref, From} ->
+ From ! {data, Ref, Data}
+ end
+ end, true),
+ unlink(Self)
+ end),
+ erlang:put({doc_streamer, Boundary}, DocStreamer),
+ % The first call only sets things up and contributes no bytes.
+ {ok, <<>>, {Len, Boundary}};
+stream_doc({0, Id}) ->
+ % Every announced byte has been delivered.
+ erlang:erase({doc_streamer, Id}),
+ eof;
+stream_doc({LenLeft, Id}) when LenLeft > 0 ->
+ Ref = make_ref(),
+ erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
+ receive {data, Ref, Data} ->
+ {ok, Data, {LenLeft - iolist_size(Data), Id}}
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap.hrl b/src/couch_replicator/src/couch_api_wrap.hrl
new file mode 100644
index 0000000..1a6f27a
--- /dev/null
+++ b/src/couch_replicator/src/couch_api_wrap.hrl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+
+% Handle of a remote database that is reached over HTTP.
+-record(httpdb, {
+ url,
+ % nil or, presumably, an #oauth{} record - confirm against the callers
+ oauth = nil,
+ % headers that are part of every request unless a request overrides them
+ headers = [
+ {"Accept", "application/json"},
+ {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
+ ],
+ timeout, % milliseconds
+ ibrowse_options = [],
+ retries = 10,
+ wait = 250, % milliseconds
+ % pid of the connection pool, filled in by couch_api_wrap_httpc:setup/1
+ httpc_pool = nil,
+ % handed to the connection pool as its max_connections option
+ http_connections
+}).
+
+% OAuth credentials (presumably used to sign the requests - confirm).
+-record(oauth, {
+ consumer_key,
+ token,
+ token_secret,
+ consumer_secret,
+ signature_method
+}).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_api_wrap_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_api_wrap_httpc.erl b/src/couch_replicator/src/couch_api_wrap_httpc.erl
new file mode 100644
index 0000000..d05eec7
--- /dev/null
+++ b/src/couch_replicator/src/couch_api_wrap_httpc.erl
@@ -0,0 +1,286 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_api_wrap_httpc).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-export([setup/1]).
+-export([send_req/3]).
+-export([full_url/2]).
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+% ?replace(L, K, V): set key K to V in the tuple list L (replaces the first
+% tuple keyed by K, or appends {K, V} when there is none).
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+% Upper bound for the retry wait used by maybe_retry/5 (5 minutes, in ms).
+-define(MAX_WAIT, 5 * 60 * 1000).
+
+
+% Starts a linked connection pool for the database URL and returns
+% {ok, HttpDb} with the pool pid stored in `httpc_pool`. Only accepts a
+% record that has no pool yet (httpc_pool = nil).
+setup(#httpdb{httpc_pool = nil} = Db) ->
+    #httpdb{url = Url, http_connections = MaxConns} = Db,
+    PoolOptions = [{max_connections, MaxConns}],
+    {ok, Pool} = couch_httpc_pool:start_link(Url, PoolOptions),
+    {ok, Db#httpdb{httpc_pool = Pool}}.
+
+
+% Normalizes the request parameters, sends the request and hands the
+% response to process_response/5 (which invokes Callback).
+%   - every query string value is converted from iodata to a flat string
+%   - the per-request ibrowse options are sorted by key
+send_req(HttpDb, Params1, Callback) ->
+    QueryArgs = [
+        {K, ?b2l(iolist_to_binary(V))} || {K, V} <- get_value(qs, Params1, [])
+    ],
+    Params2 = ?replace(Params1, qs, QueryArgs),
+    SortedOptions = lists:keysort(1, get_value(ibrowse_options, Params2, [])),
+    Params = ?replace(Params2, ibrowse_options, SortedOptions),
+    {Worker, Response} = send_ibrowse_req(HttpDb, Params),
+    process_response(Response, Worker, HttpDb, Params, Callback).
+
+
+% Builds headers, URL, body and ibrowse options for one request and sends it
+% through an ibrowse worker with ibrowse:send_req_direct/7.
+% Returns {Worker, Response} so the caller can release the worker later.
+send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb, Params) ->
+ Method = get_value(method, Params, get),
+ UserHeaders = lists:keysort(1, get_value(headers, Params, [])),
+ % On duplicate keys ukeymerge keeps the tuple from the first list, so
+ % per-request headers take precedence over the base headers.
+ Headers1 = lists:ukeymerge(1, UserHeaders, BaseHeaders),
+ Headers2 = oauth_header(HttpDb, Params) ++ Headers1,
+ Url = full_url(HttpDb, Params),
+ Body = get_value(body, Params, []),
+ % "_changes" requests get their own freshly spawned, linked worker;
+ % everything else takes a worker from the connection pool.
+ case get_value(path, Params) of
+ "_changes" ->
+ {ok, Worker} = ibrowse:spawn_link_worker_process(Url);
+ _ ->
+ {ok, Worker} = couch_httpc_pool:get_worker(HttpDb#httpdb.httpc_pool)
+ end,
+ % Per-request ibrowse options win over the ones stored in HttpDb.
+ IbrowseOptions = [
+ {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout} |
+ lists:ukeymerge(1, get_value(ibrowse_options, Params, []),
+ HttpDb#httpdb.ibrowse_options)
+ ],
+ Response = ibrowse:send_req_direct(
+ Worker, Url, Headers2, Method, Body, IbrowseOptions, infinity),
+ {Worker, Response}.
+
+
+% Dispatches on the result of send_ibrowse_req/2.
+% The connection was closed: resend the request right away (this does not
+% consume a retry).
+process_response({error, sel_conn_closed}, _Worker, HttpDb, Params, Callback) ->
+ send_req(HttpDb, Params, Callback);
+
+process_response({error, {'EXIT', {normal, _}}}, _Worker, HttpDb, Params, Cb) ->
+ % ibrowse worker terminated because remote peer closed the socket
+ % -> not an error
+ send_req(HttpDb, Params, Cb);
+
+% Asynchronous (streamed) response: continue in process_stream_response/5.
+process_response({ibrowse_req_id, ReqId}, Worker, HttpDb, Params, Callback) ->
+ process_stream_response(ReqId, Worker, HttpDb, Params, Callback);
+
+% Complete response. 200, 201 and all 4xx codes are passed to Callback with
+% the JSON-decoded body (null for an empty body); 301/302/303 are followed;
+% any other code goes through maybe_retry/5.
+process_response({ok, Code, Headers, Body}, Worker, HttpDb, Params, Callback) ->
+ release_worker(Worker, HttpDb),
+ case list_to_integer(Code) of
+ Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+ EJson = case Body of
+ <<>> ->
+ null;
+ Json ->
+ ?JSON_DECODE(Json)
+ end,
+ Callback(Ok, Headers, EJson);
+ R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+ Error ->
+ maybe_retry({code, Error}, Worker, HttpDb, Params, Callback)
+ end;
+
+% Anything else is treated as an error that may be retried.
+process_response(Error, Worker, HttpDb, Params, Callback) ->
+ maybe_retry(Error, Worker, HttpDb, Params, Callback).
+
+
+% Waits for the headers of an asynchronous ibrowse response.
+% For 200, 201 and 4xx codes, Callback is called with a fun that yields the
+% body chunk by chunk (see stream_data_self/5); afterwards the worker is
+% released and leftover messages for ReqId are flushed from the mailbox.
+% A {maybe_retry_req, Err} thrown while the callback runs leads to a retry.
+% 301/302/303 are followed; any other code is reported as a failure without
+% retrying.
+process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
+ receive
+ {ibrowse_async_headers, ReqId, Code, Headers} ->
+ case list_to_integer(Code) of
+ Ok when Ok =:= 200 ; Ok =:= 201 ; (Ok >= 400 andalso Ok < 500) ->
+ StreamDataFun = fun() ->
+ stream_data_self(HttpDb, Params, Worker, ReqId, Callback)
+ end,
+ % Ask ibrowse for the first body chunk before the callback starts
+ % consuming.
+ ibrowse:stream_next(ReqId),
+ try
+ Ret = Callback(Ok, Headers, StreamDataFun),
+ release_worker(Worker, HttpDb),
+ clean_mailbox_req(ReqId),
+ Ret
+ catch throw:{maybe_retry_req, Err} ->
+ clean_mailbox_req(ReqId),
+ maybe_retry(Err, Worker, HttpDb, Params, Callback)
+ end;
+ R when R =:= 301 ; R =:= 302 ; R =:= 303 ->
+ do_redirect(Worker, R, Headers, HttpDb, Params, Callback);
+ Error ->
+ report_error(Worker, HttpDb, Params, {code, Error})
+ end;
+ {ibrowse_async_response, ReqId, {error, _} = Error} ->
+ maybe_retry(Error, Worker, HttpDb, Params, Callback)
+ after HttpDb#httpdb.timeout + 500 ->
+ % Note: ibrowse should always reply with timeouts, but this doesn't
+ % seem to be always true when there's a very high rate of requests
+ % and many open connections.
+ maybe_retry(timeout, Worker, HttpDb, Params, Callback)
+ end.
+
+
+% Drops every ibrowse response/response-end message for ReqId that is already
+% in the mailbox. Never blocks (`after 0`); returns ok.
+clean_mailbox_req(ReqId) ->
+ receive
+ {ibrowse_async_response, ReqId, _} ->
+ clean_mailbox_req(ReqId);
+ {ibrowse_async_response_end, ReqId} ->
+ clean_mailbox_req(ReqId)
+ after 0 ->
+ ok
+ end.
+
+
+% Gives Worker back to the connection pool stored in the #httpdb{} record.
+release_worker(Worker, #httpdb{} = HttpDb) ->
+    Pool = HttpDb#httpdb.httpc_pool,
+    ok = couch_httpc_pool:release_worker(Pool, Worker).
+
+
+% Retries a failed request with exponential back-off.
+% With no retries left the failure is reported through report_error/4, which
+% exits the calling process.
+maybe_retry(Error, Worker, #httpdb{retries = 0} = HttpDb, Params, _Cb) ->
+ report_error(Worker, HttpDb, Params, {error, Error});
+
+% Otherwise: release the worker, log, sleep for Wait milliseconds, then resend
+% with one retry fewer and the wait doubled (capped at ?MAX_WAIT).
+maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
+ Params, Cb) ->
+ release_worker(Worker, HttpDb),
+ Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+ Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+ ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
+ [Method, Url, Wait / 1000, error_cause(Error)]),
+ ok = timer:sleep(Wait),
+ Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
+ send_req(HttpDb#httpdb{retries = Retries - 1, wait = Wait2}, Params, Cb).
+
+
+% Logs a final request failure, releases the worker and terminates the calling
+% process with exit({http_request_failed, Method, Url, Error}). The URL is
+% passed through couch_util:url_strip_password/1 before it is logged or put
+% into the exit reason.
+report_error(Worker, HttpDb, Params, Error) ->
+ Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
+ Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
+ do_report_error(Url, Method, Error),
+ release_worker(Worker, HttpDb),
+ exit({http_request_failed, Method, Url, Error}).
+
+
+% Writes the error log entry for a failed request.
+% {code, Code}: the server answered with an unexpected HTTP status code.
+do_report_error(Url, Method, {code, Code}) ->
+ ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
+ "HTTP error code is ~p", [Method, Url, Code]);
+
+% Any other error term is rendered with error_cause/1.
+do_report_error(FullUrl, Method, Error) ->
+ ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
+ [Method, FullUrl, error_cause(Error)]).
+
+
+% Renders an error term as a flat string for log messages. One outer
+% {error, _} wrapper is stripped; everything else is printed as is.
+error_cause(Error) ->
+    Cause = case Error of
+        {error, Inner} -> Inner;
+        Other -> Other
+    end,
+    lists:flatten(io_lib:format("~p", [Cause])).
+
+
+% Fetches the next body chunk of a streamed response.
+% Returns {Data, NextFun}, where NextFun fetches the following chunk. Empty
+% chunks are skipped. When ibrowse signals the end of the response,
+% {<<>>, Fun} is returned and calling Fun throws
+% {maybe_retry_req, more_data_expected}.
+% Throws {maybe_retry_req, Reason} on a stream error or when no message
+% arrives within T + 500 ms.
+stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+ receive
+ {ibrowse_async_response, ReqId, {error, Error}} ->
+ throw({maybe_retry_req, Error});
+ {ibrowse_async_response, ReqId, <<>>} ->
+ ibrowse:stream_next(ReqId),
+ stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
+ {ibrowse_async_response, ReqId, Data} ->
+ ibrowse:stream_next(ReqId),
+ {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+ {ibrowse_async_response_end, ReqId} ->
+ {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+ after T + 500 ->
+ % Note: ibrowse should always reply with timeouts, but this doesn't
+ % seem to be always true when there's a very high rate of requests
+ % and many open connections.
+ throw({maybe_retry_req, timeout})
+ end.
+
+
+% Builds the request URL: base URL, then the `path` parameter, then the
+% query string built from the `qs` parameter (both default to empty).
+full_url(#httpdb{url = BaseUrl}, Params) ->
+    RelativePath = get_value(path, Params, []),
+    QueryString = query_args_to_string(get_value(qs, Params, []), []),
+    BaseUrl ++ RelativePath ++ QueryString.
+
+
+% Turns a list of {Key, Value} string pairs into "?k1=v1&k2=v2", with each
+% value passed through couch_httpd:quote/1. Acc holds already formatted
+% "k=v" strings in reverse order. Returns "" when there is nothing to emit.
+query_args_to_string([], []) ->
+    "";
+query_args_to_string(Args, Acc0) ->
+    Encode = fun({K, V}, Acc) ->
+        [K ++ "=" ++ couch_httpd:quote(V) | Acc]
+    end,
+    Pairs = lists:reverse(lists:foldl(Encode, Acc0, Args)),
+    "?" ++ string:join(Pairs, "&").
+
+
+% Returns the OAuth "Authorization" header for a request as a one-element
+% header list, or [] when the database has no OAuth credentials.
+oauth_header(#httpdb{oauth = nil}, _ConnParams) ->
+    [];
+oauth_header(#httpdb{url = BaseUrl, oauth = OAuth}, ConnParams) ->
+    Consumer = {
+        OAuth#oauth.consumer_key,
+        OAuth#oauth.consumer_secret,
+        OAuth#oauth.signature_method
+    },
+    % The signature needs the upper-case HTTP method name. Derive it from the
+    % method atom, as maybe_retry/5 and report_error/4 do, so that methods
+    % other than get/post/put/head (e.g. delete) no longer crash here with a
+    % case_clause error.
+    Method = string:to_upper(atom_to_list(get_value(method, ConnParams, get))),
+    QSL = get_value(qs, ConnParams, []),
+    % The query string parameters take part in the signature but must not be
+    % repeated in the header, hence the `-- QSL`.
+    OAuthParams = oauth:signed_params(Method,
+        BaseUrl ++ get_value(path, ConnParams, []),
+        QSL, Consumer, OAuth#oauth.token, OAuth#oauth.token_secret) -- QSL,
+    [{"Authorization",
+        "OAuth " ++ oauth_uri:params_to_header_string(OAuthParams)}].
+
+
+% Follows an HTTP redirect: releases the worker, computes the new target from
+% the response headers and resends the request against it.
+do_redirect(Worker, Code, Headers, #httpdb{url = Url} = HttpDb, Params, Cb) ->
+    release_worker(Worker, HttpDb),
+    {NewHttpDb, NewParams} = after_redirect(
+        redirect_url(Headers, Url), Code, HttpDb, Params),
+    send_req(NewHttpDb, NewParams, Cb).
+
+
+% Builds the URL to follow after a redirect. Protocol, host, port and path
+% come from the "Location" response header; user name and password (when
+% both are present) are carried over from the original URL. IPv6 hosts are
+% wrapped in square brackets.
+redirect_url(RespHeaders, OrigUrl) ->
+ MochiHeaders = mochiweb_headers:make(RespHeaders),
+ RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+ #url{
+ host = Host,
+ host_type = HostType,
+ port = Port,
+ path = Path, % includes query string
+ protocol = Proto
+ } = ibrowse_lib:parse_url(RedUrl),
+ #url{
+ username = User,
+ password = Passwd
+ } = ibrowse_lib:parse_url(OrigUrl),
+ Creds = case is_list(User) andalso is_list(Passwd) of
+ true ->
+ User ++ ":" ++ Passwd ++ "@";
+ false ->
+ []
+ end,
+ HostPart = case HostType of
+ ipv6_address ->
+ "[" ++ Host ++ "]";
+ _ ->
+ Host
+ end,
+ atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
+ integer_to_list(Port) ++ Path.
+
+% Adjusts the request for a redirect. A 303 response switches the method to
+% get; every other code keeps the parameters as they are.
+after_redirect(RedirectUrl, Code, HttpDb, Params) ->
+    Params1 = case Code of
+        303 -> ?replace(Params, method, get);
+        _ -> Params
+    end,
+    after_redirect(RedirectUrl, HttpDb, Params1).
+
+% Points HttpDb at the redirect URL and drops `qs` and `path` from the
+% request parameters. Returns {NewHttpDb, NewParams}.
+after_redirect(RedirectUrl, HttpDb, Params) ->
+    WithoutQs = lists:keydelete(qs, 1, Params),
+    WithoutPath = lists:keydelete(path, 1, WithoutQs),
+    {HttpDb#httpdb{url = RedirectUrl}, WithoutPath}.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_httpc_pool.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_httpc_pool.erl b/src/couch_replicator/src/couch_httpc_pool.erl
new file mode 100644
index 0000000..f6b7c26
--- /dev/null
+++ b/src/couch_replicator/src/couch_httpc_pool.erl
@@ -0,0 +1,138 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpc_pool).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/2, stop/1]).
+-export([get_worker/1, release_worker/2]).
+
+% gen_server API
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+% gen_server state of one connection pool. `url` is what new ibrowse workers
+% are spawned for; `limit` comes from the max_connections start option.
+-record(state, {
+ url,
+ limit, % max # of workers allowed
+ free = [], % free workers (connections)
+ busy = [], % busy workers (connections)
+ waiting = queue:new() % blocked clients waiting for a worker
+}).
+
+
+% Starts an unregistered pool process linked to the caller. Options is a
+% property list; init/1 reads max_connections from it.
+start_link(Url, Options) ->
+ gen_server:start_link(?MODULE, {Url, Options}, []).
+
+
+% Stops the pool synchronously (no call timeout); returns ok.
+stop(Pool) ->
+ ok = gen_server:call(Pool, stop, infinity).
+
+
+% Checks out a worker and returns {ok, Worker}. There is no call timeout, so
+% the caller blocks until the pool replies; the pool delays its reply while
+% the number of busy workers is at the limit.
+get_worker(Pool) ->
+ {ok, _Worker} = gen_server:call(Pool, get_worker, infinity).
+
+
+% Hands a worker back to the pool. Asynchronous (cast); returns ok at once.
+release_worker(Pool, Worker) ->
+ ok = gen_server:cast(Pool, {release_worker, Worker}).
+
+
+% gen_server callback. Traps exits so that the death of a linked worker
+% arrives as an {'EXIT', Pid, Reason} message (see handle_info/2).
+init({Url, Options}) ->
+    process_flag(trap_exit, true),
+    Limit = get_value(max_connections, Options),
+    {ok, #state{url = Url, limit = Limit}}.
+
+
+% get_worker: when the number of busy workers has reached the limit, the
+% caller is queued and gets its reply later from handle_cast/2 or
+% handle_info/2. Otherwise a free worker is handed out, or a new linked
+% worker is spawned when none is free.
+handle_call(get_worker, From, #state{waiting = Waiting} = State) ->
+ #state{url = Url, limit = Limit, busy = Busy, free = Free} = State,
+ case length(Busy) >= Limit of
+ true ->
+ {noreply, State#state{waiting = queue:in(From, Waiting)}};
+ false ->
+ case Free of
+ [] ->
+ {ok, Worker} = ibrowse:spawn_link_worker_process(Url),
+ Free2 = Free;
+ [Worker | Free2] ->
+ ok
+ end,
+ NewState = State#state{free = Free2, busy = [Worker | Busy]},
+ {reply, {ok, Worker}, NewState}
+ end;
+
+% stop: reply ok and terminate normally (terminate/2 stops the workers).
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+
+% release_worker: only acts when Worker is alive and currently listed as
+% busy; otherwise the cast is ignored. If a client is waiting, the worker is
+% handed straight to it and stays in the busy list; if nobody is waiting it
+% moves from the busy list to the free list.
+handle_cast({release_worker, Worker}, #state{waiting = Waiting} = State) ->
+ case is_process_alive(Worker) andalso
+ lists:member(Worker, State#state.busy) of
+ true ->
+ case queue:out(Waiting) of
+ {empty, Waiting2} ->
+ Busy2 = State#state.busy -- [Worker],
+ Free2 = [Worker | State#state.free];
+ {{value, From}, Waiting2} ->
+ gen_server:reply(From, {ok, Worker}),
+ Busy2 = State#state.busy,
+ Free2 = State#state.free
+ end,
+ NewState = State#state{
+ busy = Busy2,
+ free = Free2,
+ waiting = Waiting2
+ },
+ {noreply, NewState};
+ false ->
+ {noreply, State}
+ end.
+
+
+% A linked process exited.
+%   - free worker: remove it from the free list
+%   - busy worker: remove it from the busy list and, when a client is
+%     waiting, spawn a replacement worker and give it to that client
+%   - unknown pid: ignore
+handle_info({'EXIT', Pid, _Reason}, #state{busy = Busy, free = Free} = State) ->
+ % `L -- [Pid]` matching the bound L again means Pid was not in L.
+ case Free -- [Pid] of
+ Free ->
+ case Busy -- [Pid] of
+ Busy ->
+ {noreply, State};
+ Busy2 ->
+ case queue:out(State#state.waiting) of
+ {empty, _} ->
+ {noreply, State#state{busy = Busy2}};
+ {{value, From}, Waiting2} ->
+ {ok, Worker} = ibrowse:spawn_link_worker_process(State#state.url),
+ gen_server:reply(From, {ok, Worker}),
+ {noreply, State#state{busy = [Worker | Busy2], waiting = Waiting2}}
+ end
+ end;
+ Free2 ->
+ {noreply, State#state{free = Free2}}
+ end.
+
+
+% gen_server callback; the state needs no conversion on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% gen_server callback; stops every worker the pool knows about, free and busy.
+terminate(_Reason, State) ->
+ lists:foreach(fun ibrowse_http_client:stop/1, State#state.free),
+ lists:foreach(fun ibrowse_http_client:stop/1, State#state.busy).
+
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_httpd_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_httpd_replicator.erl b/src/couch_replicator/src/couch_httpd_replicator.erl
new file mode 100644
index 0000000..fb1e350
--- /dev/null
+++ b/src/couch_replicator/src/couch_httpd_replicator.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_httpd_replicator).
+
+-include("couch_db.hrl").
+
+-import(couch_httpd, [
+ send_json/2,
+ send_json/3,
+ send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+ to_binary/1
+]).
+
+-export([handle_req/1]).
+
+
+% HTTP handler for replication requests. Only POST with a JSON object body
+% is accepted. The body is validated, parsed into a replication record and
+% passed to couch_replicator:replicate/1; the result is mapped to a response:
+%   {error, {Error, Reason}}  -> 404 with error and reason
+%   {error, not_found}        -> 404
+%   {error, Reason}           -> 500
+%   {ok, {cancelled, RepId}}  -> 200 with the replication id
+%   {ok, {continuous, RepId}} -> 202 with the replication id
+%   {ok, {HistoryResults}}    -> default status with the history properties
+handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ RepDoc = {Props} = couch_httpd:json_body_obj(Req),
+ validate_rep_props(Props),
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepDoc, UserCtx),
+ case couch_replicator:replicate(Rep) of
+ {error, {Error, Reason}} ->
+ send_json(
+ Req, 404,
+ {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
+ {error, not_found} ->
+ % Tried to cancel a replication that didn't exist.
+ send_json(Req, 404, {[{error, <<"not found">>}]});
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, to_binary(Reason)}]});
+ {ok, {cancelled, RepId}} ->
+ send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+ {ok, {continuous, RepId}} ->
+ send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
+ {ok, {HistoryResults}} ->
+ send_json(Req, {[{ok, true} | HistoryResults]})
+ end;
+
+% Any other HTTP method is rejected.
+handle_req(Req) ->
+ send_method_not_allowed(Req, "POST").
+
+% Checks the properties of a replication request. Every value inside a
+% "query_params" object must be a binary string; otherwise a
+% {bad_request, Message} is thrown. All other properties are accepted as is.
+% Returns ok.
+validate_rep_props(Props) ->
+    CheckParam = fun
+        ({_, V}) when is_binary(V) ->
+            ok;
+        ({K, _}) ->
+            throw({bad_request, <<K/binary, " value must be a string.">>})
+    end,
+    CheckProp = fun
+        ({<<"query_params">>, {Params}}) ->
+            lists:foreach(CheckParam, Params);
+        (_) ->
+            ok
+    end,
+    lists:foreach(CheckProp, Props).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_rep_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_rep_sup.erl b/src/couch_replicator/src/couch_rep_sup.erl
new file mode 100644
index 0000000..1318c59
--- /dev/null
+++ b/src/couch_replicator/src/couch_rep_sup.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+-include("couch_db.hrl").
+
+% Starts the supervisor, registered locally under the module name.
+start_link() ->
+ supervisor:start_link({local,?MODULE}, ?MODULE, []).
+
+%%=============================================================================
+%% supervisor callbacks
+%%=============================================================================
+
+% one_for_one supervisor allowing at most 3 restarts within 10 seconds.
+% It starts with no children.
+init([]) ->
+ {ok, {{one_for_one, 3, 10}, []}}.
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replication_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replication_manager.erl b/src/couch_replicator/src/couch_replication_manager.erl
new file mode 100644
index 0000000..6b66cf3
--- /dev/null
+++ b/src/couch_replicator/src/couch_replication_manager.erl
@@ -0,0 +1,626 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replication_manager).
+-behaviour(gen_server).
+
+% public API
+-export([replication_started/1, replication_completed/1, replication_error/2]).
+
+% gen_server callbacks
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_js_functions.hrl").
+
+% Names of the two ets tables created in init/1:
+%   ?DOC_TO_REP   holds {DocId, RepId} entries
+%   ?REP_TO_STATE holds {RepId, #rep_state{}} entries
+-define(DOC_TO_REP, couch_rep_doc_id_to_rep_id).
+-define(REP_TO_STATE, couch_rep_id_to_rep_state).
+-define(INITIAL_WAIT, 2.5). % seconds
+-define(MAX_WAIT, 600). % seconds
+
+% Per-replication bookkeeping stored in the ?REP_TO_STATE table.
+%   rep          - the #rep{} record of the replication
+%   starting     - true from the first start attempt until the replication
+%                  reports that it has started
+%   retries_left - when this is 0, the next error gives up on the replication
+%   max_retries  - retry limit in effect when the state was created or reset
+%   wait         - delay before the next restart attempt
+-record(rep_state, {
+ rep,
+ starting,
+ retries_left,
+ max_retries,
+ wait = ?INITIAL_WAIT
+}).
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+% gen_server state of the replication manager.
+%   changes_feed_loop - pid of the process following the replicator
+%                       database's changes feed (nil once that db is deleted)
+%   db_notifier       - database update notifier started in init/1
+%   rep_db_name       - name of the replicator database being followed
+%   rep_start_pids    - pids of the processes currently starting replications
+%   max_retries       - current retry limit from the configuration
+-record(state, {
+ changes_feed_loop = nil,
+ db_notifier = nil,
+ rep_db_name = nil,
+ rep_start_pids = [],
+ max_retries
+}).
+
+
+% Starts the replication manager, registered locally under the module name.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+% Reports that a replication has started. Does nothing when the manager has
+% no state for the replication id. Otherwise the replication document is
+% marked "triggered" and tagged with the base replication id, and the
+% manager is told to reset the replication's retry state.
+replication_started(#rep{id = {BaseId, _} = RepId}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_started, RepId}, infinity),
+ ?LOG_INFO("Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)])
+ end.
+
+
+% Reports that a replication has finished. Does nothing when the manager has
+% no state for the replication id. Otherwise the replication document is
+% marked "completed" and the manager drops the replication's state.
+replication_completed(#rep{id = RepId}) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ update_rep_doc(DocId, [{<<"_replication_state">>, <<"completed">>}]),
+ ok = gen_server:call(?MODULE, {rep_complete, RepId}, infinity),
+ ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
+ [pp_rep_id(RepId), DocId])
+ end.
+
+
+% Reports that a replication failed. Does nothing when the manager has no
+% state for the replication id. Otherwise the replication document is marked
+% "error" and the manager decides whether to retry (see
+% maybe_retry_replication/3).
+replication_error(#rep{id = {BaseId, _} = RepId}, Error) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ % TODO: maybe add error reason to replication document
+ update_rep_doc(DocId, [
+ {<<"_replication_state">>, <<"error">>},
+ {<<"_replication_id">>, ?l2b(BaseId)}]),
+ ok = gen_server:call(?MODULE, {rep_error, RepId, Error}, infinity)
+ end.
+
+
+% gen_server callback. Creates the two named ets tables, registers a config
+% callback for the replicator db name and the retry count, and starts the
+% changes feed loop and the database update notifier.
+init(_) ->
+ % Needed so that exits of the linked helper processes arrive as messages.
+ process_flag(trap_exit, true),
+ ?DOC_TO_REP = ets:new(?DOC_TO_REP, [named_table, set, protected]),
+ ?REP_TO_STATE = ets:new(?REP_TO_STATE, [named_table, set, protected]),
+ Server = self(),
+ ok = couch_config:register(
+ fun("replicator", "db", NewName) ->
+ ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)});
+ ("replicator", "max_replication_retry_count", V) ->
+ ok = gen_server:cast(Server, {set_max_retries, retries_value(V)})
+ end
+ ),
+ {Loop, RepDbName} = changes_feed_loop(),
+ {ok, #state{
+ changes_feed_loop = Loop,
+ rep_db_name = RepDbName,
+ db_notifier = db_update_notifier(),
+ max_retries = retries_value(
+ couch_config:get("replicator", "max_replication_retry_count", "10"))
+ }}.
+
+
+% rep_db_update: a change from the replicator database's changes feed. Any
+% exception raised while processing it is recorded against the document
+% (see rep_db_update_error/2) and the state is left unchanged.
+handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) ->
+ NewState = try
+ process_update(State, Change)
+ catch
+ _Tag:Error ->
+ {RepProps} = get_value(doc, ChangeProps),
+ DocId = get_value(<<"_id">>, RepProps),
+ rep_db_update_error(Error, DocId),
+ State
+ end,
+ {reply, ok, NewState};
+
+
+% rep_started: the replication is running, so its retry budget and wait are
+% reset to their initial values.
+handle_call({rep_started, RepId}, _From, State) ->
+ case rep_state(RepId) of
+ nil ->
+ ok;
+ RepState ->
+ NewRepState = RepState#rep_state{
+ starting = false,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries,
+ wait = ?INITIAL_WAIT
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState})
+ end,
+ {reply, ok, State};
+
+% rep_complete: forget the state of a finished replication.
+handle_call({rep_complete, RepId}, _From, State) ->
+ true = ets:delete(?REP_TO_STATE, RepId),
+ {reply, ok, State};
+
+% rep_error: retry or give up, see replication_error/3.
+handle_call({rep_error, RepId, Error}, _From, State) ->
+ {reply, ok, replication_error(State, RepId, Error)};
+
+% Any other call is logged and stops the server.
+handle_call(Msg, From, State) ->
+ ?LOG_ERROR("Replication manager received unexpected call ~p from ~p",
+ [Msg, From]),
+ {stop, {error, {unexpected_call, Msg}}, State}.
+
+
+% rep_db_changed / rep_db_created: nothing to do when the name equals the
+% database already being followed; otherwise everything is restarted against
+% the replicator database (see restart/1).
+handle_cast({rep_db_changed, NewName}, #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+
+handle_cast({rep_db_changed, _NewName}, State) ->
+ {noreply, restart(State)};
+
+handle_cast({rep_db_created, NewName}, #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+
+handle_cast({rep_db_created, _NewName}, State) ->
+ {noreply, restart(State)};
+
+% set_max_retries: the configured retry count changed.
+handle_cast({set_max_retries, MaxRetries}, State) ->
+ {noreply, State#state{max_retries = MaxRetries}};
+
+% Any other cast is logged and stops the server.
+handle_cast(Msg, State) ->
+ ?LOG_ERROR("Replication manager received unexpected cast ~p", [Msg]),
+ {stop, {error, {unexpected_cast, Msg}}, State}.
+
+
+% Exit of the changes feed loop with reason normal.
+handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
+ % replicator DB deleted
+ {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
+
+% The database update notifier died: stop the manager as well.
+handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
+ ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
+ {stop, {db_update_notifier_died, Reason}, State};
+
+% Any other normal exit: drop the pid from rep_start_pids (a no-op if it is
+% not in that list).
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+ % one of the replication start processes terminated successfully
+ {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info({'DOWN', _Ref, _, _, _}, State) ->
+ % From a db monitor created by a replication process. Ignore.
+ {noreply, State};
+
+% Anything else (including abnormal exits of other linked processes) is
+% logged and stops the server.
+handle_info(Msg, State) ->
+ ?LOG_ERROR("Replication manager received unexpected message ~p", [Msg]),
+ {stop, {unexpected_msg, Msg}, State}.
+
+
+% gen_server callback. Cancels all replications, kills the changes feed loop
+% and the pending replication starters, deletes both ets tables and stops the
+% database update notifier.
+terminate(_Reason, State) ->
+ #state{
+ rep_start_pids = StartPids,
+ changes_feed_loop = Loop,
+ db_notifier = DbNotifier
+ } = State,
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ % `catch` because Loop may be nil, which makes unlink/exit fail.
+ catch unlink(Pid),
+ catch exit(Pid, stop)
+ end,
+ [Loop | StartPids]),
+ true = ets:delete(?REP_TO_STATE),
+ true = ets:delete(?DOC_TO_REP),
+ couch_db_update_notifier:stop(DbNotifier).
+
+
+% gen_server callback; the state needs no conversion on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Makes sure the replicator database exists and spawns a linked process that
+% follows its continuous changes feed (with documents included). Every change
+% whose id passes has_valid_rep_id/1 is forwarded to the calling process with
+% a synchronous {rep_db_update, Change} call. Returns {LoopPid, RepDbName}.
+changes_feed_loop() ->
+ {ok, RepDb} = ensure_rep_db_exists(),
+ RepDbName = couch_db:name(RepDb),
+ couch_db:close(RepDb),
+ Server = self(),
+ Pid = spawn_link(
+ fun() ->
+ % The loop process opens its own database handle, reusing only the
+ % user context of the handle closed above.
+ DbOpenOptions = [{user_ctx, RepDb#db.user_ctx}, sys_db],
+ {ok, Db} = couch_db:open_int(RepDbName, DbOpenOptions),
+ ChangesFeedFun = couch_changes:handle_changes(
+ #changes_args{
+ include_docs = true,
+ feed = "continuous",
+ timeout = infinity,
+ db_open_options = [sys_db]
+ },
+ {json_req, null},
+ Db
+ ),
+ ChangesFeedFun(
+ fun({change, Change, _}, _) ->
+ case has_valid_rep_id(Change) of
+ true ->
+ ok = gen_server:call(
+ Server, {rep_db_update, Change}, infinity);
+ false ->
+ ok
+ end;
+ (_, _) ->
+ ok
+ end
+ ),
+ couch_db:close(Db)
+ end
+ ),
+ {Pid, RepDbName}.
+
+
+% Tells whether a change (or a document id) refers to a replication document.
+% Given a change object, the check is applied to its "id" property. Ids that
+% start with ?DESIGN_DOC_PREFIX are rejected; everything else is accepted.
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
+
+
+% Starts a linked database update notifier. When a database is created whose
+% name equals the configured replicator db name (default "_replicator"), the
+% calling process is sent a {rep_db_created, DbName} cast. Returns the
+% notifier.
+db_update_notifier() ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({created, DbName}) ->
+ case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
+ DbName ->
+ ok = gen_server:cast(Server, {rep_db_created, DbName});
+ _ ->
+ ok
+ end;
+ (_) ->
+ % no need to handle the 'deleted' event - the changes feed loop
+ % dies when the database is deleted
+ ok
+ end
+ ),
+ Notifier.
+
+
+% Switches to a new replicator database: cancels all replications, kills the
+% old changes feed loop and the pending replication starters, then starts a
+% fresh changes feed loop. Returns the updated state.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ % Unlink first so the kill does not come back as an 'EXIT' message;
+ % `catch` because Loop may be nil.
+ catch unlink(Pid),
+ catch exit(Pid, rep_db_changed)
+ end,
+ [Loop | StartPids]),
+ {NewLoop, NewRepDbName} = changes_feed_loop(),
+ State#state{
+ changes_feed_loop = NewLoop,
+ rep_db_name = NewRepDbName,
+ rep_start_pids = []
+ }.
+
+
+% Reacts to one change of a replication document and returns the new state.
+%   deleted document        -> stop its replication
+%   no state or "triggered" -> start the replication unless already known
+%   "completed"             -> clean up after the finished replication
+%   "error"                 -> start again only when the document is not
+%                              currently mapped to a replication
+% Any other "_replication_state" value raises a case_clause error.
+process_update(State, {Change}) ->
+ {RepProps} = JsonRepDoc = get_value(doc, Change),
+ DocId = get_value(<<"_id">>, RepProps),
+ case get_value(<<"deleted">>, Change, false) of
+ true ->
+ rep_doc_deleted(DocId),
+ State;
+ false ->
+ case get_value(<<"_replication_state">>, RepProps) of
+ undefined ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
+ <<"triggered">> ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
+ <<"completed">> ->
+ replication_complete(DocId),
+ State;
+ <<"error">> ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [] ->
+ maybe_start_replication(State, DocId, JsonRepDoc);
+ _ ->
+ State
+ end
+ end
+ end.
+
+
+% Logs a failure to process a replication document and marks the document
+% with the "error" replication state. For {bad_rep_doc, Reason} errors the
+% reason is logged as is; any other error is converted with to_binary/1.
+rep_db_update_error(Error, DocId) ->
+    Reason = case Error of
+        {bad_rep_doc, BadDocReason} ->
+            BadDocReason;
+        _ ->
+            to_binary(Error)
+    end,
+    ?LOG_ERROR("Replication manager, error processing document `~s`: ~s",
+        [DocId, Reason]),
+    update_rep_doc(DocId, [{<<"_replication_state">>, <<"error">>}]).
+
+
+% Builds the #user_ctx{} a replication runs with from the "user_ctx" property
+% of its document. A missing property gives the default #user_ctx{}; a
+% missing name defaults to null and missing roles to [].
+rep_user_ctx({RepProps}) ->
+    case get_value(<<"user_ctx">>, RepProps) of
+        {CtxProps} ->
+            Name = get_value(<<"name">>, CtxProps, null),
+            Roles = get_value(<<"roles">>, CtxProps, []),
+            #user_ctx{name = Name, roles = Roles};
+        undefined ->
+            #user_ctx{}
+    end.
+
+
+% Starts the replication described by a document unless the same replication
+% (same replication id) is already known.
+%   unknown id           -> record it in both tables and spawn a linked
+%                           process that starts it immediately
+%   known, same document -> nothing to do
+%   known, other doc     -> log it and tag this document with the replication
+%                           id (the two clauses differ only in log wording)
+% Returns the (possibly updated) state.
+maybe_start_replication(State, DocId, RepDoc) ->
+ #rep{id = {BaseId, _} = RepId} = Rep = parse_rep_doc(RepDoc),
+ case rep_state(RepId) of
+ nil ->
+ RepState = #rep_state{
+ rep = Rep,
+ starting = true,
+ retries_left = State#state.max_retries,
+ max_retries = State#state.max_retries
+ },
+ true = ets:insert(?REP_TO_STATE, {RepId, RepState}),
+ true = ets:insert(?DOC_TO_REP, {DocId, RepId}),
+ ?LOG_INFO("Attempting to start replication `~s` (document `~s`).",
+ [pp_rep_id(RepId), DocId]),
+ Pid = spawn_link(fun() -> start_replication(Rep, 0) end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
+ % DocId is already bound here, so this only matches the same document.
+ #rep_state{rep = #rep{doc_id = DocId}} ->
+ State;
+ #rep_state{starting = false, rep = #rep{doc_id = OtherDocId}} ->
+ ?LOG_INFO("The replication specified by the document `~s` was already"
+ " triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+ State;
+ #rep_state{starting = true, rep = #rep{doc_id = OtherDocId}} ->
+ ?LOG_INFO("The replication specified by the document `~s` is already"
+ " being triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(DocId, RepDoc, ?l2b(BaseId)),
+ State
+ end.
+
+
+% Parses a replication document into a #rep{} record. Every failure of the
+% underlying parser is rethrown as {bad_rep_doc, Reason}; a thrown
+% {error, Reason} keeps its reason, anything else is converted to a binary.
+parse_rep_doc(RepDoc) ->
+ {ok, Rep} = try
+ couch_replicator_utils:parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+ catch
+ throw:{error, Reason} ->
+ throw({bad_rep_doc, Reason});
+ Tag:Err ->
+ throw({bad_rep_doc, to_binary({Tag, Err})})
+ end,
+ Rep.
+
+
+% Writes RepId into the document's "_replication_id" property, unless the
+% document already carries exactly that value.
+maybe_tag_rep_doc(DocId, {RepProps}, RepId) ->
+    AlreadyTagged = get_value(<<"_replication_id">>, RepProps) =:= RepId,
+    case AlreadyTagged of
+        true ->
+            ok;
+        false ->
+            update_rep_doc(DocId, [{<<"_replication_id">>, RepId}])
+    end.
+
+
+% Body of the replication start processes: sleeps for Wait seconds, then
+% starts the replication asynchronously. Anything other than {ok, _},
+% including an exception (turned into a term by `catch`), is reported through
+% replication_error/2.
+start_replication(Rep, Wait) ->
+ ok = timer:sleep(Wait * 1000),
+ case (catch couch_replicator:async_replicate(Rep)) of
+ {ok, _} ->
+ ok;
+ Error ->
+ replication_error(Rep, Error)
+ end.
+
+
+% Cleans up after a document reached the "completed" state: removes the
+% document's entry from ?DOC_TO_REP and, when no replication state is left
+% for its replication id, deletes the child spec from couch_rep_sup.
+replication_complete(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, {BaseId, Ext} = RepId}] ->
+ case rep_state(RepId) of
+ nil ->
+ % Prior to OTP R14B02, temporary child specs remain in
+ % in the supervisor after a worker finishes - remove them.
+ % We want to be able to start the same replication but with
+ % eventually different values for parameters that don't
+ % contribute to its ID calculation.
+ _ = supervisor:delete_child(couch_rep_sup, BaseId ++ Ext);
+ #rep_state{} ->
+ ok
+ end,
+ true = ets:delete(?DOC_TO_REP, DocId);
+ _ ->
+ ok
+ end.
+
+
+% A replication document was deleted: cancel the replication it is mapped to
+% (if any) and remove both table entries.
+rep_doc_deleted(DocId) ->
+ case ets:lookup(?DOC_TO_REP, DocId) of
+ [{DocId, RepId}] ->
+ couch_replicator:cancel_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
+ " was deleted", [pp_rep_id(RepId), DocId]);
+ [] ->
+ ok
+ end.
+
+
+% Server-side handling of a failed replication: replications that are no
+% longer tracked are ignored, the others go through
+% maybe_retry_replication/3. Returns the new server state.
+replication_error(State, RepId, Error) ->
+    RepState = rep_state(RepId),
+    if
+        RepState =:= nil ->
+            State;
+        true ->
+            maybe_retry_replication(RepState, Error, State)
+    end.
+
+% No retries left: cancel the replication, forget it in both tables and log
+% the final error.
+maybe_retry_replication(#rep_state{retries_left = 0} = RepState, Error, State) ->
+ #rep_state{
+ rep = #rep{id = RepId, doc_id = DocId},
+ max_retries = MaxRetries
+ } = RepState,
+ couch_replicator:cancel_replication(RepId),
+ true = ets:delete(?REP_TO_STATE, RepId),
+ true = ets:delete(?DOC_TO_REP, DocId),
+ ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nReached maximum retry attempts (~p).",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), MaxRetries]),
+ State;
+
+% Otherwise: store the state returned by state_after_error/1 and spawn a
+% linked process that restarts the replication after that state's wait
+% (in seconds).
+maybe_retry_replication(RepState, Error, State) ->
+ #rep_state{
+ rep = #rep{id = RepId, doc_id = DocId} = Rep
+ } = RepState,
+ #rep_state{wait = Wait} = NewRepState = state_after_error(RepState),
+ true = ets:insert(?REP_TO_STATE, {RepId, NewRepState}),
+ ?LOG_ERROR("Error in replication `~s` (triggered by document `~s`): ~s"
+ "~nRestarting replication in ~p seconds.",
+ [pp_rep_id(RepId), DocId, to_binary(error_reason(Error)), Wait]),
+ Pid = spawn_link(fun() -> start_replication(Rep, Wait) end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}.
+
+
+% Cancels every replication listed in ?DOC_TO_REP and empties both tables
+% (the tables themselves are kept).
+stop_all_replications() ->
+ ?LOG_INFO("Stopping all ongoing replications because the replicator"
+ " database was deleted or changed", []),
+ ets:foldl(
+ fun({_, RepId}, _) ->
+ couch_replicator:cancel_replication(RepId)
+ end,
+ ok, ?DOC_TO_REP),
+ true = ets:delete_all_objects(?REP_TO_STATE),
+ true = ets:delete_all_objects(?DOC_TO_REP).
+
+
+% Applies the key/value pairs in KVs to the replication document RepDocId.
+%
+% The replicator database is opened (and created if needed) through
+% ensure_rep_db_exists/0. If the document cannot be opened, nothing is done
+% and ok is returned; otherwise the result of update_rep_doc/3 is returned.
+%
+% A thrown `conflict` triggers a retry after a short sleep. The database
+% handle is always closed before the retry starts, so repeated conflicts do
+% not pile up open handles or nested try/after frames.
+update_rep_doc(RepDocId, KVs) ->
+    {ok, RepDb} = ensure_rep_db_exists(),
+    Outcome = try
+        case couch_db:open_doc(RepDb, RepDocId, [ejson_body]) of
+            {ok, LatestRepDoc} ->
+                {done, update_rep_doc(RepDb, LatestRepDoc, KVs)};
+            _ ->
+                {done, ok}
+        end
+    catch throw:conflict ->
+        % Only record that a retry is needed; the retry itself happens
+        % after the `after` clause has released this handle.
+        retry
+    after
+        couch_db:close(RepDb)
+    end,
+    case Outcome of
+        {done, Result} ->
+            Result;
+        retry ->
+            % Shouldn't happen, as by default only the role _replicator can
+            % update replication documents.
+            ?LOG_ERROR("Conflict error when updating replication document `~s`."
+                " Retrying.", [RepDocId]),
+            ok = timer:sleep(5),
+            update_rep_doc(RepDocId, KVs)
+    end.
+
+% Merges KVs into the body of RepDoc and writes the document back only when
+% the merged body differs from the current one.
+%
+% A <<"_replication_state">> entry is special: it is stored only when its
+% value differs from the one already in the body, and in that case
+% <<"_replication_state_time">> is set to a fresh timestamp() as well.
+% Returns ok when nothing changed, otherwise the result of
+% couch_db:update_doc/3.
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    MergeKV =
+        fun({<<"_replication_state">> = Key, NewState} = KV, Body) ->
+            case get_value(Key, Body) =:= NewState of
+                true ->
+                    % State unchanged: leave both the state and its time alone.
+                    Body;
+                false ->
+                    WithState = lists:keystore(Key, 1, Body, KV),
+                    TimeKV = {<<"_replication_state_time">>, timestamp()},
+                    lists:keystore(
+                        <<"_replication_state_time">>, 1, WithState, TimeKV)
+            end;
+        ({Key, _Value} = KV, Body) ->
+            lists:keystore(Key, 1, Body, KV)
+        end,
+    NewRepDocBody = lists:foldl(MergeKV, RepDocBody, KVs),
+    case NewRepDocBody =:= RepDocBody of
+        true ->
+            ok;
+        false ->
+            % Might not succeed - when the replication doc is deleted right
+            % before this update (not an error, ignore).
+            couch_db:update_doc(RepDb, RepDoc#doc{body = {NewRepDocBody}}, [])
+    end.
+
+
+% RFC3339 timestamps.
+% Note: doesn't include the time seconds fraction (RFC3339 says it's optional).
+%
+% Returns the current local time as a binary of the form
+% <<"YYYY-MM-DDTHH:MM:SS+HH:MM">> (or with a "-" offset).
+%
+% The clock is read exactly once: the local time is derived from that single
+% universal time reading, so the date/time fields and the zone offset always
+% describe the same instant.
+timestamp() ->
+    UTime = erlang:universaltime(),
+    LocalTime = calendar:universal_time_to_local_time(UTime),
+    {{Year, Month, Day}, {Hour, Min, Sec}} = LocalTime,
+    DiffSecs = calendar:datetime_to_gregorian_seconds(LocalTime) -
+        calendar:datetime_to_gregorian_seconds(UTime),
+    Zone = zone(DiffSecs div 3600, (DiffSecs rem 3600) div 60),
+    iolist_to_binary(
+        io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w~s",
+            [Year, Month, Day, Hour, Min, Sec, Zone])).
+
+% Formats a zone offset given as whole hours and minutes. Both arguments carry
+% the sign of the offset; any negative component yields a "-HH:MM" result.
+zone(Hr, Min) when Hr >= 0, Min >= 0 ->
+    io_lib:format("+~2..0w:~2..0w", [Hr, Min]);
+zone(Hr, Min) ->
+    io_lib:format("-~2..0w:~2..0w", [abs(Hr), abs(Min)]).
+
+
+
+% Opens the replicator database, whose name is read from the "replicator"/"db"
+% configuration entry (default "_replicator"). If opening fails the database
+% is created instead. The replicator design document is then ensured via
+% ensure_rep_ddoc_exists/2. Returns {ok, Db}.
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
+    {ok, Db} =
+        case couch_db:open_int(DbName, [sys_db, {user_ctx, UserCtx}]) of
+            {ok, _} = Opened ->
+                Opened;
+            _Error ->
+                couch_db:create(DbName, [sys_db, {user_ctx, UserCtx}])
+        end,
+    ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+
+% Makes sure the design document DDocID is present in RepDb. When it cannot
+% be opened, a JavaScript design document carrying ?REP_DB_DOC_VALIDATE_FUN
+% as its validate_doc_update function is written. Returns ok when the
+% document already exists, otherwise the {ok, Rev} result of the write.
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+        {ok, _Doc} ->
+            ok;
+        _ ->
+            DDocProps = [
+                {<<"_id">>, DDocID},
+                {<<"language">>, <<"javascript">>},
+                {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+            ],
+            DDoc = couch_doc:from_json_obj({DDocProps}),
+            {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+    end.
+
+
+% Pretty-prints a replication id. Accepts either a #rep{} record (its id
+% field is used) or a {Base, Extension} pair, whose two parts are
+% concatenated.
+pp_rep_id(#rep{} = Rep) ->
+    pp_rep_id(Rep#rep.id);
+pp_rep_id({Base, Extension}) ->
+    lists:append(Base, Extension).
+
+
+% Looks up the state stored for RepId in ?REP_TO_STATE. Returns the stored
+% state, or nil when the table has no entry for that id.
+rep_state(RepId) ->
+    case ets:lookup(?REP_TO_STATE, RepId) of
+        [] ->
+            nil;
+        [{RepId, StoredState}] ->
+            StoredState
+    end.
+
+
+% Unwraps an {error, Reason} tuple to its reason; any other term is returned
+% as it is.
+error_reason(Error) ->
+    case Error of
+        {error, Reason} ->
+            Reason;
+        Reason ->
+            Reason
+    end.
+
+
+% Converts a retries setting given as a string: "infinity" maps to the atom
+% infinity, anything else is parsed with list_to_integer/1.
+retries_value(Value) ->
+    case Value of
+        "infinity" ->
+            infinity;
+        _ ->
+            list_to_integer(Value)
+    end.
+
+
+% Computes the replication state to use after a failure: the wait value is
+% doubled (truncated to an integer) and capped at ?MAX_WAIT, and the number
+% of retries left is decremented unless it is infinity.
+state_after_error(#rep_state{retries_left = Left, wait = Wait} = State) ->
+    NextWait = erlang:min(trunc(Wait * 2), ?MAX_WAIT),
+    NextLeft =
+        case Left of
+            infinity -> infinity;
+            _ -> Left - 1
+        end,
+    State#rep_state{retries_left = NextLeft, wait = NextWait}.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replication_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replication_notifier.erl b/src/couch_replicator/src/couch_replication_notifier.erl
new file mode 100644
index 0000000..c686c2b
--- /dev/null
+++ b/src/couch_replicator/src/couch_replication_notifier.erl
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% gen_event handler that forwards replication events to a caller-supplied
+% fun, optionally threading an accumulator through successive events.
+-module(couch_replication_notifier).
+
+-behaviour(gen_event).
+
+% public API
+-export([start_link/1, stop/1, notify/1]).
+
+% gen_event callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_event/2, handle_call/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+% Starts a notifier through couch_event_sup:start_link/3 on the
+% couch_replication event manager. FunAcc is either a fun of arity 1 or a
+% {Fun, Acc} pair where Fun has arity 2 (see handle_event/2). make_ref()
+% gives each handler a distinct id.
+start_link(FunAcc) ->
+    couch_event_sup:start_link(couch_replication,
+        {couch_replication_notifier, make_ref()}, FunAcc).
+
+% Publishes Event on the couch_replication event manager.
+notify(Event) ->
+    gen_event:notify(couch_replication, Event).
+
+% Stops a notifier previously started with start_link/1.
+stop(Pid) ->
+    couch_event_sup:stop(Pid).
+
+
+% The handler state is the FunAcc value given to start_link/1.
+init(FunAcc) ->
+    {ok, FunAcc}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+% Calls the stored fun with each event. With a {Fun, Acc} state the fun's
+% return value becomes the accumulator passed along with the next event.
+handle_event(Event, Fun) when is_function(Fun, 1) ->
+    Fun(Event),
+    {ok, Fun};
+handle_event(Event, {Fun, Acc}) when is_function(Fun, 2) ->
+    Acc2 = Fun(Event, Acc),
+    {ok, {Fun, Acc2}}.
+
+% gen_event expects {ok, Reply, State} from handle_call/2; a gen_server
+% style {reply, ...} tuple is a bad return value and gets the handler
+% removed.
+handle_call(_Msg, State) ->
+    {ok, ok, State}.
+
+handle_info(_Msg, State) ->
+    {ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
new file mode 100644
index 0000000..70485ee
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -0,0 +1,33 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Application resource file for the couch_replicator application.
+{application, couch_replicator, [
+    {description, "CouchDB replicator"},
+    {vsn, "@version@"},
+    % Each entry must match the name of a module shipped in this application.
+    {modules, [
+        couch_api_wrap_httpc,
+        couch_api_wrap,
+        couch_httpc_pool,
+        couch_httpd_replicator,
+        couch_rep_sup,
+        couch_replication_manager,
+        couch_replication_notifier,
+        couch_replicator_utils,
+        couch_replicator_worker,
+        couch_replicator
+    ]},
+    {registered, [
+        couch_rep_sup
+    ]},
+    {applications, [kernel, stdlib]}
+]}.
+