You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by jc...@apache.org on 2010/01/29 21:08:54 UTC

svn commit: r904615 - in /couchdb/trunk: THANKS share/www/script/test/replication.js src/couchdb/couch_rep.erl src/couchdb/couch_rep_reader.erl src/couchdb/couch_rep_writer.erl

Author: jchris
Date: Fri Jan 29 20:08:54 2010
New Revision: 904615

URL: http://svn.apache.org/viewvc?rev=904615&view=rev
Log:
Thanks Filipe Manana. Closes COUCHDB-631.

Replicator option to replicate only a given list of document IDs (bypasses the by_seq index).

Usage: POST to /_replicate with a JSON body of:

{"source": "myfoo",
"target" : "http://remotedb.com/theirfoo",
"doc_ids": ["foo1", "foo3", "foo666"]}

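This will copy the listed docs from the source to the target database. IDs that do not exist in the source (such as "foo666" above) are skipped. Because the source's update sequence is not used to pick documents, no replication log or checkpoint is recorded for this kind of replication.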

Modified:
    couchdb/trunk/THANKS
    couchdb/trunk/share/www/script/test/replication.js
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_rep_reader.erl
    couchdb/trunk/src/couchdb/couch_rep_writer.erl

Modified: couchdb/trunk/THANKS
URL: http://svn.apache.org/viewvc/couchdb/trunk/THANKS?rev=904615&r1=904614&r2=904615&view=diff
==============================================================================
--- couchdb/trunk/THANKS (original)
+++ couchdb/trunk/THANKS Fri Jan 29 20:08:54 2010
@@ -43,5 +43,6 @@
  * Filipe Manana <fd...@gmail.com>
  * Ilia Cheishvili <il...@gmail.com>
  * Lena Herrmann <le...@zeromail.org>
+ * Filipe Manana <fd...@gmail.com>
 
 For a list of authors see the `AUTHORS` file.

Modified: couchdb/trunk/share/www/script/test/replication.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replication.js?rev=904615&r1=904614&r2=904615&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/test/replication.js (original)
+++ couchdb/trunk/share/www/script/test/replication.js Fri Jan 29 20:08:54 2010
@@ -301,4 +301,61 @@
   });
   TEquals("test_suite_db_b", dbB.info().db_name,
     "Target database should exist");
+
+
+  // test replication object option doc_ids
+
+  var dbA = new CouchDB("test_suite_rep_docs_db_a", {"X-Couch-Full-Commit":"false"});
+  var dbB = new CouchDB("test_suite_rep_docs_db_b", {"X-Couch-Full-Commit":"false"});
+
+  dbA.deleteDb();
+  dbA.createDb();
+  dbB.deleteDb();
+  dbB.createDb();
+
+  T(dbA.save({_id:"foo1",value:"a"}).ok);
+  T(dbA.save({_id:"foo2",value:"b"}).ok);
+  T(dbA.save({_id:"foo3",value:"c"}).ok);
+
+  var dbPairs = [
+    {source:"test_suite_rep_docs_db_a",
+      target:"test_suite_rep_docs_db_b"},
+    {source:"test_suite_rep_docs_db_a",
+      target:"http://" + host + "/test_suite_rep_docs_db_b"},
+    {source:"http://" + host + "/test_suite_rep_docs_db_a",
+      target:"test_suite_rep_docs_db_b"},
+    {source:"http://" + host + "/test_suite_rep_docs_db_a",
+      target:"http://" + host + "/test_suite_rep_docs_db_b"}
+  ];
+
+  for (var i = 0; i < dbPairs.length; i++) {
+    var dbA = dbPairs[i].source;
+    var dbB = dbPairs[i].target;
+
+    var repResult = CouchDB.replicate(dbA, dbB, {
+      body: {"doc_ids": ["foo1", "foo3", "foo666"]}
+    });
+
+    T(repResult.ok);
+    T(repResult.docs_written === 2);
+    T(repResult.docs_read === 2);
+    T(repResult.doc_write_failures === 0);
+
+    dbB = new CouchDB("test_suite_rep_docs_db_b");
+
+    var docFoo1 = dbB.open("foo1");
+    T(docFoo1 !== null);
+    T(docFoo1.value === "a");
+
+    var docFoo2 = dbB.open("foo2");
+    T(docFoo2 === null);
+
+    var docFoo3 = dbB.open("foo3");
+    T(docFoo3 !== null);
+    T(docFoo3.value === "c");
+
+    var docFoo666 = dbB.open("foo666");
+    T(docFoo666 === null);
+  }
+
 };

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=904615&r1=904614&r2=904615&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Jan 29 20:08:54 2010
@@ -45,7 +45,8 @@
     complete = false,
     committed_seq = 0,
 
-    stats = nil
+    stats = nil,
+    doc_ids = nil
 }).
 
 %% convenience function to do a simple replication from the shell
@@ -102,26 +103,49 @@
     SourceProps = proplists:get_value(<<"source">>, PostProps),
     TargetProps = proplists:get_value(<<"target">>, PostProps),
 
+    DocIds = proplists:get_value(<<"doc_ids">>, PostProps, nil),
     Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
     CreateTarget = proplists:get_value(<<"create_target">>, PostProps, false),
 
     Source = open_db(SourceProps, UserCtx),
     Target = open_db(TargetProps, UserCtx, CreateTarget),
 
-    SourceLog = open_replication_log(Source, RepId),
-    TargetLog = open_replication_log(Target, RepId),
-
     SourceInfo = dbinfo(Source),
     TargetInfo = dbinfo(Target),
+
+    case DocIds of
+    List when is_list(List) ->
+        % Fast replication using only a list of doc IDs to replicate.
+        % Replication sessions, checkpoints and logs are not created
+        % since the update sequence number of the source DB is not used
+        % for determining which documents are copied into the target DB.
+        SourceLog = nil,
+        TargetLog = nil,
+
+        StartSeq = nil,
+        History = nil,
+
+        ChangesFeed = nil,
+        MissingRevs = nil,
+
+        {ok, Reader} =
+        couch_rep_reader:start_link(self(), Source, DocIds, PostProps);
+
+    _ ->
+        % Replication using the _changes API (DB sequence update numbers).
+        SourceLog = open_replication_log(Source, RepId),
+        TargetLog = open_replication_log(Target, RepId),
     
-    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+        {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+
+        {ok, ChangesFeed} =
+        couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+        {ok, MissingRevs} =
+        couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+        {ok, Reader} =
+        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps)
+    end,
 
-    {ok, ChangesFeed} =
-    couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
-    {ok, MissingRevs} =
-    couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
-    {ok, Reader} =
-    couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
     {ok, Writer} =
     couch_rep_writer:start_link(self(), Target, Reader, PostProps),
 
@@ -156,7 +180,8 @@
         target_log = TargetLog,
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = proplists:get_value(instance_start_time, SourceInfo),
-        tgt_starttime = proplists:get_value(instance_start_time, TargetInfo)
+        tgt_starttime = proplists:get_value(instance_start_time, TargetInfo),
+        doc_ids = DocIds
     },
     {ok, State}.
 
@@ -325,20 +350,34 @@
     {ok, Info} = couch_db:get_db_info(Db),
     Info.
 
+do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) ->
+    % A doc_ids replication keeps no replication log (source_log/target_log
+    % are nil), so instead of a history the listeners receive the run's
+    % counters and start/end timestamps.
+    #state{listeners=Waiting, rep_starttime=StartedAt, stats=StatsTab} = State,
+    Counter = fun(Key) -> ets:lookup_element(StatsTab, Key, 2) end,
+    Summary = {[
+        {<<"start_time">>, ?l2b(StartedAt)},
+        {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
+        {<<"docs_read">>, Counter(docs_read)},
+        {<<"docs_written">>, Counter(docs_written)},
+        {<<"doc_write_failures">>, Counter(doc_write_failures)}
+    ]},
+    % terminate_cleanup/1 deletes the stats table, so it must run only after
+    % every counter above has been read.
+    terminate_cleanup(State),
+    Reply = {ok, Summary},
+    [gen_server:reply(From, Reply) || From <- lists:reverse(Waiting)];
+
 do_terminate(State) ->
     #state{
         checkpoint_history = CheckpointHistory,
         committed_seq = NewSeq,
         listeners = Listeners,
         source = Source,
-        target = Target,
         continuous = Continuous,
-        stats = Stats,
         source_log = #doc{body={OldHistory}}
     } = State,
-    couch_task_status:update("Finishing"),
-    ets:delete(Stats),
-    close_db(Target),
     
     NewRepHistory = case CheckpointHistory of
     nil ->
@@ -366,7 +405,13 @@
         false ->
             [gen_server:reply(R, retry) || R <- OtherListeners]
     end,
-    close_db(Source).
+    terminate_cleanup(State).
+
+%% Teardown shared by both do_terminate/1 clauses: publish the "Finishing"
+%% task status, close the target and then the source database, and finally
+%% drop the stats ETS table.
+terminate_cleanup(#state{source=SrcDb, target=TgtDb, stats=StatsTab}) ->
+    couch_task_status:update("Finishing"),
+    lists:foreach(fun close_db/1, [TgtDb, SrcDb]),
+    ets:delete(StatsTab).
 
 has_session_id(_SessionId, []) ->
     false;

Modified: couchdb/trunk/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_reader.erl?rev=904615&r1=904614&r2=904615&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_reader.erl Fri Jan 29 20:08:54 2010
@@ -43,13 +43,15 @@
     opened_seqs = []
 }).
 
-start_link(Parent, Source, MissingRevs, PostProps) ->
-    gen_server:start_link(?MODULE, [Parent, Source, MissingRevs, PostProps], []).
+%% Starts a reader gen_server linked to the caller. The third argument is
+%% either the pid of a couch_rep_missing_revs server (sequence-based
+%% replication) or a plain list of document IDs to copy; init/1 and
+%% reader_loop/3 branch on which of the two was given.
+start_link(Parent, Source, MissingRevs_or_DocIds, PostProps) ->
+    InitArgs = [Parent, Source, MissingRevs_or_DocIds, PostProps],
+    gen_server:start_link(?MODULE, InitArgs, []).
 
+%% Synchronously asks the reader server Pid for its next batch of documents,
+%% waiting with no timeout (infinity). couch_rep_writer's writer_loop/3 is a
+%% caller; it matches {complete, nil} and {complete, FinalSeq} replies.
 next(Pid) ->
     gen_server:call(Pid, next_docs, infinity).
 
-init([Parent, Source, MissingRevs, _PostProps]) ->
+init([Parent, Source, MissingRevs_or_DocIds, PostProps]) ->
     process_flag(trap_exit, true),
     if is_record(Source, http_db) ->
         #url{host=Host, port=Port} = ibrowse_lib:parse_url(Source#http_db.url),
@@ -57,7 +59,15 @@
         ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
     true -> ok end,
     Self = self(),
-    ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+    ReaderLoop = spawn_link(
+        fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end
+    ),
+    MissingRevs = case MissingRevs_or_DocIds of
+    Pid when is_pid(Pid) ->
+        Pid;
+    _ListDocIds ->
+        nil
+    end,
     State = #state{
         parent = Parent,
         source = Source,
@@ -167,6 +177,8 @@
+%% Keeps the server running ({noreply, ...}) and only flags the state as
+%% complete = waiting_on_monitors. Presumably invoked once the reader loop
+%% process exits with reason complete, while spawn_monitor'ed document
+%% requests may still be outstanding -- TODO confirm against the caller.
 handle_reader_loop_complete(State) ->
     {noreply, State#state{complete = waiting_on_monitors}}.
 
+calculate_new_high_seq(#state{missing_revs=nil}) ->
+    nil;
 calculate_new_high_seq(#state{requested_seqs=[], opened_seqs=[Open|_]}) ->
     Open;
 calculate_new_high_seq(#state{requested_seqs=[Req|_], opened_seqs=[Open|_]})
@@ -191,6 +203,8 @@
 % opened seqs greater than the smallest outstanding request.  I believe its the
 % minimal set of info needed to correctly calculate which seqs have been
 % replicated (because remote docs can be opened out-of-order) -- APK
+update_sequence_lists(_Seq, #state{missing_revs=nil} = State) ->
+    State;
 update_sequence_lists(Seq, State) ->
     Requested = lists:delete(Seq, State#state.requested_seqs),
     AllOpened = lists:merge([Seq], State#state.opened_seqs),
@@ -234,6 +248,37 @@
     end,
     [Transform(Result) || Result <- JsonResults].
 
+%% Fetches the latest revision of DocId from a remote source database.
+%% Returns [Doc] (attachment stubs converted for later download) or [] when
+%% the source answers with anything that is not a document.
+open_doc(#http_db{} = DbS, DocId) ->
+    % revs=true makes the server include the document's revision path
+    % (_revisions), so the target gets the history, not just the leaf rev.
+    Req = DbS#http_db{resource=url_encode(DocId), qs=[{revs, true}]},
+    {Props} = Json = couch_rep_httpc:request(Req),
+    case proplists:get_value(<<"_id">>, Props) of
+    Id when is_binary(Id) ->
+        #doc{revs=Rev, atts=Atts} = Doc = couch_doc:from_json_obj(Json),
+        [Doc#doc{
+            atts=[couch_rep_att:convert_stub(A, {DbS,Id,Rev}) || A <- Atts]
+        }];
+    undefined ->
+        % An error body such as not_found/missing or not_found/deleted.
+        % Matching only the "missing" variant let other error bodies reach
+        % couch_doc:from_json_obj/1 and produce an id-less document. Skip
+        % the ID instead, like the local branch of reader_loop/3 does for
+        % anything that is not {ok, Doc}.
+        ?LOG_DEBUG("replicator skipping doc ~p, source replied ~p",
+            [DocId, Json]),
+        []
+    end.
+
+%% Doc-ID mode of the reader loop. For a remote source, each ID is handed to
+%% the reader server as an {open_remote_doc, Id, nil, nil} call. For a local
+%% source, every ID is opened directly, IDs that do not open as {ok, Doc}
+%% are dropped, and the found docs are delivered in one add_docs call with
+%% no sequence (nil). The loop process then exits with reason complete.
+reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) ->
+    case Source of
+    #http_db{} ->
+        lists:foreach(fun(DocId) ->
+            gen_server:call(ReaderServer, {open_remote_doc, DocId, nil, nil},
+                infinity)
+        end, DocIds);
+    _LocalDb ->
+        OpenResults = [couch_db:open_doc(Source, DocId) || DocId <- DocIds],
+        Found = [Doc || {ok, Doc} <- OpenResults],
+        gen_server:call(ReaderServer, {add_docs, nil, Found}, infinity)
+    end,
+    exit(complete);
+    
 reader_loop(ReaderServer, Source, MissingRevsServer) ->
     case couch_rep_missing_revs:next(MissingRevsServer) of
     complete ->
@@ -267,6 +312,8 @@
 maybe_reopen_db(Db, _HighSeq) ->
     Db.
 
+spawn_document_request(Source, Id, nil, nil) ->
+    spawn_document_request(Source, Id);
 spawn_document_request(Source, Id, Seq, Revs) ->
     Server = self(),
     SpawnFun = fun() ->
@@ -274,3 +321,11 @@
         gen_server:call(Server, {add_docs, Seq, Results}, infinity)
     end,
     spawn_monitor(SpawnFun).
+
+%% Fetches document Id from Source in a separate monitored process and hands
+%% the result (possibly []) back to this reader server as an add_docs call
+%% with no sequence number. Returns spawn_monitor/1's {Pid, Ref}.
+spawn_document_request(Source, Id) ->
+    Reader = self(),
+    spawn_monitor(fun() ->
+        Fetched = open_doc(Source, Id),
+        gen_server:call(Reader, {add_docs, nil, Fetched}, infinity)
+    end).

Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=904615&r1=904614&r2=904615&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Fri Jan 29 20:08:54 2010
@@ -21,6 +21,8 @@
 
 writer_loop(Parent, Reader, Target) ->
     case couch_rep_reader:next(Reader) of
+    {complete, nil} ->
+        ok;
     {complete, FinalSeq} ->
         Parent ! {writer_checkpoint, FinalSeq},
         ok;
@@ -38,7 +40,12 @@
             ?LOG_DEBUG("writer failed to write an attachment ~p", [Err]),
             exit({attachment_request_failed, Err, Docs})
         end,
-        Parent ! {writer_checkpoint, HighSeq},
+        case HighSeq of
+        nil ->
+            ok;
+        _SeqNumber ->
+            Parent ! {writer_checkpoint, HighSeq}
+        end,
         couch_rep_att:cleanup(),
         couch_util:should_flush(),
         writer_loop(Parent, Reader, Target)