You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/07/24 18:38:53 UTC

svn commit: r797553 - in /couchdb/trunk: src/couchdb/couch_rep_changes_feed.erl test/etap/110-replication-changes-feed.t

Author: kocolosk
Date: Fri Jul 24 16:38:53 2009
New Revision: 797553

URL: http://svn.apache.org/viewvc?rev=797553&view=rev
Log:
reassemble split chunks in changes feed, add all() export

Modified:
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/test/etap/110-replication-changes-feed.t

Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=797553&r1=797552&r2=797553&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Fri Jul 24 16:38:53 2009
@@ -15,7 +15,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([start/2, start_link/2, next/1, stop/1]).
+-export([start/2, start_link/2, all/1, next/1, stop/1]).
 
 -define(MIN_BUFFER_SIZE, 100).
 
@@ -23,10 +23,12 @@
 -include("../ibrowse/ibrowse.hrl").
 
 -record (remote, {
-    conn = nil,
-    reqid = nil,
+    conn,
+    reqid,
+    last_seq,
     complete = false,
     count = 0,
+    partial_chunk = nil,
     reply_to = nil,
     rows = queue:new()
 }).
@@ -46,6 +48,11 @@
 start_link(Url, Options) ->
     gen_server:start_link(?MODULE, [Url, Options], []).
 
+%% @doc returns all changes buffered so far without waiting for more to arrive
+all(Server) ->
+    gen_server:call(Server, all_changes).
+
+%% @doc returns the next change from the feed, blocking if necessary
 next(Server) ->
     gen_server:call(Server, next_change, infinity).
 
