You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/13 01:18:54 UTC
[12/12] couch-replicator commit: updated refs/heads/1994-merge-rcouch
to c8f517d
couch_replicator: add replication using changes in a view
Instead of replicating a whole database, the replicator can now filter
the documents using a view index. All documents having a key emitted in
the view can be replicated.
View parameters can be used, which means that you can replicate the
results corresponding to a key or a range of keys in a view.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/09d7a60c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/09d7a60c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/09d7a60c
Branch: refs/heads/1994-merge-rcouch
Commit: 09d7a60c29e4e800b6e80efd2337e9b744ae1229
Parents: 7999202
Author: benoitc <be...@apache.org>
Authored: Sun Feb 9 00:43:23 2014 +0100
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Feb 12 18:16:58 2014 -0600
----------------------------------------------------------------------
src/couch_replicator.erl | 39 ++++++++++++++++++++++++++++++----
src/couch_replicator.hrl | 2 ++
src/couch_replicator_api_wrap.erl | 25 +++++++++++++++++++++-
src/couch_replicator_utils.erl | 27 +++++++++++++++++++++--
4 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index d470c8a..10aaf37 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -70,7 +70,9 @@
target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
- checkpoint_interval = 5000
+ checkpoint_interval = 5000,
+ type = db,
+ view = nil
}).
@@ -533,7 +535,8 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
init_state(Rep) ->
#rep{
source = Src, target = Tgt,
- options = Options, user_ctx = UserCtx
+ options = Options, user_ctx = UserCtx,
+ type = Type, view = View
} = Rep,
{ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
@@ -547,6 +550,17 @@ init_state(Rep) ->
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
+
+ SourceSeq = case Type of
+ db -> get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ);
+ view ->
+ {DDoc, VName} = View,
+ {ok, VInfo} = couch_replicator_api_wrap:get_view_info(Source, DDoc,
+ VName),
+ get_value(<<"update_seq">>, VInfo, ?LOWEST_SEQ)
+ end,
+
+
#doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
rep_details = Rep,
@@ -571,9 +585,12 @@ init_state(Rep) ->
start_db_compaction_notifier(Target, self()),
source_monitor = db_monitor(Source),
target_monitor = db_monitor(Target),
- source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+ source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
- checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
+ checkpoint_interval = get_value(checkpoint_interval, Options,
+ 5000),
+ type = Type,
+ view = View
},
State#rep_state{timer = start_timer(State)}.
@@ -914,6 +931,20 @@ db_monitor(#db{} = Db) ->
db_monitor(_HttpDb) ->
nil.
+source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq,
+ type = view, view = {DDoc, VName}}) ->
+ case (catch couch_replicator_api_wrap:get_view_info(
+ Db#httpdb{retries = 3}, DDoc, VName)) of
+ {ok, Info} ->
+ get_value(<<"update_seq">>, Info, Seq);
+ _ ->
+ Seq
+ end;
+
+source_cur_seq(#rep_state{source = Db, source_seq = Seq,
+ type = view, view = {DDoc, VName}}) ->
+ {ok, Info} = couch_replicator_api_wrap:get_view_info(Db, DDoc, VName),
+ get_value(<<"update_seq">>, Info, Seq);
source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 018aa4b..1eee88e 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -18,6 +18,8 @@
target,
options,
user_ctx,
+ type = db,
+ view = nil,
doc_id
}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 311025b..1e0e660 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -19,6 +19,7 @@
% Many options and apis aren't yet supported here, they are added as needed.
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-include("couch_replicator_api_wrap.hrl").
-export([
@@ -26,6 +27,7 @@
db_open/3,
db_close/1,
get_db_info/1,
+ get_view_info/3,
update_doc/3,
update_doc/4,
update_docs/3,
@@ -121,6 +123,16 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
{ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
+ Path = iolist_to_binary([DDocId, "/_view/", ViewName, "/_info"]),
+ send_req(Db, [{path, Path}],
+ fun(200, _, {Props}) ->
+ {ok, Props}
+ end);
+get_view_info(#db{name = DbName}, DDocId, ViewName) ->
+ couch_mrview:get_view_info(DbName, DDocId, ViewName).
+
+
ensure_full_commit(#httpdb{} = Db) ->
send_req(
Db,
@@ -439,7 +451,8 @@ changes_since(Db, Style, StartSeq, UserFun, Options) ->
},
QueryParams = get_value(query_params, Options, {[]}),
Req = changes_json_req(Db, Filter, QueryParams, Options),
- ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+ ChangesFeedFun = couch_httpd_changes:handle_changes(Args, {json_req, Req},
+ Db),
ChangesFeedFun(fun({change, Change, _}, _) ->
UserFun(json_to_doc_info(Change));
(_, _) ->
@@ -454,6 +467,10 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
undefined ->
BaseQS;
FilterName ->
+ %% get list of view attributes
+ ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
+ ViewFields = ["key" | ViewFields0],
+
{Params} = get_value(query_params, Options, {[]}),
[{"filter", ?b2l(FilterName)} | lists:foldl(
fun({K, V}, QSAcc) ->
@@ -461,6 +478,12 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
case lists:keymember(Ks, 1, QSAcc) of
true ->
QSAcc;
+ false when FilterName =:= <<"_view">> ->
+ V1 = case lists:member(Ks, ViewFields) of
+ true -> ?JSON_ENCODE(V);
+ false -> couch_util:to_list(V)
+ end,
+ [{Ks, V1} | QSAcc];
false ->
[{Ks, couch_util:to_list(V)} | QSAcc]
end
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/09d7a60c/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 0baddc2..085df43 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -37,13 +37,34 @@ parse_rep_doc({Props}, UserCtx) ->
true ->
{ok, #rep{options = Options, user_ctx = UserCtx}};
false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
- Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
+ Source = parse_rep_db(get_value(<<"source">>, Props),
+ ProxyParams, Options),
+ Target = parse_rep_db(get_value(<<"target">>, Props),
+ ProxyParams, Options),
+
+
+ {RepType, View} = case get_value(<<"filter">>, Props) of
+ <<"_view">> ->
+ {QP} = get_value(query_params, Options, {[]}),
+ ViewParam = get_value(<<"view">>, QP),
+ View1 = case re:split(ViewParam, <<"/">>) of
+ [DName, ViewName] ->
+ {<< "_design/", DName/binary >>, ViewName};
+ _ ->
+ throw({bad_request, "Invalid `view` parameter."})
+ end,
+ {view, View1};
+ _ ->
+ {db, nil}
+ end,
+
Rep = #rep{
source = Source,
target = Target,
options = Options,
user_ctx = UserCtx,
+ type = RepType,
+ view = View,
doc_id = get_value(<<"_id">>, Props, null)
},
{ok, Rep#rep{id = replication_id(Rep)}}
@@ -100,6 +121,8 @@ maybe_append_filters(Base,
DocIds ->
[DocIds]
end;
+ <<"_", _/binary>> = Filter ->
+ [Filter, get_value(query_params, Options, {[]})];
Filter ->
[filter_code(Filter, Source, UserCtx),
get_value(query_params, Options, {[]})]