You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@couchdb.apache.org by Chris Anderson <jc...@apache.org> on 2009/03/15 18:54:44 UTC

streaming attachment replicator redirects

I've committed a change to the streaming attachment replicator, so
that it follows redirects and checks for error codes. I'm pretty
confident that it works (passes tests and works for me in practice),
but it might be worth reviewing.

Adam, do you mind taking a look? The only thing I'm not sure about is
whether I need to explicitly clean up the ibrowse processes or whether
they are fine like this.

Chris

On Sun, Mar 15, 2009 at 10:47 AM,  <jc...@apache.org> wrote:
> Author: jchris
> Date: Sun Mar 15 17:47:29 2009
> New Revision: 754704
>
> URL: http://svn.apache.org/viewvc?rev=754704&view=rev
> Log:
> Streaming attachment replication now follows redirects and checks for error codes. Includes tests verifying that design doc attachments are replicated.
>
> Modified:
>    couchdb/trunk/share/www/script/test/replication.js
>    couchdb/trunk/src/couchdb/couch_rep.erl
>
> Modified: couchdb/trunk/share/www/script/test/replication.js
> URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replication.js?rev=754704&r1=754703&r2=754704&view=diff
> ==============================================================================
> --- couchdb/trunk/share/www/script/test/replication.js (original)
> +++ couchdb/trunk/share/www/script/test/replication.js Sun Mar 15 17:47:29 2009
> @@ -150,6 +150,16 @@
>               }
>             }
>           });
> +          // make sure on design docs as well
> +          dbA.save({
> +            _id:"_design/with_bin",
> +            _attachments:{
> +              "foo.txt": {
> +                "type":"base64",
> +                "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ="
> +              }
> +            }
> +          });
>         };
>
>         this.afterAB1 = function(dbA, dbB) {
> @@ -158,6 +168,13 @@
>
>           xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt");
>           T(xhr.responseText == "This is a base64 encoded text")
> +
> +          // and the design-doc
> +          xhr = CouchDB.request("GET", "/test_suite_db_a/_design/with_bin/foo.txt");
> +          T(xhr.responseText == "This is a base64 encoded text")
> +
> +          xhr = CouchDB.request("GET", "/test_suite_db_b/_design/with_bin/foo.txt");
> +          T(xhr.responseText == "This is a base64 encoded text")
>         };
>       },
>
>
> Modified: couchdb/trunk/src/couchdb/couch_rep.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=754704&r1=754703&r2=754704&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep.erl Sun Mar 15 17:47:29 2009
> @@ -327,9 +327,30 @@
>     couch_util:should_flush(),
>     receive
>         {From, {set_req_id, NewId}} ->
> +            %% we learn the ReqId to listen for
>             From ! {self(), {ok, NewId}},
>             attachment_loop(NewId);
> -        {ibrowse_async_headers, ReqId, _Status, _Headers} ->
> +        {ibrowse_async_headers, ReqId, Status, Headers} ->
> +            %% we got header, give the controlling process a chance to react
> +            receive
> +                {From, gimme_status} ->
> +                    %% send status/headers to controller
> +                    From ! {self(), {status, Status, Headers}},
> +                    receive
> +                        {From, continue} ->
> +                            %% normal case
> +                            attachment_loop(ReqId);
> +                        {From, fail} ->
> +                            %% error, failure code
> +                            ?LOG_ERROR(
> +                                "streaming attachment failed with status ~p",
> +                                [Status]),
> +                            exit(attachment_request_failed);
> +                        {From, stop_ok} ->
> +                            %% stop looping, controller will start a new loop
> +                            stop_ok
> +                    end
> +            end,
>             attachment_loop(ReqId);
>         {ibrowse_async_response, ReqId, {chunk_start,_}} ->
>             attachment_loop(ReqId);
> @@ -349,7 +370,19 @@
>     % TODO worry about revisions
>     Url = DbUrl ++ url_encode(Id) ++ "/" ++ ?b2l(Name),
>     ?LOG_DEBUG("Attachment URL ~p", [Url]),
> +    {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
> +        Type, Length),
> +    {Name, {Type, {RcvFun, Length}}}.
> +
> +make_attachment_stub_receiver(Url, Headers, Name, Type, Length) ->
> +    make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10).
> +
> +make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0) ->
> +    ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
> +        [Url]),
> +    exit(attachment_request_failed);
>
> +make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
>     %% start the process that receives attachment data from ibrowse
>     Pid = spawn_link(fun() -> attachment_loop(nil) end),
>
> @@ -364,14 +397,46 @@
>     Pid ! {self(), {set_req_id, ReqId}},
>     receive {Pid, {ok, ReqId}} -> ok end,
>
> -    %% this is the function that goes into the streaming attachment code.
> -    %% It gets executed by the replication gen_server, so it can't
> -    %% be the one to actually receive the ibrowse data.
> -    RcvFun = fun() ->
> -        Pid ! {self(), gimme_data},
> -        receive {Pid, Data} -> Data end
> -    end,
> -    {Name, {Type, {RcvFun, Length}}}.
> +    %% wait for headers to ensure that we have a 200 status code
> +    %% this is where we follow redirects etc
> +    Pid ! {self(), gimme_status},
> +    receive {Pid, {status, StreamStatus, StreamHeaders}} ->
> +        ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
> +            [StreamStatus, StreamHeaders]),
> +
> +        ResponseCode = list_to_integer(StreamStatus),
> +        if
> +        ResponseCode >= 200, ResponseCode < 300 ->
> +            % the normal case
> +            Pid ! {self(), continue},
> +            %% this function goes into the streaming attachment code.
> +            %% It gets executed by the replication gen_server, so it can't
> +            %% be the one to actually receive the ibrowse data.
> +            {ok, fun() ->
> +                Pid ! {self(), gimme_data},
> +                receive {Pid, Data} -> Data end
> +            end};
> +        ResponseCode >= 300, ResponseCode < 400 ->
> +            % follow the redirect
> +            Pid ! {self(), stop_ok},
> +            RedirectUrl = mochiweb_headers:get_value("Location",
> +                mochiweb_headers:make(StreamHeaders)),
> +            make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type, Length,
> +                Retries - 1);
> +        ResponseCode >= 400, ResponseCode < 500 ->
> +            % an error... log and fail
> +            ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
> +                [ResponseCode, Url]),
> +            Pid ! {self(), fail},
> +            exit(attachment_request_failed);
> +        ResponseCode == 500 ->
> +            % an error... log and retry
> +            ?LOG_INFO("retrying streaming attachment request due to 500 error: ~s", [Url]),
> +            Pid ! {self(), fail},
> +            make_attachment_stub_receiver(Url, Headers, Name, Type, Length,
> +                Retries - 1)
> +        end
> +    end.
>
>
>  open_db({remote, Url, Headers})->
>
>
>



-- 
Chris Anderson
http://jchris.mfdz.com