You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2010/06/14 15:40:13 UTC

svn commit: r954462 - in /couchdb/branches/0.11.x: src/couchdb/ test/etap/

Author: jan
Date: Mon Jun 14 13:40:13 2010
New Revision: 954462

URL: http://svn.apache.org/viewvc?rev=954462&view=rev
Log:
efficient attachment replication. Patch by Filipe Manana. Closes COUCHDB-639

Added:
    couchdb/branches/0.11.x/test/etap/113-replication-attachment-comp.t
Modified:
    couchdb/branches/0.11.x/src/couchdb/couch_db.erl
    couchdb/branches/0.11.x/src/couchdb/couch_doc.erl
    couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl
    couchdb/branches/0.11.x/src/couchdb/couch_rep_att.erl
    couchdb/branches/0.11.x/src/couchdb/couch_rep_reader.erl
    couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
    couchdb/branches/0.11.x/test/etap/Makefile.am

Modified: couchdb/branches/0.11.x/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_db.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_db.erl Mon Jun 14 13:40:13 2010
@@ -340,7 +340,11 @@ update_doc(Db, Doc, Options, UpdateType)
     {ok, [{ok, NewRev}]} ->
         {ok, NewRev};
     {ok, [Error]} ->
-        throw(Error)
+        throw(Error);
+    {ok, []} ->
+        % replication success
+        {Pos, [RevId | _]} = Doc#doc.revs,
+        {ok, {Pos, RevId}}
     end.
 
 update_docs(Db, Docs) ->
@@ -814,8 +818,9 @@ flush_att(Fd, #att{data=Fun,att_len=AttL
 % is present in the request, but there is no Content-MD5
 % trailer, we're free to ignore this inconsistency and
 % pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) ->
