You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/07/10 18:28:49 UTC

svn commit: r962876 - in /couchdb/branches/new_replicator: share/www/script/test/new_replication.js src/couchdb/couch_api_wrap.erl src/couchdb/couch_changes.erl src/couchdb/couch_replicate.erl

Author: fdmanana
Date: Sat Jul 10 16:28:49 2010
New Revision: 962876

URL: http://svn.apache.org/viewvc?rev=962876&view=rev
Log:
Added support for filtered replication (filter function resides in a design doc of the source DB).

Modified:
    couchdb/branches/new_replicator/share/www/script/test/new_replication.js
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Sat Jul 10 16:28:49 2010
@@ -297,6 +297,57 @@ couchTests.new_replication = function(de
   }
 
 
+  // test filtered replication
+  docs = makeDocs(1, 31);
+  docs.push({
+    _id: "_design/mydesign",
+    language: "javascript",
+    filters: {
+      myfilter: (function(doc, req) {
+        var modulus = Number(req.query.modulus);
+        var special = req.query.special;
+        return (doc.integer % modulus === 0) || (doc.string === special);
+      }).toString()
+    }
+  });
+
+  for (i = 0; i < dbPairs.length; i++) {
+    populateDb(sourceDb, docs);
+    populateDb(targetDb, []);
+
+    repResult = CouchDB.new_replicate(
+      dbPairs[i].source,
+      dbPairs[i].target,
+      {
+        body: {
+          filter: "mydesign/myfilter",
+          query_params: {
+            modulus: 2,
+            special: "7"
+          }
+        }
+      }
+    );
+
+    T(repResult.ok === true);
+
+    for (j = 0; j < docs.length; j++) {
+      doc = docs[j];
+      copy = targetDb.open(doc._id);
+
+      if ((doc.integer && (doc.integer % 2 === 0)) || (doc.string === "7")) {
+
+        T(copy !== null);
+        for (var p in doc) {
+          T(copy[p] === doc[p]);
+        }
+      } else {
+        T(copy === null);
+      }
+    }
+  }
+
+
   // cleanup
   sourceDb.deleteDb();
   targetDb.deleteDb();

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sat Jul 10 16:28:49 2010
@@ -43,7 +43,7 @@
     get_missing_revs/2,
     open_doc_revs/6,
     update_doc/4,
-    changes_since/5
+    changes_since/6
     ]).
 
 db_open(Db, Options) ->
