You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/09/04 15:13:24 UTC

svn commit: r992596 - in /couchdb/branches/new_replicator: share/www/script/test/new_replication.js src/couchdb/couch_api_wrap.erl src/couchdb/couch_db.erl src/couchdb/couch_httpd_rep.erl src/couchdb/couch_replicate.erl

Author: fdmanana
Date: Sat Sep  4 13:13:23 2010
New Revision: 992596

URL: http://svn.apache.org/viewvc?rev=992596&view=rev
Log:
New replicator: add support for continuous replication.

Modified:
    couchdb/branches/new_replicator/share/www/script/test/new_replication.js
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_db.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl

Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Sat Sep  4 13:13:23 2010
@@ -104,6 +104,28 @@ couchTests.new_replication = function(de
   }
 
 
+  function waitForSeq(sourceDb, targetDb) {
+    var targetSeq,
+        sourceSeq = sourceDb.info().update_seq,
+        t0 = new Date(),
+        t1,
+        ms = 1000;
+
+    do {
+      targetSeq = targetDb.info().update_seq;
+      t1 = new Date();
+    } while (((t1 - t0) <= ms) && targetSeq < sourceSeq);
+  }
+
+
+  function wait(ms) {
+    var t0 = new Date(), t1;
+    do {
+      CouchDB.request("GET", "/");
+      t1 = new Date();
+    } while ((t1 - t0) <= ms);
+  }
+
 
   // test simple replications (not continuous, not filtered), including
   // conflict creation
@@ -741,6 +763,215 @@ couchTests.new_replication = function(de
   }
 
 