-    {ok, OutputStream} = case couch_util:compressible_att_type(Type) of
+with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
+    {ok, OutputStream} = case (not AlreadyComp) andalso
+        couch_util:compressible_att_type(Type) of
     true ->
         CompLevel = list_to_integer(
             couch_config:get("attachments", "compression_level", "0")
@@ -836,12 +841,18 @@ with_stream(Fd, #att{md5=InMd5,type=Type
     {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
         couch_stream:close(OutputStream),
     check_md5(IdentityMd5, ReqMd5),
+    {AttLen, DiskLen} = case AlreadyComp of
+    true ->
+        {Att#att.att_len, Att#att.disk_len};
+    _ ->
+        {Len, IdentityLen}
+    end,
     Att#att{
         data={Fd,StreamInfo},
-        att_len=Len,
-        disk_len=IdentityLen,
+        att_len=AttLen,
+        disk_len=DiskLen,
         md5=Md5,
-        comp=(IdentityMd5 =/= Md5)
+        comp=(AlreadyComp orelse (IdentityMd5 =/= Md5))
     }.
 
 

Modified: couchdb/branches/0.11.x/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_doc.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_doc.erl Mon Jun 14 13:40:13 2010
@@ -17,7 +17,7 @@
 -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
 -export([validate_docid/1]).
 -export([doc_from_multi_part_stream/2]).
--export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
+-export([doc_to_multi_part_stream/6, len_doc_to_multi_part_stream/5]).
 
 -include("couch_db.hrl").
 
@@ -73,21 +73,26 @@ to_json_meta(Meta) ->
         end, Meta).
 
 to_json_attachments(Attachments, Options) ->
-    case lists:member(attachments, Options) of
+    RevPos = case lists:member(attachments, Options) of % revpos cut-off handed to the 4-arity clause
     true -> % return all the binaries
-        to_json_attachments(Attachments, 0, lists:member(follows, Options));
+        0; % cut-off used when the attachments option is present
     false ->
         % note the default is [], because this sorts higher than all numbers.
         % and will return all the binaries.
-        RevPos = proplists:get_value(atts_after_revpos, Options, []),
-        to_json_attachments(Attachments, RevPos, lists:member(follows, Options))
-    end.
+        proplists:get_value(atts_after_revpos, Options, [])
+    end,
+    to_json_attachments(
+        Attachments,
+        RevPos,
+        lists:member(follows, Options), % DataToFollow
+        lists:member(att_gzip_length, Options) % ShowGzipLen: adds gzip_length for atts with comp=true
+    ).
 
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowGzipLen) ->
     [];
-to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) ->
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) ->
     AttProps = lists:map(
-        fun(#att{disk_len=DiskLen}=Att) ->
+        fun(#att{disk_len=DiskLen, att_len=AttLen, comp=Comp}=Att) ->
             {Att#att.name, {[
                 {<<"content_type">>, Att#att.type},
                 {<<"revpos">>, Att#att.revpos}
@@ -96,7 +101,7 @@ to_json_attachments(Atts, RevPosIncludeA
                     if DataToFollow ->
                         [{<<"length">>, DiskLen}, {<<"follows">>, true}];
                     true ->
-                        AttData = case Att#att.comp of
+                        AttData = case Comp of
                         true ->
                             zlib:gunzip(att_to_iolist(Att));
                         _ ->
@@ -107,7 +112,13 @@ to_json_attachments(Atts, RevPosIncludeA
                     end;
                 true ->
                     [{<<"length">>, DiskLen}, {<<"stub">>, true}]
-                end
+                end ++
+                    case {ShowGzipLen, Comp} of
+                    {true, true} ->
+                        [{<<"gzip_length">>, AttLen}];
+                    _ ->
+                        []
+                    end
             }}
         end, Atts),
     [{<<"_attachments">>, {AttProps}}].
@@ -191,19 +202,19 @@ transfer_fields([{<<"_attachments">>, {J
         case proplists:get_value(<<"stub">>, BinProps) of
         true ->
             Type = proplists:get_value(<<"content_type">>, BinProps),
-            Length = proplists:get_value(<<"length">>, BinProps),
             RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
-            #att{name=Name, data=stub, type=Type, att_len=Length,
-                disk_len=Length, revpos=RevPos};
+            {AttLen, DiskLen, Comp} = att_lengths(BinProps),
+            #att{name=Name, data=stub, type=Type, att_len=AttLen,
+                disk_len=DiskLen, comp=Comp, revpos=RevPos};
         _ ->
             Type = proplists:get_value(<<"content_type">>, BinProps,
                     ?DEFAULT_ATTACHMENT_CONTENT_TYPE),
             RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
             case proplists:get_value(<<"follows">>, BinProps) of
             true ->
-                Len = proplists:get_value(<<"length">>, BinProps),
-                #att{name=Name, data=follows, type=Type, 
-                    att_len=Len, disk_len=Len, revpos=RevPos};
+                {AttLen, DiskLen, Comp} = att_lengths(BinProps),
+                #att{name=Name, data=follows, type=Type, comp=Comp,
+                    att_len=AttLen, disk_len=DiskLen, revpos=RevPos};
             _ ->
                 Value = proplists:get_value(<<"data">>, BinProps),
                 Bin = couch_util:decodeBase64(Value),
@@ -251,6 +262,16 @@ transfer_fields([{<<"_",Name/binary>>, _
 transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
     transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
 
+att_lengths(BinProps) -> % returns {AttLen, DiskLen, Comp} for one JSON attachment object
+    DiskLen = proplists:get_value(<<"length">>, BinProps),
+    GzipLen = proplists:get_value(<<"gzip_length">>, BinProps),
+    case GzipLen of
+    undefined -> % no gzip_length field: both lengths equal the length field, comp is false
+        {DiskLen, DiskLen, false};
+    _ -> % gzip_length present: it becomes att_len and comp is true
+        {GzipLen, DiskLen, true}
+    end.
+
 to_doc_info(FullDocInfo) ->
     {DocInfo, _Path} = to_doc_info_path(FullDocInfo),
     DocInfo.
@@ -345,18 +366,24 @@ fold_streamed_data(RcvFun, LenLeft, Fun,
     ResultAcc = Fun(Bin, Acc),
     fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
 
-len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) ->
+len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
+    SendGzipAtts) ->
     2 + % "--"
     size(Boundary) +
     36 + % "\r\ncontent-type: application/json\r\n\r\n"
     iolist_size(JsonBytes) +
     4 + % "\r\n--"
     size(Boundary) +
-    + lists:foldl(fun(#att{revpos=RevPos,disk_len=DiskLen}, AccAttsSize) ->
+    + lists:foldl(fun(#att{revpos=RevPos} = Att, AccAttsSize) ->
             if RevPos > AttsSinceRevPos ->
                 AccAttsSize +  
                 4 + % "\r\n\r\n"
-                DiskLen +
+                case SendGzipAtts of
+                true ->
+                    Att#att.att_len;
+                _ ->
+                    Att#att.disk_len
+                end +
                 4 + % "\r\n--"
                 size(Boundary);
             true ->
@@ -364,29 +391,33 @@ len_doc_to_multi_part_stream(Boundary,Js
             end
         end, 0, Atts) +
     2. % "--"
-    
-doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos,WriteFun) ->
+% Writes the JSON part through WriteFun, then hands the attachments to atts_to_mp/5.
+doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun,
+    SendGzipAtts) -> % SendGzipAtts is only forwarded to atts_to_mp/5
     WriteFun([<<"--", Boundary/binary,
             "\r\ncontent-type: application/json\r\n\r\n">>,
             JsonBytes, <<"\r\n--", Boundary/binary>>]),
-    atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos).
+    atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
 
-atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) ->
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendGzipAtts) -> % no attachments left
     WriteFun(<<"--">>);
 atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, 
-        AttsSinceRevPos) when RevPos > AttsSinceRevPos ->
+        AttsSinceRevPos, SendGzipAtts) when RevPos > AttsSinceRevPos ->
     WriteFun(<<"\r\n\r\n">>),
-    AttFun = case Att#att.comp of
-    true ->
+    AttFun = case {Att#att.comp, SendGzipAtts} of
+    {true, false} -> % comp is set but compressed transfer was not requested
         fun att_foldl_unzip/3;
     _ ->
+        % receiver knows that the attachment is compressed by checking that the
+        % "gzip_length" field is present in the corresponding JSON attachment
+        % object found within the JSON doc
         fun att_foldl/3
     end,
     AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
     WriteFun(<<"\r\n--", Boundary/binary>>),
-    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos);
-atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) ->
-    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos).
+    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts) -> % guard failed: skip this att
+    atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
 
 
 doc_from_multi_part_stream(ContentType, DataFun) ->

Modified: couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_httpd_db.erl Mon Jun 14 13:40:13 2010
@@ -742,13 +742,13 @@ send_doc_efficiently(Req, #doc{atts=Atts
             JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [follows|Options])),
             AttsSinceRevPos = proplists:get_value(atts_after_revpos, Options, 0),
             Len = couch_doc:len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