@@ -57,7 +64,7 @@
     Continuous = proplists:get_value(continuous, Options, false),
     {Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
         "?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
-    {ok, #remote{conn=Pid, reqid=ReqId}};
+    {ok, #remote{conn=Pid, last_seq=Since, reqid=ReqId}};
 
 init([{local, DbName}, Options]) when is_list(DbName) ->
     init([{local, ?l2b(DbName)}, Options]);
@@ -91,6 +98,30 @@
     } = State,
     {noreply, State#local{count=Count+1, changes_from=From, rows=queue:in(Row,Rows)}};
 
+handle_call(all_changes, _From, #local{complete=Complete, count=Count}=State) 
+        when Complete =:= false; Count > 0 ->
+    #local{
+        changes_from = ChangesFrom,
+        rows = Rows
+    } = State,
+    if Count < ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
+        gen_server:reply(ChangesFrom, ok);
+    true -> ok end,
+    {reply, queue:to_list(Rows), State#local{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #local{}=State) ->
+    {stop, normal, complete, State};
+
+handle_call(all_changes, _From, #remote{complete=Complete, count=Count}=State)
+        when Complete =:= false; Count > 0 ->
+    #remote{
+        reqid = Id,
+        rows = Rows
+    } = State,
+    ok = maybe_stream_next(Complete, 0, Id),
+    {reply, queue:to_list(Rows), State#remote{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #remote{}=State) ->
+    {stop, normal, complete, State};
+
 handle_call(next_change, From, #local{count=0}=State) ->
     if State#local.complete ->
         {stop, normal, complete, State};
@@ -158,28 +189,11 @@
         [Code,Hdrs]),
     {stop, {error, list_to_integer(Code)}, State};
 
+handle_info({ibrowse_async_response, _, {error, Reason}}, State) ->
+    {stop, {error, Reason}, State};
 handle_info({ibrowse_async_response, Id, Msg}, #remote{reqid=Id} = State) ->
     ?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
-    #remote{
-        complete = Complete,
-        count = Count,
-        rows = Rows
-    } = State,
-    try
-        Row = decode_row(Msg),
-        case State of
-        #remote{reply_to=nil} ->
-            {noreply, State#remote{count=Count+1, rows = queue:in(Row, Rows)}};
-        #remote{count=0, reply_to=From}->
-            gen_server:reply(From, Row),
-            {noreply, State#remote{reply_to=nil}}
-        end
-    catch
-    throw:{invalid_json, Msg} ->
-        ?LOG_DEBUG("got invalid_json ~p", [Msg]),
-        ok = maybe_stream_next(Complete, Count, Id),
-        {noreply, State}
-    end;
+    {noreply, process_response(Msg, Id, State)};
 
 handle_info({ibrowse_async_response_end, Id}, #remote{reqid=Id} = State) ->
     ?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#remote.reply_to]),
@@ -217,7 +231,63 @@
 
 %internal funs
 
-decode_row([$,, $\n | Rest]) ->
+process_response(<<"{\"results\":[\n">>, Id, State) ->
+    #remote{
+        complete = Complete,
+        count = Count
+    } = State,
+    ok = maybe_stream_next(Complete, Count, Id),
+    State;
+process_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, _, State) ->
+    LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
+    State#remote{last_seq = LastSeq};
+process_response(Chunk, Id, #remote{partial_chunk=nil} = State) ->
+    #remote{
+        complete = Complete,
+        count = Count,
+        rows = Rows
+    } = State,
+    try
+        Row = decode_row(Chunk),
+        ok = maybe_stream_next(Complete, Count+1, Id),
+        case State of
+        #remote{reply_to=nil} ->
+            State#remote{count=Count+1, rows = queue:in(Row, Rows)};
+        #remote{count=0, reply_to=From}->
+            gen_server:reply(From, Row),
+            State#remote{reply_to=nil}
+        end
+    catch
+    throw:{invalid_json, Bad} ->
+        ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+        ok = maybe_stream_next(Complete, Count, Id),
+        State#remote{partial_chunk = Bad}
+    end;
+process_response(Chunk, Id, State) ->
+    #remote{
+        complete = Complete,
+        count = Count,
+        partial_chunk = Partial,
+        rows = Rows
+    } = State,
+    try
+        Row = decode_row(<<Partial/binary, Chunk/binary>>),
+        ok = maybe_stream_next(Complete, Count+1, Id),
+        case State of
+        #remote{reply_to=nil} ->
+            State#remote{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+        #remote{count=0, reply_to=From}->
+            gen_server:reply(From, Row),
+            State#remote{reply_to=nil, partial_chunk=nil}
+        end
+    catch
+    throw:{invalid_json, Bad} ->
+        ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+        ok = maybe_stream_next(Complete, Count, Id),
+        State#remote{partial_chunk = Bad}
+    end.
+    
+decode_row(<<",\n", Rest/binary>>) ->
     decode_row(Rest);
 decode_row(Row) ->
     ?JSON_DECODE(Row).
@@ -278,7 +348,8 @@
     {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
     Opts = [
         {stream_to, {self(), once}},
-        {inactivity_timeout, 30000}
+        {inactivity_timeout, 30000},
+        {response_format, binary}
     ],
     {ibrowse_req_id, Id} = 
         ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),

Modified: couchdb/trunk/test/etap/110-replication-changes-feed.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/110-replication-changes-feed.t?rev=797553&r1=797552&r2=797553&view=diff
==============================================================================
--- couchdb/trunk/test/etap/110-replication-changes-feed.t (original)
+++ couchdb/trunk/test/etap/110-replication-changes-feed.t Fri Jul 24 16:38:53 2009
@@ -22,7 +22,7 @@
     code:add_pathz("src/ibrowse"),
     code:add_pathz("src/mochiweb"),
     
-    etap:plan(12),
+    etap:plan(17),
     case (catch test()) of
         ok ->
             etap:end_tests();
@@ -48,6 +48,7 @@
     couch_server:delete(<<"etap-test-db">>, []),
     {ok, Db2} = couch_db:create(<<"etap-test-db">>, []),
     test_all(remote),
+    test_remote_only(),
     couch_db:close(Db2),
     couch_server:delete(<<"etap-test-db">>, []),
 
@@ -59,7 +60,11 @@
     test_since_parameter(Type),
     test_continuous_parameter(Type),
     test_conflicts(Type),
-    test_deleted_conflicts(Type).
+    test_deleted_conflicts(Type),
+    test_non_blocking_call(Type).
+
+test_remote_only() ->
+    test_chunk_reassembly(remote).
 
 test_unchanged_db(Type) ->
     {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, []),
@@ -154,6 +159,40 @@
         io_lib:format("(~p) deleted conflict revisions show up in feed", [Type])
     ).
 
+test_non_blocking_call(Type) ->
+    Since = get_update_seq(),
+    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
+        [{since, Since}, {continuous, true}]),
+    etap:is(
+        couch_rep_changes_feed:all(Pid),
+        [],
+        io_lib:format("(~p) all() returns empty list if no changes available",
+            [Type])
+    ),
+    Expect1 = generate_change(),
+    Expect2 = generate_change(),
+    timer:sleep(100),
+    etap:is(
+        couch_rep_changes_feed:all(Pid),
+        [Expect1, Expect2],
+        io_lib:format("(~p) all() returns full list of outstanding changes",
+            [Type])
+    ),
+    ok = couch_rep_changes_feed:stop(Pid).
+
+test_chunk_reassembly(Type) ->
+    Since = get_update_seq(),
+    Expect = [generate_change() || _I <- lists:seq(1,30)],
+    {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
+        [{since, Since}]),
+    timer:sleep(100),
+    etap:is(
+        couch_rep_changes_feed:all(Pid),
+        Expect,
+        io_lib:format("(~p) reassembles chunks split across TCP frames",
+            [Type])
+    ).
+
 generate_change() ->
     generate_change(couch_util:new_uuid()).