You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/09/04 15:13:24 UTC
svn commit: r992596 - in /couchdb/branches/new_replicator:
share/www/script/test/new_replication.js src/couchdb/couch_api_wrap.erl
src/couchdb/couch_db.erl src/couchdb/couch_httpd_rep.erl
src/couchdb/couch_replicate.erl
Author: fdmanana
Date: Sat Sep 4 13:13:23 2010
New Revision: 992596
URL: http://svn.apache.org/viewvc?rev=992596&view=rev
Log:
New replicator: add support for continuous replication.
Modified:
couchdb/branches/new_replicator/share/www/script/test/new_replication.js
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_db.erl
couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
Modified: couchdb/branches/new_replicator/share/www/script/test/new_replication.js
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/share/www/script/test/new_replication.js?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/share/www/script/test/new_replication.js (original)
+++ couchdb/branches/new_replicator/share/www/script/test/new_replication.js Sat Sep 4 13:13:23 2010
@@ -104,6 +104,28 @@ couchTests.new_replication = function(de
}
+ function waitForSeq(sourceDb, targetDb) {
+ var targetSeq,
+ sourceSeq = sourceDb.info().update_seq,
+ t0 = new Date(),
+ t1,
+ ms = 1000;
+
+ do {
+ targetSeq = targetDb.info().update_seq;
+ t1 = new Date();
+ } while (((t1 - t0) <= ms) && targetSeq < sourceSeq);
+ }
+
+
+ function wait(ms) {
+ var t0 = new Date(), t1;
+ do {
+ CouchDB.request("GET", "/");
+ t1 = new Date();
+ } while ((t1 - t0) <= ms);
+ }
+
// test simple replications (not continuous, not filtered), including
// conflict creation
@@ -741,6 +763,215 @@ couchTests.new_replication = function(de
}
+ docs = makeDocs(1, 25);
+ docs.push({
+ _id: "_design/foo",
+ language: "javascript"
+ });
+
+ for (i = 0; i < dbPairs.length; i++) {
+ populateDb(sourceDb, docs);
+ populateDb(targetDb, []);
+
+ // add some attachments
+ for (j = 10; j < 15; j++) {
+ addAtt(sourceDb, docs[j], "readme.txt", att1_data, "text/plain");
+ }
+
+ repResult = CouchDB.new_replicate(
+ dbPairs[i].source,
+ dbPairs[i].target,
+ {
+ body: {
+ continuous: true
+ }
+ }
+ );
+ T(repResult.ok === true);
+ T(typeof repResult._local_id === "string");
+
+ var rep_id = repResult._local_id;
+
+ waitForSeq(sourceDb, targetDb);
+
+ for (j = 0; j < docs.length; j++) {
+ doc = docs[j];
+ copy = targetDb.open(doc._id);
+
+ T(copy !== null);
+ T(compareObjects(doc, copy) === true);
+
+ if (j >= 10 && j < 15) {
+ var atts = copy._attachments;
+ T(typeof atts === "object");
+ T(typeof atts["readme.txt"] === "object");
+ T(atts["readme.txt"].revpos === 2);
+ T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+ T(atts["readme.txt"].stub === true);
+
+ var att_copy = CouchDB.request(
+ "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+ ).responseText;
+ T(att_copy.length === att1_data.length);
+ T(att_copy === att1_data);
+ }
+ }
+
+ sourceInfo = sourceDb.info();
+ targetInfo = targetDb.info();
+
+ T(sourceInfo.doc_count === targetInfo.doc_count);
+
+ // add attachments to docs in source
+ for (j = 10; j < 15; j++) {
+ addAtt(sourceDb, docs[j], "data.dat", att2_data, "application/binary");
+ }
+
+ var ddoc = docs[docs.length - 1]; // design doc
+ addAtt(sourceDb, ddoc, "readme.txt", att1_data, "text/plain");
+
+ waitForSeq(sourceDb, targetDb);
+
+ var modifDocs = docs.slice(10, 15).concat([ddoc]);
+ for (j = 0; j < modifDocs.length; j++) {
+ doc = modifDocs[j];
+ copy = targetDb.open(doc._id);
+
+ T(copy !== null);
+ T(compareObjects(doc, copy) === true);
+
+ var atts = copy._attachments;
+ T(typeof atts === "object");
+ T(typeof atts["readme.txt"] === "object");
+ T(atts["readme.txt"].revpos === 2);
+ T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+ T(atts["readme.txt"].stub === true);
+
+ var att1_copy = CouchDB.request(
+ "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+ ).responseText;
+ T(att1_copy.length === att1_data.length);
+ T(att1_copy === att1_data);
+
+ if (doc._id.indexOf("_design/") === -1) {
+ T(typeof atts["data.dat"] === "object");
+ T(atts["data.dat"].revpos === 3);
+ T(atts["data.dat"].content_type.indexOf("application/binary") === 0);
+ T(atts["data.dat"].stub === true);
+
+ var att2_copy = CouchDB.request(
+ "GET", "/" + targetDb.name + "/" + copy._id + "/data.dat"
+ ).responseText;
+ T(att2_copy.length === att2_data.length);
+ T(att2_copy === att2_data);
+ }
+ }
+
+ sourceInfo = sourceDb.info();
+ targetInfo = targetDb.info();
+
+ T(sourceInfo.doc_count === targetInfo.doc_count);
+
+ // add another attachment to the ddoc on source
+ addAtt(sourceDb, ddoc, "data.dat", att2_data, "application/binary");
+
+ waitForSeq(sourceDb, targetDb);
+
+ copy = targetDb.open(ddoc._id);
+ var atts = copy._attachments;
+ T(typeof atts === "object");
+ T(typeof atts["readme.txt"] === "object");
+ T(atts["readme.txt"].revpos === 2);
+ T(atts["readme.txt"].content_type.indexOf("text/plain") === 0);
+ T(atts["readme.txt"].stub === true);
+
+ var att1_copy = CouchDB.request(
+ "GET", "/" + targetDb.name + "/" + copy._id + "/readme.txt"
+ ).responseText;
+ T(att1_copy.length === att1_data.length);
+ T(att1_copy === att1_data);
+
+ T(typeof atts["data.dat"] === "object");
+ T(atts["data.dat"].revpos === 3);
+ T(atts["data.dat"].content_type.indexOf("application/binary") === 0);
+ T(atts["data.dat"].stub === true);
+
+ var att2_copy = CouchDB.request(
+ "GET", "/" + targetDb.name + "/" + copy._id + "/data.dat"
+ ).responseText;
+ T(att2_copy.length === att2_data.length);
+ T(att2_copy === att2_data);
+
+ sourceInfo = sourceDb.info();
+ targetInfo = targetDb.info();
+
+ T(sourceInfo.doc_count === targetInfo.doc_count);
+
+
+ // add more docs to source
+ var newDocs = makeDocs(25, 35);
+ populateDb(sourceDb, newDocs, true);
+
+ waitForSeq(sourceDb, targetDb);
+
+ for (j = 0; j < newDocs.length; j++) {
+ doc = newDocs[j];
+ copy = targetDb.open(doc._id);
+
+ T(copy !== null);
+ T(compareObjects(doc, copy) === true);
+ }
+
+ sourceInfo = sourceDb.info();
+ targetInfo = targetDb.info();
+
+ T(sourceInfo.doc_count === targetInfo.doc_count);
+
+ // delete docs from source
+ T(sourceDb.deleteDoc(newDocs[0]).ok);
+ T(sourceDb.deleteDoc(newDocs[6]).ok);
+
+ waitForSeq(sourceDb, targetDb);
+
+ copy = targetDb.open(newDocs[0]._id);
+ T(copy === null);
+ copy = targetDb.open(newDocs[6]._id);
+ T(copy === null);
+
+ var changes = targetDb.changes({since: targetInfo.update_seq});
+ var line1 = changes.results[changes.results.length - 2];
+ var line2 = changes.results[changes.results.length - 1];
+ T(line1.id === newDocs[0]._id);
+ T(line1.deleted === true);
+ T(line2.id === newDocs[6]._id);
+ T(line2.deleted === true);
+
+ // cancel the replication
+ repResult = CouchDB.new_replicate(
+ dbPairs[i].source,
+ dbPairs[i].target,
+ {
+ body: {
+ continuous: true,
+ cancel: true
+ }
+ }
+ );
+ T(repResult.ok === true);
+ T(repResult._local_id === rep_id);
+
+ doc = {
+ _id: 'foobar',
+ value: 666
+ };
+ T(sourceDb.save(doc).ok);
+
+ wait(2000);
+ copy = targetDb.open(doc._id);
+ T(copy === null);
+ }
+
+
//
// test replication triggered by non admins
//
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Sat Sep 4 13:13:23 2010
@@ -34,6 +34,7 @@
-export([
db_open/2,
db_open/3,
+ maybe_reopen_db/2,
db_close/1,
get_db_info/1,
update_doc/3,
@@ -254,6 +255,18 @@ open_doc(Db, Id, Options) ->
couch_db:open_doc(Db, Id, Options).
+maybe_reopen_db(#httpdb{} = Db, _TargetSeq) ->
+ Db;
+maybe_reopen_db(#db{update_seq = UpSeq, main_pid = Pid} = Db, TargetSeq) ->
+ case TargetSeq > UpSeq of
+ true ->
+ {ok, Db2} = gen_server:call(Pid, get_db, infinity),
+ Db2;
+ false ->
+ Db
+ end.
+
+
update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
QArgs = case Type of
replicated_changes ->
@@ -320,10 +333,15 @@ changes_since(#httpdb{} = HttpDb, Style,
[{path, "_changes"}, {qs, QArgs},
{ibrowse_options, [{stream_to, {self(), once}}]}],
fun(200, _, DataStreamFun) ->
- EventFun = fun(Ev) ->
- changes_ev1(Ev, fun(DocInfo, _Acc) -> UserFun(DocInfo) end, [])
- end,
- json_stream_parse:events(DataStreamFun, EventFun)
+ case couch_util:get_value(continuous, Options, false) of
+ true ->
+ continuous_changes(DataStreamFun, UserFun);
+ false ->
+ EventFun = fun(Ev) ->
+ changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
+ end,
+ json_stream_parse:events(DataStreamFun, EventFun)
+ end
end);
changes_since(Db, Style, StartSeq, UserFun, Options) ->
Args = #changes_args{
@@ -367,6 +385,12 @@ changes_q_args(BaseQS, Options) ->
end
end,
BaseQS, Params)]
+ end ++
+ case get_value(continuous, Options, false) of
+ false ->
+ [{"feed", "normal"}];
+ true ->
+ [{"feed", "continuous"}, {"heartbeat", "10000"}]
end.
changes_json_req(_Db, "", _QueryParams) ->
@@ -505,6 +529,18 @@ changes_ev_loop(array_end, _UserFun, _Us
changes_ev_done() ->
fun(_Ev) -> changes_ev_done() end.
+continuous_changes(DataFun, UserFun) ->
+ {DataFun2, _, Rest} = json_stream_parse:events(
+ DataFun,
+ fun(Ev) -> parse_changes_line(Ev, UserFun) end),
+ continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
+
+parse_changes_line(object_start, UserFun) ->
+ fun(Ev) ->
+ json_stream_parse:collect_object(Ev,
+ fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
+ end.
+
json_to_doc_info({Props}) ->
RevsInfo = lists:map(
fun({Change}) ->
Modified: couchdb/branches/new_replicator/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_db.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_db.erl Sat Sep 4 13:13:23 2010
@@ -987,7 +987,9 @@ handle_call({db_updated, NewDb}, _From,
couch_ref_counter:add(NewRefCntr),
couch_ref_counter:drop(OldRefCntr)
end,
- {reply, ok, NewDb}.
+ {reply, ok, NewDb};
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db}.
handle_cast(Msg, Db) ->
Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Sat Sep 4 13:13:23 2010
@@ -43,6 +43,8 @@ handle_req(#httpd{method='POST'} = Req)
end;
{ok, {cancelled, RepId}} ->
send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
+ {ok, {continuous, RepId}} ->
+ send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
{ok, {HistoryResults}} ->
send_json(Req, {[{ok, true} | HistoryResults]})
catch
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=992596&r1=992595&r2=992596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Sat Sep 4 13:13:23 2010
@@ -91,11 +91,16 @@ do_replication_loop(RepId, Src, Tgt, Opt
end,
do_replication_loop(RepId, Src, Tgt, Options, UserCtx, Seq).
-do_replication_loop(RepId, Src, Tgt, Options, UserCtx, FinalSeq) ->
+do_replication_loop({BaseId, _} = RepId, Src, Tgt, Options, UserCtx, Seq) ->
case start_replication(RepId, Src, Tgt, Options, UserCtx) of
{ok, _Pid} ->
- Result = wait_for_result(RepId),
- maybe_retry(Result, RepId, Src, Tgt, Options, UserCtx, FinalSeq);
+ case couch_util:get_value(continuous, Options, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ Result = wait_for_result(RepId),
+ maybe_retry(Result, RepId, Src, Tgt, Options, UserCtx, Seq)
+ end;
Error ->
Error
end.
@@ -611,17 +616,16 @@ doc_copy_loop(CopierId, Cp, Source, Targ
{ok, [{Id, Revs, PossibleAncestors, Seq}]} ->
?LOG_DEBUG("Doc copier ~p got {~p, ~p, ~p, ~p}",
[CopierId, Id, Revs, PossibleAncestors, Seq]),
+ Source2 = couch_api_wrap:maybe_reopen_db(Source, Seq),
couch_api_wrap:open_doc_revs(
- Source, Id, Revs, [{atts_since, PossibleAncestors}],
+ Source2, Id, Revs, [{atts_since, PossibleAncestors}],
fun(R, _) -> doc_handler(R, Target, Cp) end, []),
Cp ! {seq_changes_done, {Seq, length(Revs)}},
- doc_copy_loop(CopierId, Cp, Source, Target, MissingRevsQueue)
+ doc_copy_loop(CopierId, Cp, Source2, Target, MissingRevsQueue)
end.
doc_handler({ok, Doc}, Target, Cp) ->
- % we are called for every rev read on the source
Cp ! {add_stat, {#stats.docs_read, 1}},
- % now write the doc to the target.
case couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
{ok, _} ->
Cp ! {add_stat, {#stats.docs_written, 1}};