-                    AttsSinceRevPos),
+                    AttsSinceRevPos,false),
             CType = {<<"Content-Type">>, 
                     <<"multipart/related; boundary=\"", Boundary/binary, "\"">>},
             {ok, Resp} = start_response_length(Req, 200, [CType|Headers], Len),
             couch_doc:doc_to_multi_part_stream(Boundary,JsonBytes,Atts,
                     AttsSinceRevPos,
-                    fun(Data) -> couch_httpd:send(Resp, Data) end)
+                    fun(Data) -> couch_httpd:send(Resp, Data) end, false)
         end;
     false ->
         send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
@@ -1045,6 +1045,9 @@ parse_doc_query(Req) ->
             Args#doc_query_args{update_type=replicated_changes};
         {"new_edits", "true"} ->
             Args#doc_query_args{update_type=interactive_edit};
+        {"att_gzip_length", "true"} ->
+            Options = [att_gzip_length | Args#doc_query_args.options],
+            Args#doc_query_args{options=Options};
         _Else -> % unknown key value pair, ignore.
             Args
         end

Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_att.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_att.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_rep_att.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_rep_att.erl Mon Jun 14 13:40:13 2010
@@ -79,11 +79,7 @@ receive_data(Ref, ReqId, ContentEncoding
         throw({attachment_request_failed, Err});
     {ibrowse_async_response, ReqId, Data} ->
         % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
-        if ContentEncoding =:= "gzip" ->
-            zlib:gunzip(Data);
-        true ->
-            Data
-        end;
+        Data;
     {ibrowse_async_response_end, ReqId} ->
         ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
         throw({attachment_request_failed, premature_end})

Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_reader.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_reader.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_rep_reader.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_rep_reader.erl Mon Jun 14 13:40:13 2010
@@ -233,7 +233,7 @@ update_sequence_lists(Seq, State) ->
 open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
     %% all this logic just splits up revision lists that are too long for
     %% MochiWeb into multiple requests
-    BaseQS = [{revs,true}, {latest,true}],
+    BaseQS = [{revs,true}, {latest,true}, {att_gzip_length,true}],
     BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
     BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
 
@@ -256,7 +256,10 @@ open_doc_revs(#http_db{} = DbS, DocId, R
 
 open_doc(#http_db{} = DbS, DocId) ->
     % get latest rev of the doc
-    Req = DbS#http_db{resource=url_encode(DocId)},
+    Req = DbS#http_db{
+        resource=url_encode(DocId),
+        qs=[{att_gzip_length, true}]
+    },
     case couch_rep_httpc:request(Req) of
     {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
         [];

Modified: couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/branches/0.11.x/src/couchdb/couch_rep_writer.erl Mon Jun 14 13:40:13 2010
@@ -51,8 +51,27 @@ writer_loop(Parent, Reader, Target) ->
         writer_loop(Parent, Reader, Target)
     end.
 
-write_docs(#http_db{headers = Headers} = Db, Docs) ->
-    JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+write_docs(#http_db{} = Db, Docs) -> % target described by an http_db record
+    {DocsAtts, DocsNoAtts} = lists:partition( % split docs by whether atts is non-empty
+        fun(#doc{atts=[]}) -> false; (_) -> true end,
+        Docs
+    ),
+    ErrorsJson0 = write_bulk_docs(Db, DocsNoAtts), % docs without attachments go through write_bulk_docs/2
+    ErrorsJson = lists:foldl( % each doc with attachments is written on its own
+       fun(Doc, Acc) -> write_multi_part_doc(Db, Doc) ++ Acc end,
+       ErrorsJson0,
+       DocsAtts
+    ),
+    {ok, ErrorsJson};
+write_docs(Db, Docs) -> % any other Db value: update through couch_db directly
+    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+
+write_bulk_docs(_Db, []) ->
+    [];
+write_bulk_docs(#http_db{headers = Headers} = Db, Docs) ->
+    JsonDocs = [
+        couch_doc:to_json_obj(Doc, [revs, att_gzip_length]) || Doc <- Docs
+    ],
     Request = Db#http_db{
         resource = "_bulk_docs",
         method = post,
@@ -65,10 +84,61 @@ write_docs(#http_db{headers = Headers} =
     List when is_list(List) ->
         List
     end,
-    ErrorsList = [write_docs_1(V) || V <- ErrorsJson],
-    {ok, ErrorsList};
-write_docs(Db, Docs) ->
-    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).
+    [write_docs_1(V) || V <- ErrorsJson].
+
+write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> % PUT one doc as multipart/related
+    JsonBytes = ?JSON_ENCODE(
+        couch_doc:to_json_obj(
+            Doc,
+            [follows, att_gzip_length, {atts_after_revpos, 0}]
+        )
+    ),
+    Boundary = couch_uuids:random(),
+    Len = couch_doc:len_doc_to_multi_part_stream( % sent as the Content-Length header below
+        Boundary, JsonBytes, Atts, 0, true
+    ),
+    {ok, DataQueue} = couch_work_queue:new(1024*1024, 1000),
+    _StreamerPid = spawn_link( % linked producer: writes the multipart body into DataQueue, then closes it
+        fun() ->
+            couch_doc:doc_to_multi_part_stream(
+                Boundary,
+                JsonBytes,
+                Atts,
+                0, % AttsSinceRevPos
+                fun(Data) -> couch_work_queue:queue(DataQueue, Data) end,
+                true % SendGzipAtts
+            ),
+            couch_work_queue:close(DataQueue)
+        end
+    ),
+    BodyFun = fun(Acc) -> % request body callback: drains DataQueue until it reports closed
+        case couch_work_queue:dequeue(DataQueue) of
+        closed ->
+            eof;
+        {ok, Data} ->
+            {ok, iolist_to_binary(lists:reverse(Data)), Acc} % NOTE(review): assumes dequeue/1 returns items newest-first - confirm
+        end
+    end,
+    Request = Db#http_db{
+        resource = couch_util:url_encode(Doc#doc.id),
+        method = put,
+        qs = [{new_edits, false}],
+        body = {BodyFun, ok},
+        headers = [
+            {"x-couch-full-commit", "false"},
+            {"Content-Type",
+                "multipart/related; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
+            {"Content-Length", Len} | Headers
+        ]
+    },
+    case couch_rep_httpc:request(Request) of
+    {[{<<"error">>, Error}, {<<"reason">>, Reason}]} -> % response body is an error object
+        {Pos, [RevId | _]} = Doc#doc.revs,
+        ErrId = couch_util:to_existing_atom(Error),
+        [{Doc#doc.id, couch_doc:rev_to_str({Pos, RevId})}, {ErrId, Reason}]; % NOTE(review): two tuples get appended by write_docs/2 - confirm expected shape
+    _ -> % every other response yields no error entries
+        []
+    end.
 
 write_docs_1({Props}) ->
     Id = proplists:get_value(<<"id">>, Props),

Added: couchdb/branches/0.11.x/test/etap/113-replication-attachment-comp.t
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/test/etap/113-replication-attachment-comp.t?rev=954462&view=auto
==============================================================================
--- couchdb/branches/0.11.x/test/etap/113-replication-attachment-comp.t (added)
+++ couchdb/branches/0.11.x/test/etap/113-replication-attachment-comp.t Mon Jun 14 13:40:13 2010
@@ -0,0 +1,264 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(user_ctx, { % presumably mirrors the record in couch_db.hrl - TODO confirm
+    name = null,
+    roles = [],
+    handler
+}).
+
+default_config() -> % ini file handed to couch_server_sup:start_link/1 in test/0
+    test_util:build_file("etc/couchdb/default_dev.ini").
+
+test_db_a_name() -> % database used as the replication source in test/0
+    <<"couch_test_rep_att_comp_a">>.
+
+test_db_b_name() -> % database used as the replication target in test/0
+    <<"couch_test_rep_att_comp_b">>.
+
+main(_) -> % escript entry point
+    test_util:init_code_path(),
+    etap:plan(28), % 14 etap:is checks per replication direction, two directions
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other -> % anything but ok (including a caught exception) aborts the run
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    ok.
+
+test() -> % runs the same scenario twice: pull replication, then push replication
+    couch_server_sup:start_link([default_config()]),
+    put(addr, couch_config:get("httpd", "bind_address", "127.0.0.1")), % read back by db_url/1 and rep_url/0
+    put(port, couch_config:get("httpd", "port", "5984")),
+    application:start(inets),
+    ibrowse:start(),
+    timer:sleep(1000), % presumably lets the HTTP services come up - TODO confirm
+
+    %
+    % test pull replication
+    %
+
+    delete_db(test_db_a_name()),
+    delete_db(test_db_b_name()),
+    create_db(test_db_a_name()),
+    create_db(test_db_b_name()),
+
+    % enable compression
+    couch_config:set("attachments", "compression_level", "8"),
+    couch_config:set("attachments", "compressible_types", "text/*"),
+
+    % store doc with text attachment in DB A
+    put_text_att(test_db_a_name()),
+
+    % disable attachment compression
+    couch_config:set("attachments", "compression_level", "0"),
+
+    % do pull replication
+    do_pull_replication(test_db_a_name(), test_db_b_name()),
+
+    % verify that DB B has the attachment stored in compressed form
+    check_att_is_compressed(test_db_b_name()),
+    check_server_can_decompress_att(test_db_b_name()),
+    check_att_stubs(test_db_a_name(), test_db_b_name()),
+
+    %
+    % test push replication
+    %
+
+    delete_db(test_db_a_name()),
+    delete_db(test_db_b_name()),
+    create_db(test_db_a_name()),
+    create_db(test_db_b_name()),
+
+    % enable compression
+    couch_config:set("attachments", "compression_level", "8"),
+    couch_config:set("attachments", "compressible_types", "text/*"),
+
+    % store doc with text attachment in DB A
+    put_text_att(test_db_a_name()),
+
+    % disable attachment compression
+    couch_config:set("attachments", "compression_level", "0"),
+
+    % do push replication
+    do_push_replication(test_db_a_name(), test_db_b_name()),
+
+    % verify that DB B has the attachment stored in compressed form
+    check_att_is_compressed(test_db_b_name()),
+    check_server_can_decompress_att(test_db_b_name()),
+    check_att_stubs(test_db_a_name(), test_db_b_name()),
+
+    timer:sleep(3000), % to avoid mochiweb socket closed exceptions
+    delete_db(test_db_a_name()),
+    delete_db(test_db_b_name()),
+    couch_server_sup:stop(),
+    ok.
+
+put_text_att(DbName) -> % PUTs test_text_data() as text/plain attachment readme.txt of doc testdoc1
+    {ok, {{_, Code, _}, _Headers, _Body}} = http:request(
+        put,
+        {db_url(DbName) ++ "/testdoc1/readme.txt", [],
+        "text/plain", test_text_data()},
+        [],
+        [{sync, true}]),
+    etap:is(Code, 201, "Created text attachment"),
+    ok.
+
+do_pull_replication(SourceDbName, TargetDbName) -> % source is given as a URL, target as a bare db name
+    RepObj = {[
+        {<<"source">>, list_to_binary(db_url(SourceDbName))},
+        {<<"target">>, TargetDbName}
+    ]},
+    {ok, {{_, Code, _}, _Headers, Body}} = http:request( % POST the replication object to rep_url()
+        post,
+        {rep_url(), [],
+        "application/json", list_to_binary(couch_util:json_encode(RepObj))},
+        [],
+        [{sync, true}]),
+    etap:is(Code, 200, "Pull replication successfully triggered"),
+    Json = couch_util:json_decode(Body),
+    RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]), % ok field of the decoded response
+    etap:is(RepOk, true, "Pull replication completed with success"),
+    ok.
+
+do_push_replication(SourceDbName, TargetDbName) -> % source is a bare db name, target is given as a URL
+    RepObj = {[
+        {<<"source">>, SourceDbName},
+        {<<"target">>, list_to_binary(db_url(TargetDbName))}
+    ]},
+    {ok, {{_, Code, _}, _Headers, Body}} = http:request( % POST the replication object to rep_url()
+        post,
+        {rep_url(), [],
+        "application/json", list_to_binary(couch_util:json_encode(RepObj))},
+        [],
+        [{sync, true}]),
+    etap:is(Code, 200, "Push replication successfully triggered"),
+    Json = couch_util:json_decode(Body),
+    RepOk = couch_util:get_nested_json_value(Json, [<<"ok">>]), % ok field of the decoded response
+    etap:is(RepOk, true, "Push replication completed with success"),
+    ok.
+
+check_att_is_compressed(DbName) -> % GETs readme.txt with Accept-Encoding gzip and checks the gzipped reply
+    {ok, {{_, Code, _}, Headers, Body}} = http:request(
+        get,
+        {db_url(DbName) ++ "/testdoc1/readme.txt",
+        [{"Accept-Encoding", "gzip"}]},
+        [],
+        [{sync, true}]),
+    etap:is(Code, 200, "HTTP response code for the attachment request is 200"),
+    Gziped = lists:member({"content-encoding", "gzip"}, Headers), % true when the reply carries that exact header pair
+    etap:is(Gziped, true, "The attachment was received in compressed form"),
+    Uncompressed = binary_to_list(zlib:gunzip(list_to_binary(Body))), % gunzip in the test process
+    etap:is(
+        Uncompressed,
+        test_text_data(),
+        "The attachment content is valid after decompression at the client side"
+    ),
+    ok.
+
+check_server_can_decompress_att(DbName) -> % GETs readme.txt with no request headers, expects plain text back
+    {ok, {{_, Code, _}, Headers, Body}} = http:request(
+        get,
+        {db_url(DbName) ++ "/testdoc1/readme.txt", []},
+        [],
+        [{sync, true}]),
+    etap:is(Code, 200, "HTTP response code for the attachment request is 200"),
+    Gziped = lists:member({"content-encoding", "gzip"}, Headers), % expected to be false here
+    etap:is(
+        Gziped, false, "The attachment was not received in compressed form"
+    ),
+    etap:is(
+        Body, % compared as-is, without any gunzip in the test process
+        test_text_data(),
+        "The attachment content is valid after server decompression"
+    ),
+    ok.
+
+check_att_stubs(SourceDbName, TargetDbName) -> % compares the readme.txt attachment objects of both databases
+    {ok, {{_, Code1, _}, _Headers1, Body1}} = http:request(
+        get,
+        {db_url(SourceDbName) ++ "/testdoc1?att_gzip_length=true", []},
+        [],
+        [{sync, true}]),
+    etap:is(
+        Code1,
+        200,
+        "HTTP response code is 200 for the source DB doc request"
+    ),
+    Json1 = couch_util:json_decode(Body1),
+    SourceAttStub = couch_util:get_nested_json_value( % _attachments -> readme.txt in the source doc
+        Json1,
+        [<<"_attachments">>, <<"readme.txt">>]
+    ),
+    {ok, {{_, Code2, _}, _Headers2, Body2}} = http:request(
+        get,
+        {db_url(TargetDbName) ++ "/testdoc1?att_gzip_length=true", []},
+        [],
+        [{sync, true}]),
+    etap:is(
+        Code2,
+        200,
+        "HTTP response code is 200 for the target DB doc request"
+    ),
+    Json2 = couch_util:json_decode(Body2),
+    TargetAttStub = couch_util:get_nested_json_value( % _attachments -> readme.txt in the target doc
+        Json2,
+        [<<"_attachments">>, <<"readme.txt">>]
+    ),
+    IdenticalStubs = (SourceAttStub =:= TargetAttStub), % exact term equality of the two decoded objects
+    etap:is(IdenticalStubs, true, "Attachment stubs are identical"),
+    TargetAttStubLength = couch_util:get_nested_json_value(
+        TargetAttStub,
+        [<<"length">>]
+    ),
+    TargetAttStubGzipLength = couch_util:get_nested_json_value(
+        TargetAttStub,
+        [<<"gzip_length">>]
+    ),
+    GzipLengthDefined = is_integer(TargetAttStubGzipLength),
+    etap:is(
+        GzipLengthDefined,
+        true,
+        "Stubs have the gzip_length field properly defined"
+    ),
+    GzipLengthSmaller = (TargetAttStubGzipLength < TargetAttStubLength),
+    etap:is(
+        GzipLengthSmaller,
+        true,
+        "Stubs have the gzip_length field smaller than their length field"
+    ),
+    ok.
+
+admin_user_ctx() -> % option tuple carrying a user_ctx whose only role is _admin
+    {user_ctx, #user_ctx{roles=[<<"_admin">>]}}.
+
+create_db(DbName) -> % badmatch here unless couch_db:create/2 returns {ok, _}
+    {ok, _} = couch_db:create(DbName, [admin_user_ctx()]).
+
+delete_db(DbName) -> % result is not matched, so any return value is accepted
+    couch_server:delete(DbName, [admin_user_ctx()]).
+
+db_url(DbName) -> % addr and port come from the process dictionary, set in test/0
+    "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/" ++
+    binary_to_list(DbName).
+
+rep_url() -> % URL of the _replicate resource; addr and port are set in test/0
+    "http://" ++ get(addr) ++ ":" ++ get(port) ++ "/_replicate".
+
+test_text_data() -> % contents of the README source file, as a list
+    {ok, Data} = file:read_file(test_util:source_file("README")),
+    binary_to_list(Data).

Modified: couchdb/branches/0.11.x/test/etap/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/0.11.x/test/etap/Makefile.am?rev=954462&r1=954461&r2=954462&view=diff
==============================================================================
--- couchdb/branches/0.11.x/test/etap/Makefile.am (original)
+++ couchdb/branches/0.11.x/test/etap/Makefile.am Mon Jun 14 13:40:13 2010
@@ -58,6 +58,7 @@ EXTRA_DIST = \
     110-replication-httpc.t \
     111-replication-changes-feed.t \
     112-replication-missing-revs.t \
+    113-replication-attachment-comp.t \
     120-stats-collect.t \
     121-stats-aggregates.cfg \
     121-stats-aggregates.ini \