You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/07/16 22:48:36 UTC

svn commit: r794848 - in /couchdb/trunk: src/couchdb/Makefile.am src/couchdb/couch_rep_changes_feed.erl test/etap/110-replication-changes-feed.t

Author: kocolosk
Date: Thu Jul 16 20:48:35 2009
New Revision: 794848

URL: http://svn.apache.org/viewvc?rev=794848&view=rev
Log:
first cut at _changes feed consumer.  not yet used by replication

Added:
    couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
    couchdb/trunk/test/etap/110-replication-changes-feed.t
Modified:
    couchdb/trunk/src/couchdb/Makefile.am

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=794848&r1=794847&r2=794848&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Thu Jul 16 20:48:35 2009
@@ -72,6 +72,7 @@
     couch_query_servers.erl \
     couch_ref_counter.erl \
     couch_rep.erl \
+    couch_rep_changes_feed.erl \
     couch_rep_sup.erl \
     couch_server.erl \
     couch_server_sup.erl \
@@ -116,6 +117,7 @@
     couch_query_servers.beam \
     couch_ref_counter.beam \
     couch_rep.beam \
+    couch_rep_changes_feed.beam \
     couch_rep_sup.beam \
     couch_server.beam \
     couch_server_sup.beam \

Added: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=794848&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Thu Jul 16 20:48:35 2009
@@ -0,0 +1,174 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_changes_feed).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
+    code_change/3]).
+
+-export([start/2, start_link/2, next/1, stop/1]).
+
+-define(MIN_BUFFER_SIZE, 100).
+
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+%% gen_server state for one changes feed consumer.
+-record (state, {
+    conn = nil,          % pid of the ibrowse worker process carrying the request
+    reqid = nil,         % ibrowse request id; matched against ibrowse_async_* messages
+    count = 0,           % number of rows currently buffered in `rows`
+    reply_to = nil,      % From of a next/1 caller waiting for a row, nil otherwise
+    rows = queue:new(),  % decoded rows not yet handed out through next/1
+    complete = false     % set to true when ibrowse_async_response_end arrives
+}).
+
+%% Starts an unlinked changes feed consumer.
+%% Url is {remote, UrlString} or {local, DbName}; Options is a proplist in
+%% which init/1 reads the `since` and `continuous` keys.
+start(Url, Options) ->
+    InitArgs = [Url, Options],
+    gen_server:start(?MODULE, InitArgs, []).
+
+%% Same as start/2, but the consumer is linked to the calling process.
+start_link(Url, Options) ->
+    InitArgs = [Url, Options],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+%% Fetches the next row from the feed, blocking without a timeout until one
+%% is available. Returns the decoded row, or the atom `complete` once the
+%% feed has ended and the buffer is drained.
+%% Note: the debug line is emitted in the caller's process, before the call.
+next(Server) ->
+    ?LOG_DEBUG("~p at ~p received next_change call", [?MODULE, Server]),
+    Timeout = infinity,
+    gen_server:call(Server, next_change, Timeout).
+
+%% Asks the consumer to shut down its HTTP worker and terminate.
+%% Returns ok. 5000 ms is gen_server:call/2's default timeout, spelled out.
+stop(Server) ->
+    gen_server:call(Server, stop, 5000).
+
+%% gen_server callback. Issues the streaming GET against <Url>/_changes with
+%% style=all_docs plus the `since` (default 0) and `continuous` (default
+%% false) options, and records the ibrowse worker pid and request id.
+init([{remote, Url}, Options]) ->
+    Since = proplists:get_value(since, Options, 0),
+    Continuous = proplists:get_value(continuous, Options, false),
+    ChangesUrl = lists:concat([Url, "/_changes", "?style=all_docs",
+        "&since=", Since, "&continuous=", Continuous]),
+    {Pid, ReqId} = start_http_request(ChangesUrl),
+    {ok, #state{conn=Pid, reqid=ReqId}};
+
+%% A local database is consumed over HTTP as well, through the address and
+%% port configured in the "httpd" section.
+init([{local, DbName}, Options]) ->
+    Host = couch_config:get("httpd", "bind_address"),
+    Port = couch_config:get("httpd", "port"),
+    LocalUrl = "http://" ++ Host ++ ":" ++ Port ++ "/" ++ DbName,
+    init([{remote, LocalUrl}, Options]).
+
+%% gen_server callback for the synchronous API.
+%%
+%% next_change: first asks ibrowse for more data when the buffer holds fewer
+%% than ?MIN_BUFFER_SIZE rows, then replies with the oldest buffered row. If
+%% the buffer is empty the server either stops with the reply `complete`
+%% (feed finished) or parks the caller in #state.reply_to so handle_info/2
+%% can answer it later.
+%%
+%% stop: shuts down the ibrowse worker and terminates normally, replying ok.
+handle_call(next_change, From, State) ->
+    #state{reqid=ReqId, complete=Done, count=Buffered, rows=Queue} = State,
+    ok = maybe_stream_next(Done, Buffered, ReqId),
+    case {queue:out(Queue), Done} of
+    {{{value, Row}, Remaining}, _} ->
+        {reply, Row, State#state{count=Buffered-1, rows=Remaining}};
+    {{empty, _}, true} ->
+        {stop, normal, complete, State};
+    {{empty, _}, _} ->
+        % nothing buffered yet: the reply is sent from handle_info/2
+        {noreply, State#state{reply_to=From}}
+    end;
+
+handle_call(stop, _From, State) ->
+    #state{conn=Conn} = State,
+    catch ibrowse:stop_worker_process(Conn),
+    {stop, normal, ok, State}.
+
+%% gen_server callback. This server defines no casts; any cast is ignored.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% gen_server callback: handles the messages ibrowse streams to this process
+%% for the request stored in #state.reqid (see the stream_to option in
+%% start_http_request/1).
+%%
+%%  * headers "200": start pulling body chunks.
+%%  * headers "301": drop the worker and re-issue the request at Location.
+%%  * any other status: log and stop with {error, StatusCode}.
+%%  * response {error, Reason}: log and stop with {error, Reason}.
+%%  * response chunk: decode one row, then buffer it or hand it to the caller
+%%    parked in #state.reply_to; undecodable chunks are skipped.
+%%  * response_end: mark the feed complete, or tell the parked caller so and
+%%    stop.
+%%  * anything else: log and ignore.
+handle_info({ibrowse_async_headers, Id, "200", _}, #state{reqid=Id}=State) ->
+    #state{
+        complete = Complete,
+        count = Count
+    } = State,
+    ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 200", [?MODULE, Id]),
+    ok = maybe_stream_next(Complete, Count, Id),
+    {noreply, State};
+handle_info({ibrowse_async_headers, Id, "301", Hdrs}, #state{reqid=Id}=State) ->
+    ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 301", [?MODULE, Id]),
+    catch ibrowse:stop_worker_process(State#state.conn),
+    Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+    {Pid, ReqId} = start_http_request(Url),
+    {noreply, State#state{conn=Pid, reqid=ReqId}};
+handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
+    ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
+        [Code,Hdrs]),
+    {stop, {error, list_to_integer(Code)}, State};
+
+% ibrowse delivers request failures in the body slot as {error, Reason}.
+% Stop with that reason instead of pushing the tuple through the JSON decoder.
+handle_info({ibrowse_async_response, Id, {error, Reason}},
+        #state{reqid=Id} = State) ->
+    ?LOG_ERROR("replicator changes feed failed with reason ~p", [Reason]),
+    {stop, {error, Reason}, State};
+
+handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
+    ?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
+    #state{
+        complete = Complete,
+        count = Count,
+        rows = Rows
+    } = State,
+    % Only the decode is protected; the bookkeeping below runs outside the try.
+    try decode_row(Msg) of
+    Row ->
+        case State of
+        #state{reply_to=nil} ->
+            {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
+        #state{count=0, reply_to=From}->
+            gen_server:reply(From, Row),
+            {noreply, State#state{reply_to=nil}}
+        end
+    catch
+    % The payload is deliberately not matched against Msg: decode_row/1 may
+    % strip a leading ",\n" before decoding, so the thrown term can differ
+    % from the raw chunk.
+    throw:{invalid_json, _} ->
+        ?LOG_DEBUG("got invalid_json ~p", [Msg]),
+        ok = maybe_stream_next(Complete, Count, Id),
+        {noreply, State}
+    end;
+
+handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
+    ?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#state.reply_to]),
+    case State of
+    #state{reply_to=nil} ->
+        {noreply, State#state{complete=true}};
+    #state{count=0, reply_to=From}->
+        gen_server:reply(From, complete),
+        {stop, normal, State}
+    end;
+
+handle_info(Msg, State) ->
+    ?LOG_INFO("unexpected message ~p", [Msg]),
+    {noreply, State}.
+
+%% gen_server callback. Stops the ibrowse worker if one was started; failures
+%% while stopping it are ignored because the server is going away anyway.
+terminate(_Reason, #state{conn=Worker}) when is_pid(Worker) ->
+    _ = (catch ibrowse:stop_worker_process(Worker)),
+    ok;
+terminate(_Reason, _State) ->
+    % no worker pid recorded: nothing to clean up
+    ok.
+
+%% gen_server callback. The state is carried over unchanged on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Decodes one chunk of the feed (a character list) with ?JSON_DECODE after
+%% dropping any leading ",\n" separators.
+decode_row(",\n" ++ Tail) ->
+    decode_row(Tail);
+decode_row(Json) ->
+    ?JSON_DECODE(Json).
+
+%% Asks ibrowse for the next chunk of request Id, but only while the feed is
+%% not complete and fewer than ?MIN_BUFFER_SIZE rows are buffered.
+%% Returns the result of ibrowse:stream_next/1 when it streams, ok otherwise.
+maybe_stream_next(Complete, Count, Id) ->
+    WantMore = (Complete =:= false) andalso (Count < ?MIN_BUFFER_SIZE),
+    case WantMore of
+    true ->
+        ?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
+        ibrowse:stream_next(Id);
+    false ->
+        ?LOG_DEBUG("~p reqid ~p not streaming", [?MODULE, Id]),
+        ok
+    end.
+
+%% Spawns a linked ibrowse worker for the host and port of RawUrl and sends
+%% an asynchronous GET through it. Responses are streamed to this process one
+%% message at a time ({self(), once}); the request uses an inactivity timeout
+%% of 30000 ms. Returns {WorkerPid, RequestId}.
+start_http_request(RawUrl) ->
+    Parsed = ibrowse_lib:parse_url(RawUrl),
+    Host = Parsed#url.host,
+    Port = Parsed#url.port,
+    {ok, Worker} = ibrowse:spawn_link_worker_process(Host, Port),
+    StreamOpts = [{stream_to, {self(), once}}, {inactivity_timeout, 30000}],
+    {ibrowse_req_id, ReqId} =
+        ibrowse:send_req_direct(Worker, RawUrl, [], get, [], StreamOpts, infinity),
+    {Worker, ReqId}.

Added: couchdb/trunk/test/etap/110-replication-changes-feed.t
URL: http://svn.apache.org/viewvc/couchdb/trunk/test/etap/110-replication-changes-feed.t?rev=794848&view=auto
==============================================================================
--- couchdb/trunk/test/etap/110-replication-changes-feed.t (added)
+++ couchdb/trunk/test/etap/110-replication-changes-feed.t Thu Jul 16 20:48:35 2009
@@ -0,0 +1,185 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% XXX: Figure out how to -include("couch_db.hrl")
+%% Until then the script declares #doc itself so that it can use record
+%% syntax on the documents returned by couch_doc:from_json_obj/1.
+%% NOTE(review): presumably this has to mirror the #doc definition in
+%% couch_db.hrl field for field -- verify when that header changes.
+-record(doc, {id= <<"">>, revs={0, []}, body={[]},
+            attachments=[], deleted=false, meta=[]}).
+
+%% escript entry point: extends the code path, plans 6 tests, runs them and
+%% bails out with a diagnostic if test/0 does not return ok.
+main(_) ->
+    CodeDirs = ["src/couchdb", "src/ibrowse", "src/mochiweb"],
+    lists:foreach(fun(Dir) -> code:add_pathz(Dir) end, CodeDirs),
+
+    etap:plan(6),
+    Outcome = (catch test()),
+    case Outcome of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+%% Boots CouchDB with the dev ini files, recreates the scratch database, runs
+%% the six test cases in order and removes the database again.
+test() ->
+    IniFiles = ["etc/couchdb/default_dev.ini", "etc/couchdb/local_dev.ini"],
+    couch_server:start(IniFiles),
+    ibrowse:start(),
+    crypto:start(),
+
+    DbName = <<"etap-test-db">>,
+    couch_server:delete(DbName, []),
+    {ok, Db} = couch_db:create(DbName, []),
+
+    test_unchanged_db(),
+    test_simple_change(),
+    test_since_parameter(),
+    test_continuous_parameter(),
+    test_conflicts(),
+    test_deleted_conflicts(),
+
+    couch_db:close(Db),
+    couch_server:delete(DbName, []),
+    ok.
+
+%% A feed on a database without changes must report `complete` right away.
+test_unchanged_db() ->
+    {ok, Feed} = couch_rep_changes_feed:start({local, "etap-test-db"}, []),
+    Got = couch_rep_changes_feed:next(Feed),
+    etap:is(
+        Got,
+        complete,
+        "changes feed for unchanged DB is automatically complete"
+    ).
+
+%% One document update must yield exactly one row followed by `complete`.
+test_simple_change() ->
+    Expect = generate_change(),
+    {ok, Feed} = couch_rep_changes_feed:start({local, "etap-test-db"}, []),
+    First = couch_rep_changes_feed:next(Feed),
+    Second = couch_rep_changes_feed:next(Feed),
+    etap:is(
+        {First, Second},
+        {Expect, complete},
+        "change one document, get one row"
+    ).
+
+%% Starting at the current update sequence must skip every earlier change.
+test_since_parameter() ->
+    FeedOpts = [{since, get_update_seq()}],
+    {ok, Feed} = couch_rep_changes_feed:start({local, "etap-test-db"}, FeedOpts),
+    Got = couch_rep_changes_feed:next(Feed),
+    etap:is(
+        Got,
+        complete,
+        "since query-string parameter allows us to skip changes"
+    ).
+
+%% A continuous feed must deliver a change that is made after the feed was
+%% opened. next/1 blocks, so it is called from a helper process that mails
+%% the row back to the test process.
+test_continuous_parameter() ->
+    {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"},
+        [{since, get_update_seq()}, {continuous, true}]),
+
+    % make the changes_feed request before the next update
+    Self = self(),
+    spawn(fun() ->
+        Change = couch_rep_changes_feed:next(Pid),
+        Self ! {actual, Change}
+    end),
+
+    Expect = generate_change(),
+    % Bounded wait: if the feed or the helper process dies, no message ever
+    % arrives. Report `timeout` as the actual value so the assertion fails
+    % instead of the whole suite hanging.
+    Actual = receive
+        {actual, Received} -> Received
+    after 10000 ->
+        timeout
+    end,
+    etap:is(
+        Actual,
+        Expect,
+        "continuous query-string parameter picks up new changes"
+    ),
+
+    ok = couch_rep_changes_feed:stop(Pid).
+
+%% Both revisions of a conflicted document must be listed in its feed row.
+test_conflicts() ->
+    Since = get_update_seq(),
+    Expect = generate_conflict(),
+    FeedOpts = [{since, Since}],
+    {ok, Feed} = couch_rep_changes_feed:start({local, "etap-test-db"}, FeedOpts),
+    First = couch_rep_changes_feed:next(Feed),
+    Second = couch_rep_changes_feed:next(Feed),
+    etap:is(
+        {First, Second},
+        {Expect, complete},
+        "conflict revisions show up in feed"
+    ).
+
+%% After the losing revision of a conflict is deleted, the feed row must list
+%% the winning revision together with the revision created by the deletion.
+test_deleted_conflicts() ->
+    Since = get_update_seq(),
+    {ConflictProps} = generate_conflict(),
+
+    %% delete the conflict revision
+    DocId = proplists:get_value(<<"id">>, ConflictProps),
+    Changes = proplists:get_value(<<"changes">>, ConflictProps),
+    [Winner, {[{<<"rev">>, Loser}]}] = Changes,
+    Tombstone = couch_doc:from_json_obj({[
+        {<<"_id">>, DocId},
+        {<<"_rev">>, Loser},
+        {<<"_deleted">>, true}
+    ]}),
+    Db = get_db(),
+    {ok, DeletedRev} = couch_db:update_doc(Db, Tombstone, [full_commit]),
+    couch_db:close(Db),
+
+    Seq = get_update_seq(),
+    DeletedChange = {[{<<"rev">>, couch_doc:rev_to_str(DeletedRev)}]},
+    Expect = {[
+        {<<"seq">>, Seq},
+        {<<"id">>, DocId},
+        {<<"changes">>, [Winner, DeletedChange]}
+    ]},
+
+    {ok, Feed} = couch_rep_changes_feed:start({local, "etap-test-db"},
+        [{since, Since}]),
+    First = couch_rep_changes_feed:next(Feed),
+    Second = couch_rep_changes_feed:next(Feed),
+    etap:is(
+        {First, Second},
+        {Expect, complete},
+        "deleted conflict revisions show up in feed"
+    ).
+
+%% Writes a document with a fresh UUID and an empty body.
+generate_change() ->
+    Id = couch_util:new_uuid(),
+    generate_change(Id).
+
+%% Writes a document with the given id and an empty body.
+generate_change(Id) ->
+    EmptyBody = {[]},
+    generate_change(Id, EmptyBody).
+
+%% Writes the EJSON body under Id with a full commit and returns the row the
+%% changes feed is expected to report for that update.
+generate_change(Id, EJson) ->
+    Parsed = couch_doc:from_json_obj(EJson),
+    Db = get_db(),
+    {ok, Rev} = couch_db:update_doc(Db, Parsed#doc{id = Id}, [full_commit]),
+    couch_db:close(Db),
+    Seq = get_update_seq(),
+    RevEntry = {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]},
+    {[
+        {<<"seq">>, Seq},
+        {<<"id">>, Id},
+        {<<"changes">>, [RevEntry]}
+    ]}.
+
+%% Creates two conflicting revisions of one new document (the second write
+%% uses all_or_nothing) and returns the row the changes feed is expected to
+%% report, with both revisions listed in descending order.
+generate_conflict() ->
+    Id = couch_util:new_uuid(),
+    Db = get_db(),
+    % EJSON object members are {Key, Value} pairs, as in the other helpers.
+    Doc1 = (couch_doc:from_json_obj({[{<<"foo">>, <<"bar">>}]}))#doc{id = Id},
+    Doc2 = (couch_doc:from_json_obj({[{<<"foo">>, <<"baz">>}]}))#doc{id = Id},
+    {ok, Rev1} = couch_db:update_doc(Db, Doc1, [full_commit]),
+    {ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]),
+    % release the handle like every other helper in this script does
+    couch_db:close(Db),
+
+    %% relies on undocumented CouchDB conflict winner algo and revision sorting!
+    RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R
+        <- lists:sort(fun(A,B) -> B<A end, [Rev1,Rev2])],
+    {[
+        {<<"seq">>, get_update_seq()},
+        {<<"id">>, Id},
+        {<<"changes">>, RevList}
+    ]}.
+    
+%% Opens the scratch database and returns the handle. The caller is
+%% responsible for passing it to couch_db:close/1.
+get_db() ->
+    {ok, Db} = couch_db:open(<<"etap-test-db">>, []),
+    Db.
+
+%% Returns the current update sequence of the scratch database, opening and
+%% closing its own handle.
+get_update_seq() ->
+    Handle = get_db(),
+    UpdateSeq = couch_db:get_update_seq(Handle),
+    couch_db:close(Handle),
+    UpdateSeq.