+  docs = makeDocs(1, 25);
+  docs.push({
+    _id: "_design/foo",
+    language: "javascript"
+  });
+
+  for (i = 0; i < dbPairs.length; i++) {
+    populateDb(sourceDb, docs);
+    populateDb(targetDb, []);
+
+    // add some attachments
+    for (j = 10; j < 15; j++) {
+      addAtt(sourceDb, docs[j], "readme.txt", att1_data, "text/plain");
+    }
+
+    repResult = CouchDB.new_replicate(
+      dbPairs[i].source,
+      dbPairs[i].target,
+      {
+        body: {
+          continuous: true
+        }
+      }
+    );
+    T(repResult.ok === true);
+    T(typeof repResult._local_id === "string");
+
+    var rep_id = repResult._local_id;
+
+    waitForSeq(sourceDb, targetDb);
+
+    for (j = 0; j < docs.length; j++) {
+      doc = docs[j];
+      copy = targetDb.open(doc._id);
+
+      T(copy !== null);
+      T(compareObjects(doc, copy) === true);
+
+      if (j >= 10 && j < 15) {
+        var atts = copy._attachments;
+        T(typeof atts === "object");
+        T(typeof atts["readme.txt"] === "object");
+        T(atts["readme.txt"].revpos === 2);
+        T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+        T(atts["readme.txt"].stub === true);
+
+        var att_copy = CouchDB.request(
+          "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+        ).responseText;
+        T(att_copy.length === att1_data.length);
+        T(att_copy === att1_data);
+      }
+    }
+
+    sourceInfo = sourceDb.info();
+    targetInfo = targetDb.info();
+
+    T(sourceInfo.doc_count === targetInfo.doc_count);
+
+    // add attachments to docs in source
+    for (j = 10; j < 15; j++) {
+      addAtt(sourceDb, docs[j], "data.dat", att2_data, "application/binary");
+    }
+
+    var ddoc = docs[docs.length - 1]; // design doc
+    addAtt(sourceDb, ddoc, "readme.txt", att1_data, "text/plain");
+
+    waitForSeq(sourceDb, targetDb);
+
+    var modifDocs = docs.slice(10, 15).concat([ddoc]);
+    for (j = 0; j < modifDocs.length; j++) {
+      doc = modifDocs[j];
+      copy = targetDb.open(doc._id);
+
+      T(copy !== null);
+      T(compareObjects(doc, copy) === true);
+
+      var atts = copy._attachments;
+      T(typeof atts === "object");
+      T(typeof atts["readme.txt"] === "object");
+      T(atts["readme.txt"].revpos === 2);
+      T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+      T(atts["readme.txt"].stub === true);
+
+      var att1_copy = CouchDB.request(
+        "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+      ).responseText;
+      T(att1_copy.length === att1_data.length);
+      T(att1_copy === att1_data);
+
+      if (doc._id.indexOf("_design/") === -1) {
+        T(typeof atts["data.dat"] === "object");
+        T(atts["data.dat"].revpos === 3);
+        T(atts["data.dat"].content_type.indexOf("application/binary") === 0);
+        T(atts["data.dat"].stub === true);
+
+        var att2_copy = CouchDB.request(
+          "GET", "/" + targetDb.name + "/" + copy._id + "/data.dat"
+        ).responseText;
+        T(att2_copy.length === att2_data.length);
+        T(att2_copy === att2_data);
+      }
+    }
+
+    sourceInfo = sourceDb.info();
+    targetInfo = targetDb.info();
+
+    T(sourceInfo.doc_count === targetInfo.doc_count);
+
+    // add another attachment to the ddoc on source
+    addAtt(sourceDb, ddoc, "data.dat", att2_data, "application/binary");
+
+    waitForSeq(sourceDb, targetDb);
+
+    copy = targetDb.open(ddoc._id);
+    var atts = copy._attachments;
+    T(typeof atts === "object");
+    T(typeof atts["readme.txt"] === "object");
+    T(atts["readme.txt"].revpos === 2);
+    T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+    T(atts["readme.txt"].stub === true);
+
+    var att1_copy = CouchDB.request(
+      "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+    ).responseText;
+    T(att1_copy.length === att1_data.length);
+    T(att1_copy === att1_data);
+
+    T(typeof atts["data.dat"] === "object");
+    T(atts["data.dat"].revpos === 3);
+    T(atts["data.dat"].content_type.indexOf("application/binary") === 0);
+    T(atts["data.dat"].stub === true);
+
+    var att2_copy = CouchDB.request(
+      "GET", "/" + targetDb.name + "/" + copy._id + "/data.dat"
+    ).responseText;
+    T(att2_copy.length === att2_data.length);
+    T(att2_copy === att2_data);
+
+    sourceInfo = sourceDb.info();
+    targetInfo = targetDb.info();
+
+    T(sourceInfo.doc_count === targetInfo.doc_count);
+
+
+    // add more docs to source
+    var newDocs = makeDocs(25, 35);
+    populateDb(sourceDb, newDocs, true);
+
+    waitForSeq(sourceDb, targetDb);
+
+    for (j = 0; j < newDocs.length; j++) {
+      doc = newDocs[j];
+      copy = targetDb.open(doc._id);
+
+      T(copy !== null);
+      T(compareObjects(doc, copy) === true);
+    }
+
+    sourceInfo = sourceDb.info();
+    targetInfo = targetDb.info();
+
+    T(sourceInfo.doc_count === targetInfo.doc_count);
+
+    // delete docs from source
+    T(sourceDb.deleteDoc(newDocs[0]).ok);
+    T(sourceDb.deleteDoc(newDocs[6]).ok);
+
+    waitForSeq(sourceDb, targetDb);
+
+    copy = targetDb.open(newDocs[0]._id);
+    T(copy === null);
+    copy = targetDb.open(newDocs[6]._id);
+    T(copy === null);
+
+    var changes = targetDb.changes({since: targetInfo.update_seq});
+    var line1 = changes.results[changes.results.length - 2];
+    var line2 = changes.results[changes.results.length - 1];
+    T(line1.id === newDocs[0]._id);
+    T(line1.deleted === true);
+    T(line2.id === newDocs[6]._id);
+    T(line2.deleted === true);
+
+    // cancel the replication
+    repResult = CouchDB.new_replicate(
+      dbPairs[i].source,
+      dbPairs[i].target,
+      {
+        body: {
+          continuous: true,
+          cancel: true
+        }
+      }
+    );
+    T(repResult.ok === true);
+    T(repResult._local_id === rep_id);
+
+    doc = {
+      _id: 'foobar',
+      value: 666
+    };
+    T(sourceDb.save(doc).ok);
+
+    wait(2000);
+    copy = targetDb.open(doc._id);
+    T(copy === null);
+  }
+
+
   //
   // test replication triggered by non admins
   //

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sat Sep  4 13:13:23 2010
@@ -34,6 +34,7 @@
 -export([
     db_open/2,
     db_open/3,
+    maybe_reopen_db/2,
     db_close/1,
     get_db_info/1,
     update_doc/3,
@@ -254,6 +255,18 @@ open_doc(Db, Id, Options) ->
     couch_db:open_doc(Db, Id, Options).
 
 
+maybe_reopen_db(#httpdb{} = Db, _TargetSeq) ->
+    Db;
+maybe_reopen_db(#db{update_seq = UpSeq, main_pid = Pid} = Db, TargetSeq) ->
+    case TargetSeq > UpSeq of
+    true ->
+        {ok, Db2} = gen_server:call(Pid, get_db, infinity),
+        Db2;
+    false ->
+        Db
+    end.
+
+
 update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
     QArgs = case Type of
     replicated_changes ->
@@ -320,10 +333,15 @@ changes_since(#httpdb{} = HttpDb, Style,
         [{path, "_changes"}, {qs, QArgs},
             {ibrowse_options, [{stream_to, {self(), once}}]}],
         fun(200, _, DataStreamFun) ->
-            EventFun = fun(Ev) ->
-                changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, [])
-            end,
-            json_stream_parse:events(DataStreamFun, EventFun)
+            case couch_util:get_value(continuous, Options, false) of
+            true ->
+                continuous_changes(DataStreamFun, UserFun);
+            false ->
+                EventFun = fun(Ev) ->
+                    changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+                end,
+                json_stream_parse:events(DataStreamFun, EventFun)
+            end
         end);
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
     Args = #changes_args{
@@ -367,6 +385,12 @@ changes_q_args(BaseQS, Options) ->
                 end
             end,
             BaseQS, Params)]
+    end ++
+    case get_value(continuous, Options, false) of
+    false ->
+        [{"feed", "normal"}];
+    true ->
+        [{"feed", "continuous"}, {"heartbeat", "10000"}]
     end.
 
 changes_json_req(_Db, "", _QueryParams) ->
@@ -505,6 +529,18 @@ changes_ev_loop(array_end, _UserFun, _Us
 changes_ev_done() ->
     fun(_Ev) -> changes_ev_done() end.
 
+continuous_changes(DataFun, UserFun) ->
+    {DataFun2, _, Rest} = json_stream_parse:events(
+        DataFun,
+        fun(Ev) -> parse_changes_line(Ev, UserFun) end),
+    continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+parse_changes_line(object_start, UserFun) ->
+    fun(Ev) ->
+        json_stream_parse:collect_object(Ev,
+            fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
+    end.
+
 json_to_doc_info({Props}) ->
     RevsInfo = lists:map(
         fun({Change}) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db.erl Sat Sep  4 13:13:23 2010
@@ -987,7 +987,9 @@ handle_call({db_updated, NewDb}, _From, 
         couch_ref_counter:add(NewRefCntr),
         couch_ref_counter:drop(OldRefCntr)
     end,
-    {reply, ok, NewDb}.
+    {reply, ok, NewDb};
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db}.
 
 
 handle_cast(Msg, Db) ->

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Sat Sep  4 13:13:23 2010
@@ -43,6 +43,8 @@ handle_req(#httpd{method='POST'} = Req) 
         end;
     {ok, {cancelled, RepId}} ->
         send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+    {ok, {continuous, RepId}} ->
+        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
     {ok, {HistoryResults}} ->
         send_json(Req, {[{ok, true} | HistoryResults]})
     catch

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sat Sep  4 13:13:23 2010
@@ -91,11 +91,16 @@ do_replication_loop(RepId, Src, Tgt, Opt
     end,
     do_replication_loop(RepId, Src, Tgt, Options, UserCtx, Seq).
 
-do_replication_loop(RepId, Src, Tgt, Options, UserCtx, FinalSeq) ->
+do_replication_loop({BaseId, _} = RepId, Src, Tgt, Options, UserCtx, Seq) ->
     case start_replication(RepId, Src, Tgt, Options, UserCtx) of
     {ok, _Pid} ->
-        Result = wait_for_result(RepId),
-        maybe_retry(Result, RepId, Src, Tgt, Options, UserCtx, FinalSeq);
+        case couch_util:get_value(continuous, Options, false) of
+        true ->
+            {ok, {continuous, ?l2b(BaseId)}};
+        false ->
+            Result = wait_for_result(RepId),
+            maybe_retry(Result, RepId, Src, Tgt, Options, UserCtx, Seq)
+        end;
     Error ->
         Error
     end.
@@ -611,17 +616,16 @@ doc_copy_loop(CopierId, Cp, Source, Targ
     {ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
         ?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
             [CopierId, Id, Revs, PossibleAncestors, Seq]),
+        Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
         couch_api_wrap:open_doc_revs(
-            Source, Id, Revs, [{atts_since, PossibleAncestors}],
+            Source2, Id, Revs, [{atts_since, PossibleAncestors}],
             fun(R, _) -> doc_handler(R, Target, Cp) end, []),
         Cp ! {seq_changes_done, {Seq, length(Revs)}},
-        doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
+        doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
     end.
 
 doc_handler({ok, Doc}, Target, Cp) ->
-    % we are called for every rev read on the source
     Cp ! {add_stat, {#stats.docs_read, 1}},
-    % now write the doc to the target.
     case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
     {ok, _} ->
         Cp ! {add_stat, {#stats.docs_written, 1}};