@@ -261,13 +261,12 @@ update_doc(#httpdb{} = HttpDb, Doc, Opti
 update_doc(Db, Doc, Options, Type) ->
     couch_db:update_doc(Db, Doc, Options, Type).
 
-changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc) ->
+changes_since(#httpdb{} = HttpDb, Style, StartSeq, UserFun, Acc, Options) ->
     #httpdb{url=Url, headers=Headers, oauth=OAuth} = HttpDb,
     Url2 = Url ++ "_changes",
-    QArgs = [
-        {"style", atom_to_list(Style)},
-        {"since", integer_to_list(StartSeq)}
-    ],
+    QArgs = changes_q_args(
+        [{"style", atom_to_list(Style)}, {"since", integer_to_list(StartSeq)}],
+        Options),
     Headers2 = oauth_header(Url2, QArgs, get, OAuth) ++ Headers,        
     #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
     {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
@@ -289,12 +288,60 @@ changes_since(#httpdb{} = HttpDb, Style,
     after
         catch ibrowse:stop_worker_process(Worker)
     end;
-changes_since(Db, Style, StartSeq, UserFun, Acc) ->
-    couch_db:changes_since(Db, Style, StartSeq, UserFun, Acc).
+changes_since(Db, Style, StartSeq, UserFun, Acc, Options) ->
+    FilterName = ?b2l(couch_util:get_value(filter, Options, <<>>)),
+    QueryParams = couch_util:get_value(query_params, Options, {[]}),
+    JsonReq = changes_json_req(Db, FilterName, QueryParams),
+    DocFilterFun = couch_changes:doc_info_filter_fun(FilterName, Style,
+        {json_req, JsonReq}, Db),
+    ChangesFun = fun(DocInfo, Acc2) ->
+        DocInfoList = DocFilterFun(DocInfo),
+        Acc3 = lists:foldl(UserFun, Acc2, DocInfoList),
+        {ok, Acc3}
+    end,
+    couch_db:changes_since(Db, Style, StartSeq, ChangesFun, Acc).
 
 
 % internal functions
 
+changes_q_args(BaseQS, Options) ->
+    case couch_util:get_value(filter, Options) of
+    undefined ->
+        BaseQS;
+    FilterName ->
+        {Params} = couch_util:get_value(query_params, Options, {[]}),
+        [{"filter", ?b2l(FilterName)} | lists:foldl(
+            fun({K, V}, QSAcc) ->
+                Ks = couch_util:to_list(K),
+                case lists:keymember(Ks, 1, QSAcc) of
+                true ->
+                    QSAcc;
+                false ->
+                    [{Ks, couch_util:to_list(V)} | QSAcc]
+                end
+            end,
+            BaseQS, Params)]
+    end.
+
+changes_json_req(_Db, "", _QueryParams) ->
+    {[]};
+changes_json_req(Db, FilterName, {QueryParams}) ->
+    {ok, Info} = couch_db:get_db_info(Db),
+    % simulate a request to db_name/_changes
+    {[
+        {<<"info">>, {Info}},
+        {<<"id">>, null},
+        {<<"method">>, 'GET'},
+        {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+        {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
+        {<<"headers">>, []},
+        {<<"body">>, []},
+        {<<"peer">>, <<"replicator">>},
+        {<<"form">>, []},
+        {<<"cookie">>, []},
+        {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+    ]}.
+
 options_to_query_args([], Acc) ->
     lists:reverse(Acc);
 options_to_query_args([delay_commit | Rest], Acc) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_changes.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_changes.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_changes.erl Sat Jul 10 16:28:49 2010
@@ -14,6 +14,7 @@
 -include("couch_db.hrl").
 
 -export([handle_changes/3]).
+-export([doc_info_filter_fun/4]).
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
@@ -74,47 +75,73 @@ handle_changes(#changes_args{style=Style
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 make_filter_fun(FilterName, Style, Req, Db) ->
-    case [list_to_binary(couch_httpd:unquote(Part))
-            || Part <- string:tokens(FilterName, "/")] of
+    DocInfoFilter = doc_info_filter_fun(FilterName, Style, Req, Db),
+    fun(DocInfo) ->
+         DocInfoList = DocInfoFilter(DocInfo),
+         [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} ||
+            #doc_info{revs=[#rev_info{rev=R} | _]} <- DocInfoList]
+    end.
+
+doc_info_filter_fun(FilterName, Style, Req, Db) ->
+    case [?l2b(couch_httpd:unquote(Part))
+        || Part <- string:tokens(FilterName, "/")] of
     [] ->
-        fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) ->
+        fun(#doc_info{revs=Revs} = DocInfo) ->
             case Style of
             main_only ->
-                [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+                [DocInfo];
             all_docs ->
-                [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
-                        || #rev_info{rev=R} <- Revs]
+                [DocInfo#doc_info{revs=[RevInfo]} || RevInfo <- Revs]
             end
         end;
-    [DName, FName] ->
-        DesignId = <<"_design/", DName/binary>>,
+    [DDocName, FName] ->
+        DesignId = <<"_design/", DDocName/binary>>,
         DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
+        {ok, DDoc} = couch_db:open_doc(Db, <<"_design/", DDocName/binary>>),
         % validate that the ddoc has the filter fun
         #doc{body={Props}} = DDoc,
         couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
         fun(DocInfo) ->
-            DocInfos =
-            case Style of
-            main_only ->
-                [DocInfo];
-            all_docs ->
-                [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
-            end,
-            Docs = [Doc || {ok, Doc} <- [
-                    couch_db:open_doc(Db, DocInfo2, [deleted, conflicts])
-                        || DocInfo2 <- DocInfos]],
-            {ok, Passes} = couch_query_servers:filter_docs(
-                Req, Db, DDoc, FName, Docs
-            ),
-            [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
-                || {Pass, #doc{revs={RevPos,[RevId|_]}}}
-                <- lists:zip(Passes, Docs), Pass == true]
+            apply_filter_docinfo(DocInfo, Style, Req, Db, FName, DDoc)
         end;
     _Else ->
         throw({bad_request,
             "filter parameter must be of the form `designname/filtername`"})
     end.
 
+apply_filter_docinfo(DocInfo, Style, Req, Db, FilterName, DDoc) ->
+    DocInfos = case Style of
+    main_only ->
+        [DocInfo];
+    all_docs ->
+        [DocInfo#doc_info{revs=[Rev]} || Rev <- DocInfo#doc_info.revs]
+    end,
+    {_, PosDocInfoDict, Docs} = lists:foldl(
+        fun(DI, {P, Dict, Acc}) ->
+            case couch_db:open_doc(Db, DI, [deleted, conflicts]) of
+            {ok, Doc} ->
+                {P + 1, dict:store(P, DI, Dict), [Doc | Acc]};
+            _ ->
+                {P, Dict, Acc}
+            end
+        end,
+        {1, dict:new(), []},
+        DocInfos
+    ),
+    {ok, Passes} = couch_query_servers:filter_docs(
+        Req, Db, DDoc, FilterName, lists:reverse(Docs)
+    ),
+    {_, Filtered} = lists:foldl(
+        fun(true, {P, Acc}) ->
+            {P + 1, [dict:fetch(P, PosDocInfoDict) | Acc]};
+        (_, {P, Acc}) ->
+            {P + 1, Acc}
+        end,
+        {1, []},
+        Passes
+    ),
+    lists:reverse(Filtered).
+
 get_changes_timeout(Args, Callback) ->
     #changes_args{
         heartbeat = Heartbeat,

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=962876&r1=962875&r2=962876&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sat Jul 10 16:28:49 2010
@@ -59,7 +59,7 @@ start(Src, Tgt, Options, UserCtx) ->
     
     % this is starts the _changes reader process. It adds the changes from
     % the source db to the ChangesQueue.
-    spawn_changes_reader(self(), StartSeq, Source, ChangesQueue),
+    spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
     
     % this starts the missing revs finder, it checks the target for changes
     % in the ChangesQueue to see if they exist on the target or not. If not, 
@@ -121,16 +121,15 @@ init_state(Src,Tgt,Options,UserCtx)->   
             self(), timed_checkpoint)}.
 
 
-spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue) ->
+spawn_changes_reader(Cp, StartSeq, Source, ChangesQueue, Options) ->
     spawn_link(
         fun()->
             couch_api_wrap:changes_since(Source, all_docs, StartSeq,
                 fun(#doc_info{high_seq=Seq, revs=Revs} = DocInfo, _) ->
                     Cp ! {seq_start, {Seq, length(Revs)}},
                     Cp ! {add_stat, {#stats.missing_checked, length(Revs)}},
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo),
-                    {ok, ok}
-                end, ok),
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end, ok, Options),
             couch_work_queue:close(ChangesQueue)
         end).