You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/07/10 18:28:49 UTC
svn commit: r962876 - in /couchdb/branches/new_replicator:
share/www/script/test/new_replication.js src/couchdb/couch_api_wrap.erl
src/couchdb/couch_changes.erl src/couchdb/couch_replicate.erl
Author: fdmanana
Date: Sat Jul 10 16:28:49 2010
New Revision: 962876
URL: http://svn.apache.org/viewvc?rev=962876&view=rev
Log:
Added support for filtered replication (the filter function resides in a design document of the source database).
Modified:
couchdb/branches/new_replicator/share/www/script/test/new_replication.js
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Sat Jul 10 16:28:49 2010
@@ -297,6 +297,57 @@ couchTests.new_replication = function(de
}
+ // test filtered replication
+ // mydesign/myfilter keeps a doc when its "integer" field is a multiple of
+ // req.query.modulus or its "string" field equals req.query.special.
+ docs = makeDocs(1, 31);
+ docs.push({
+ _id: "_design/mydesign",
+ language: "javascript",
+ filters: {
+ myfilter: (function(doc, req) {
+ var modulus = Number(req.query.modulus);
+ var special = req.query.special;
+ return (doc.integer % modulus === 0) || (doc.string === special);
+ }).toString()
+ }
+ });
+
+ // Replicate each source/target pair through the filter, passing the
+ // filter's arguments via query_params.
+ for (i = 0; i < dbPairs.length; i++) {
+ populateDb(sourceDb, docs);
+ populateDb(targetDb, []);
+
+ repResult = CouchDB.new_replicate(
+ dbPairs[i].source,
+ dbPairs[i].target,
+ {
+ body: {
+ filter: "mydesign/myfilter",
+ query_params: {
+ modulus: 2,
+ special: "7"
+ }
+ }
+ }
+ );
+
+ T(repResult.ok === true);
+
+ // With modulus 2 and special "7", only docs with an even "integer" or
+ // with "string" === "7" may reach the target; every other doc, including
+ // the design doc (it has neither field), must be absent there.
+ for (j = 0; j < docs.length; j++) {
+ doc = docs[j];
+ copy = targetDb.open(doc._id);
+
+ if ((doc.integer && (doc.integer % 2 === 0)) || (doc.string === "7")) {
+
+ T(copy !== null);
+ for (var p in doc) {
+ T(copy[p] === doc[p]);
+ }
+ } else {
+ T(copy === null);
+ }
+ }
+ }
+
+
// cleanup
sourceDb.deleteDb();
targetDb.deleteDb();
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sat Jul 10 16:28:49 2010
@@ -43,7 +43,7 @@
get_missing_revs/2,
open_doc_revs/6,
update_doc/4,
- changes_since/5
+ changes_since/6
]).
db_open(Db, Options) ->
@@ -261,13 +261,12 @@ update_doc(#httpdb{} = HttpDb, Doc, Opti
update_doc(Db, Doc, Options, Type) ->
couch_db:update_doc(Db, Doc, Options, Type).
-changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc) ->
+% Remote-database clause. Options is a proplist; its filter and query_params
+% entries are forwarded to the remote _changes feed by changes_q_args/2.
+changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc, Options) ->
#httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
Url2 = Url ++ "_changes",
- QArgs = [
- {"style", atom_to_list(Style)},
- {"since", integer_to_list(StartSeq)}
- ],
+ % The base style/since arguments are extended with filter and
+ % query_params only when Options contains a filter; the resulting list
+ % is also what oauth_header/4 receives below.
+ QArgs = changes_q_args(
+ [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
+ Options),
Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,
#url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
{ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
@@ -289,12 +288,60 @@ changes_since(#httpdb{} = HttpDb, Style,
after
catch ibrowse:stop_worker_process(Worker)
end;
-changes_since(Db, Style, StartSeq, UserFun, Acc) ->
- couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
+% Local-database clause. When Options holds a filter ("ddocname/filtername"
+% binary, with optional query_params) only the revisions accepted by that
+% design-doc filter are reported. UserFun(DocInfo, Acc) is called at most
+% once per changed document, with DocInfo's revs narrowed to the accepted
+% ones; changes with no accepted revision are skipped.
+changes_since(Db, Style, StartSeq, UserFun, Acc, Options) ->
+ FilterName = ?b2l(couch_util:get_value(filter, Options, <<>>)),
+ QueryParams = couch_util:get_value(query_params, Options, {[]}),
+ JsonReq = changes_json_req(Db, FilterName, QueryParams),
+ DocFilterFun = couch_changes:doc_info_filter_fun(FilterName, Style,
+ {json_req, JsonReq}, Db),
+ ChangesFun = fun(DocInfo, Acc2) ->
+ % In all_docs style the filter fun splits a change into one
+ % single-revision #doc_info{} per revision. Merge the survivors back
+ % into one #doc_info{} so UserFun keeps receiving whole changes, as it
+ % did before filtering existed (callers count length(Revs) per call).
+ case DocFilterFun(DocInfo) of
+ [] ->
+ {ok, Acc2};
+ DocInfoList ->
+ Revs = lists:append([Rs || #doc_info{revs=Rs} <- DocInfoList]),
+ {ok, UserFun(DocInfo#doc_info{revs=Revs}, Acc2)}
+ end
+ end,
+ couch_db:changes_since(Db, Style, StartSeq, ChangesFun, Acc).
% internal functions
+% Builds the query-string arguments of a remote _changes request. BaseQS
+% (style, since) is returned unchanged when Options has no filter. Otherwise
+% a "filter" argument is prepended and each query_params entry is added
+% (key and value converted with couch_util:to_list/1), unless its key is
+% "filter" itself or is already present; for repeated keys the first one
+% wins. This keeps user parameters from duplicating or shadowing the
+% replicator's own filter, style and since arguments.
+changes_q_args(BaseQS, Options) ->
+ case couch_util:get_value(filter, Options) of
+ undefined ->
+ BaseQS;
+ FilterName ->
+ {Params} = couch_util:get_value(query_params, Options, {[]}),
+ [{"filter", ?b2l(FilterName)} | lists:foldl(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case (Ks =:= "filter") orelse lists:keymember(Ks, 1, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, couch_util:to_list(V)} | QSAcc]
+ end
+ end,
+ BaseQS, Params)]
+ end.
+
+% Builds the JSON request object passed to the design-doc filter when a
+% local database is filtered, mimicking a GET of db_name/_changes issued by
+% the replicator. Returns {[]} when no filter is configured (FilterName "").
+changes_json_req(_Db, "", _QueryParams) ->
+ {[]};
+changes_json_req(Db, FilterName, {QueryParams}) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ % FilterName is an Erlang string here; convert it to a binary so that
+ % req.query.filter is a JSON string like the other text values in this
+ % object (a list of character codes would presumably be encoded as a
+ % JSON array instead).
+ {<<"query">>, {[{<<"filter">>, list_to_binary(FilterName)} | QueryParams]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}.
+
options_to_query_args([], Acc) ->
lists:reverse(Acc);
options_to_query_args([delay_commit | Rest], Acc) ->
Modified: couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_changes.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_changes.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_changes.erl Sat Jul 10 16:28:49 2010
@@ -14,6 +14,7 @@
-include("couch_db.hrl").
-export([handle_changes/3]).
+%% doc_info_filter_fun/4 is exported so couch_api_wrap can filter the
+%% changes of a local source database during replication.
+-export([doc_info_filter_fun/4]).
%% @type Req -> #httpd{} | {json_req, JsonObj()}
handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
@@ -74,47 +75,73 @@ handle_changes(#changes_args{style=Style
%% @type Req -> #httpd{} | {json_req, JsonObj()}
+%% Wraps doc_info_filter_fun/4 and turns each selected #doc_info{} into a
+%% {[{<<"rev">>, RevString}]} object for the first revision it holds.
make_filter_fun(FilterName, Style, Req, Db) ->
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(FilterName, "/")] of
+ DocInfoFilter = doc_info_filter_fun(FilterName, Style, Req, Db),
+ fun(DocInfo) ->
+ DocInfoList = DocInfoFilter(DocInfo),
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} ||
+ #doc_info{revs=[#rev_info{rev=R} | _]} <- DocInfoList]
+ end.
+
+%% Returns fun(#doc_info{}) -> [#doc_info{}] keeping the revisions of a
+%% change accepted by FilterName ("ddocname/filtername", each part
+%% URL-unquoted; an empty name means no filtering). In all_docs style each
+%% returned #doc_info{} carries a single revision; in main_only style the
+%% input #doc_info{} is returned as is when it passes. Throws
+%% {bad_request, _} when FilterName has any other number of parts.
+doc_info_filter_fun(FilterName, Style, Req, Db) ->
+ case [?l2b(couch_httpd:unquote(Part))
+ || Part <- string:tokens(FilterName, "/")] of
[] ->
- fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) ->
+ fun(#doc_info{revs=Revs} = DocInfo) ->
case Style of
main_only ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+ [DocInfo];
all_docs ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
- || #rev_info{rev=R} <- Revs]
+ [DocInfo#doc_info{revs=[RevInfo]} || RevInfo <- Revs]
end
end;
- [DName, FName] ->
- DesignId = <<"_design/", DName/binary>>,
+ [DDocName, FName] ->
+ DesignId = <<"_design/", DDocName/binary>>,
DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
+ % NOTE(review): the design doc is opened a second time on the next
+ % line; that read only re-asserts it matches DDoc and looks redundant.
+ % Confirm and keep a single open.
{ok, DDoc} = couch_db:open_doc(Db, <<"_design/", DDocName/binary>>),
% validate that the ddoc has the filter fun
#doc{body={Props}} = DDoc,
couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
fun(DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_docs(
- Req, Db, DDoc, FName, Docs
- ),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
+ apply_filter_docinfo(DocInfo, Style, Req, Db, FName, DDoc)
end;
_Else ->
throw({bad_request,
"filter parameter must be of the form `designname/filtername`"})
end.
+%% Runs the design-doc filter FilterName over the revision(s) of one change
+%% and returns, in order, the #doc_info{} entries whose document passed.
+%% In all_docs style every revision is tested on its own (one
+%% single-revision #doc_info{} each); in main_only style DocInfo itself is
+%% tested. Revisions that cannot be opened are skipped, and the query
+%% server is not consulted when nothing could be opened.
+apply_filter_docinfo(DocInfo, Style, Req, Db, FilterName, DDoc) ->
+ DocInfos = case Style of
+ main_only ->
+ [DocInfo];
+ all_docs ->
+ [DocInfo#doc_info{revs=[Rev]} || Rev <- DocInfo#doc_info.revs]
+ end,
+ % Pair every openable revision with its #doc_info{}; the query server
+ % answers with one pass/fail entry per document sent, in order, so the
+ % answer can be zipped straight back onto these pairs.
+ Opened = [{DI, Doc} || DI <- DocInfos,
+ {ok, Doc} <- [couch_db:open_doc(Db, DI, [deleted, conflicts])]],
+ case Opened of
+ [] ->
+ [];
+ _ ->
+ {ok, Passes} = couch_query_servers:filter_docs(
+ Req, Db, DDoc, FilterName, [Doc || {_DI, Doc} <- Opened]
+ ),
+ [DI || {true, {DI, _Doc}} <- lists:zip(Passes, Opened)]
+ end.
+
get_changes_timeout(Args, Callback) ->
#changes_args{
heartbeat = Heartbeat,
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sat Jul 10 16:28:49 2010
@@ -59,7 +59,7 @@ start(Src, Tgt, Options, UserCtx) ->
% this is starts the _changes reader process. It adds the changes from
% the source db to the ChangesQueue.
- spawn_changes_reader(self(), StartSeq, Source, ChangesQueue),
+ spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
% this starts the missing revs finder, it checks the target for changes
% in the ChangesQueue to see if they exist on the target or not. If not,
@@ -121,16 +121,15 @@ init_state(Src,Tgt,Options,UserCtx)->
self(), timed_checkpoint)}.
+% Spawns a process linked to the caller that feeds the source's all_docs
+% changes since StartSeq (filtered according to Options) into ChangesQueue,
+% telling Cp each change's sequence and revision count, and closes the
+% queue once couch_api_wrap:changes_since/6 returns.
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
spawn_link(
fun()->
couch_api_wrap:changes_since(Source, all_docs, StartSeq,
fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo, _) ->
Cp ! {seq_start, {Seq, length(Revs)}},
Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
- ok = couch_work_queue:queue(ChangesQueue, DocInfo),
- {ok, ok}
- end, ok),
+ ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+ end, ok, Options),
couch_work_queue:close(ChangesQueue)
end).