Posted by GitBox on 2018/11/14 19:04:28 UTC

[GitHub] nickva closed pull request #1195: Add support for bulk get with Accept:"multipart/mixed" or "multipart/related"

nickva closed pull request #1195: Add support for bulk get with Accept:"multipart/mixed" or "multipart/related"

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index d46b5bbf28..9c78cf3fba 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -468,7 +468,8 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
-db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) ->
+db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>],
+                             mochi_req=MochiReq}=Req, Db) ->
     couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
     couch_httpd:validate_ctype(Req, "application/json"),
     {JsonProps} = chttpd:json_body_obj(Req),
@@ -481,18 +482,62 @@ db_req(#httpd{method='POST', path_parts=[_, <<"_bulk_get">>]}=Req, Db) ->
             } = bulk_get_parse_doc_query(Req),
             Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
-            {ok, Resp} = start_json_response(Req, 200),
-            send_chunk(Resp, <<"{\"results\": [">>),
-            lists:foldl(fun(Doc, Sep) ->
-                {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc,
-                                                                    Options),
-                bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep),
-                <<",">>
-            end, <<"">>, Docs),
-            send_chunk(Resp, <<"]}">>),
-            end_json_response(Resp)
+            AcceptJson =  MochiReq:accepts_content_type("application/json"),
+            AcceptMixedMp = MochiReq:accepts_content_type("multipart/mixed"),
+            AcceptRelatedMp = MochiReq:accepts_content_type("multipart/related"),
+            AcceptMp = not AcceptJson andalso (AcceptMixedMp orelse AcceptRelatedMp),
+            case AcceptMp of
+                false ->
+                    {ok, Resp} = start_json_response(Req, 200),
+                    send_chunk(Resp, <<"{\"results\": [">>),
+                    lists:foldl(fun(Doc, Sep) ->
+                        {DocId, Results, Options1} = bulk_get_open_doc_revs(Db, Doc,
+                            Options),
+                        bulk_get_send_docs_json(Resp, DocId, Results, Options1, Sep),
+                        <<",">>
+                    end, <<"">>, Docs),
+                    send_chunk(Resp, <<"]}">>),
+                    end_json_response(Resp);
+                true ->
+                    OuterBoundary = bulk_get_multipart_boundary(),
+                    MpType = case AcceptMixedMp of
+                        true ->
+                            "multipart/mixed";
+                        _ ->
+                            "multipart/related"
+                    end,
+                    CType = {"Content-Type", MpType ++ "; boundary=\"" ++
+                                          ?b2l(OuterBoundary) ++  "\""},
+                    {ok, Resp} = start_chunked_response(Req, 200, [CType]),
+                    lists:foldl(fun(Doc, _Pre) ->
+                        case bulk_get_open_doc_revs(Db, Doc, Options) of
+                            {_, {ok, []}, _Options1} ->
+                                ok;
+                            {_, {ok, Results}, Options1} ->
+                                send_docs_multipart_bulk_get(Results, Options1,
+                                    OuterBoundary, Resp);
+                            {DocId, {error, {RevId, Error, Reason}}, _Options1} ->
+                                Json = ?JSON_ENCODE({[
+                                    {<<"id">>, DocId},
+                                    {<<"rev">>, RevId},
+                                    {<<"error">>, Error},
+                                    {<<"reason">>, Reason}
+                                ]}),
+                                couch_httpd:send_chunk(Resp,[
+                                    <<"\r\n--", OuterBoundary/binary>>,
+                                    <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
+                                    Json
+                                ])
+                        end
+                    end, <<"">>, Docs),
+                    case Docs of
+                        [] ->
+                            ok;
+                        _ ->
+                            couch_httpd:send_chunk(Resp, <<"\r\n", "--", OuterBoundary/binary, "--\r\n">>)
+                    end,
+                    couch_httpd:last_chunk(Resp)
+            end
 db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
@@ -962,6 +1007,39 @@ send_doc_efficiently(#httpd{mochi_req=MochiReq}=Req, #doc{atts=Atts}=Doc, Header
         send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
+%% Streams one outer multipart part to Resp for every entry of Results (the
+%% revision results returned by bulk_get_open_doc_revs for one requested doc).
+%% Each part is opened by the delimiter "\r\n--" followed by OuterBoundary; the
+%% closing outer delimiter is written by the caller, not here.
+%%
+%%  * {ok, Doc}: the document JSON, serialised with Options0 plus
+%%    attachments/follows/att_encoding_info. When the document has attachment
+%%    data to stream, the part is a nested multipart/related body whose headers
+%%    come from bulk_get_multipart_headers/3 (one inner boundary is shared by
+%%    every part written by this call); otherwise it is application/json.
+%%  * {{not_found, missing}, RevId}: an application/json part marked
+%%    error="true" naming the missing revision.
+send_docs_multipart_bulk_get(Results, Options0, OuterBoundary, Resp) ->
+    InnerBoundary = bulk_get_multipart_boundary(),
+    Options = [attachments, follows, att_encoding_info | Options0],
+    lists:foreach(
+        fun({ok, #doc{id=Id, revs=Revs, atts=Atts}=Doc}) ->
+                Refs = monitor_attachments(Atts),
+                try
+                    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
+                    couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>),
+                    %% Stub attachments carry no data and are not streamed by
+                    %% couch_doc:doc_to_multi_part_stream/5, which writes only
+                    %% the JSON when no data-bearing attachment remains. Pick
+                    %% the part type on the same basis so headers match body.
+                    case bulk_get_non_stub_atts(Atts) of
+                        [] ->
+                            couch_httpd:send_chunk(Resp,
+                                <<"\r\nContent-Type: application/json\r\n\r\n">>);
+                        _ ->
+                            lists:foreach(
+                                fun(Header) -> couch_httpd:send_chunk(Resp, Header) end,
+                                bulk_get_multipart_headers(Revs, Id, InnerBoundary))
+                    end,
+                    couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
+                        fun(Data) -> couch_httpd:send_chunk(Resp, Data) end, true)
+                after
+                    demonitor_refs(Refs)
+                end;
+           ({{not_found, missing}, RevId}) ->
+                RevStr = couch_doc:rev_to_str(RevId),
+                Json = ?JSON_ENCODE({[{<<"rev">>, RevStr},
+                                      {<<"error">>, <<"not_found">>},
+                                      {<<"reason">>, <<"missing">>}]}),
+                couch_httpd:send_chunk(Resp, [
+                    <<"\r\n--", OuterBoundary/binary>>,
+                    <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
+                    Json
+                ])
+        end,
+        Results).
+%% Attachments whose data is not a stub, i.e. the ones that will actually be
+%% streamed. NOTE(review): must stay in sync with the stub filter inside
+%% couch_doc:doc_to_multi_part_stream/5.
+bulk_get_non_stub_atts(Atts) ->
+    [Att || Att <- Atts, couch_att:fetch(data, Att) =/= stub].
 send_docs_multipart(Req, Results, Options1) ->
     OuterBoundary = couch_uuids:random(),
     InnerBoundary = couch_uuids:random(),
@@ -997,6 +1075,23 @@ send_docs_multipart(Req, Results, Options1) ->
     couch_httpd:send_chunk(Resp, <<"--">>),
+%% Part headers for a bulk_get document sent as a nested multipart/related
+%% body. Returns binaries to be sent in order: X-Doc-Id, then X-Rev-Id for the
+%% leading revision (omitted when the revision tree is {0, []}), then a
+%% Content-Type naming Boundary followed by the blank line ending the headers.
+bulk_get_multipart_headers({0, []}, Id, Boundary) ->
+    [<<"\r\nX-Doc-Id: ", Id/binary>>,
+     <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary,
+       "\r\n\r\n">>];
+bulk_get_multipart_headers({Start, [FirstRevId | _]}, Id, Boundary) ->
+    RevStr = couch_doc:rev_to_str({Start, FirstRevId}),
+    %% Same headers as a revision-less document, with X-Rev-Id spliced between.
+    [IdHeader, CTypeHeader] = bulk_get_multipart_headers({0, []}, Id, Boundary),
+    [IdHeader, <<"\r\nX-Rev-Id: ", RevStr/binary>>, CTypeHeader].
+%% Returns a fresh boundary string: "--" followed by a random couch UUID
+%% (callers prefix another "--" when writing the delimiter line).
+bulk_get_multipart_boundary() ->
+    <<"--", (couch_uuids:random())/binary>>.
 receive_request_data(Req) ->
     receive_request_data(Req, chttpd:body_length(Req)).
diff --git a/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl
new file mode 100644
index 0000000000..601f720a03
--- /dev/null
+++ b/src/chttpd/test/chttpd_db_bulk_get_multipart_test.erl
@@ -0,0 +1,304 @@
+%% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+%% use this file except in compliance with the License. You may obtain a copy of
+%% the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+%% License for the specific language governing permissions and limitations under
+%% the License.
+%% Upper bound, in milliseconds, for every wait on a reply from the chunk
+%% accumulator process (used in the `after ?TIMEOUT` clauses).
+-define(TIMEOUT, 3000).
+%% Fixture setup: installs every mock the handler needs, then starts the
+%% process that records chunks written through the mocked send_chunk
+%% functions. The accumulator pid is handed to each test case.
+setup() ->
+    lists:foreach(fun mock/1,
+                  [config, chttpd, couch_epi, couch_httpd, couch_stats,
+                   fabric, mochireq]),
+    spawn_accumulator().
+%% Fixture teardown: stops the accumulator (crashing unless it acknowledges)
+%% and unloads all meck mocks.
+teardown(Accumulator) ->
+    ok = stop_accumulator(Accumulator),
+    meck:unload().
+%% EUnit generator: each case runs between its own setup/0 and teardown/1
+%% (foreach fixture) and receives the accumulator pid.
+bulk_get_test_() ->
+    Cases = [
+        fun should_require_docs_field/1,
+        fun should_not_accept_specific_query_params/1,
+        fun should_return_empty_results_on_no_docs/1,
+        fun should_get_doc_with_all_revs/1,
+        fun should_validate_doc_with_bad_id/1,
+        fun should_validate_doc_with_bad_rev/1,
+        fun should_validate_missing_doc/1,
+        fun should_validate_bad_atts_since/1,
+        fun should_include_attachments_when_atts_since_specified/1
+    ],
+    {"/db/_bulk_get tests", {foreach, fun setup/0, fun teardown/1, Cases}}.
+%% A body that is a JSON object without a "docs" member must be rejected
+%% with bad_request.
+should_require_docs_field(_) ->
+    %% {[]} is the EJSON form of the empty object {}; a property list holding
+    %% an empty tuple ({[{}]}) is not valid EJSON.
+    Req = fake_request({[]}),
+    ?_assertThrow({bad_request, _}, chttpd_db:db_req(Req, nil)).
+%% Per-document options are rejected as URL query parameters: one labelled
+%% test per parameter, each mocking chttpd:qs/1 to return just that parameter.
+should_not_accept_specific_query_params(_) ->
+    Req = fake_request({[{<<"docs">>, []}]}),
+    Forbidden = ["rev", "open_revs", "atts_since", "w", "new_edits"],
+    [{Param, ?_assertThrow({bad_request, _},
+                           begin
+                               ok = meck:expect(chttpd, qs,
+                                                fun(_) -> [{Param, ""}] end),
+                               chttpd_db:db_req(Req, nil)
+                           end)}
+     || Param <- Forbidden].
+%% An empty "docs" list produces no recorded chunks.
+should_return_empty_results_on_no_docs(Accumulator) ->
+    chttpd_db:db_req(fake_request({[{<<"docs">>, []}]}), nil),
+    %% Read the chunks now, in the fixture process the accumulator replies to.
+    Recorded = get_results_from_response(Accumulator),
+    ?_assertEqual([], Recorded).
+%% Without a "rev", fabric is asked for all revisions; the decoded result must
+%% carry the requested _id.
+should_get_doc_with_all_revs(Accumulator) ->
+    DocId = <<"docudoc">>,
+    [RevA, RevB] = [#doc{id = DocId, body = {[{<<"_rev">>, R}]}}
+                    || R <- [<<"1-ABC">>, <<"1-CDE">>]],
+    Req = fake_request(DocId),
+    mock_open_revs(all, {ok, [{ok, RevA}, {ok, RevB}]}),
+    chttpd_db:db_req(Req, nil),
+    Props = get_results_from_response(Accumulator),
+    ?_assertEqual(DocId, couch_util:get_value(<<"_id">>, Props)).
+%% An id with a leading underscore yields an illegal_docid error entry with a
+%% null rev.
+should_validate_doc_with_bad_id(Accumulator) ->
+    BadId = <<"_docudoc">>,
+    chttpd_db:db_req(fake_request(BadId), nil),
+    Props = get_results_from_response(Accumulator),
+    ?assertEqual(BadId, couch_util:get_value(<<"id">>, Props)),
+    ?_assertMatch([{<<"id">>, BadId}, {<<"rev">>, null},
+                   {<<"error">>, <<"illegal_docid">>}, {<<"reason">>, _}],
+                  Props).
+%% A malformed rev ("revorev") yields a bad_request error entry echoing it.
+should_validate_doc_with_bad_rev(Accumulator) ->
+    {DocId, BadRev} = {<<"docudoc">>, <<"revorev">>},
+    chttpd_db:db_req(fake_request(DocId, BadRev), nil),
+    Props = get_results_from_response(Accumulator),
+    ?assertEqual(DocId, couch_util:get_value(<<"id">>, Props)),
+    ?_assertMatch([{<<"id">>, DocId}, {<<"rev">>, BadRev},
+                   {<<"error">>, <<"bad_request">>}, {<<"reason">>, _}],
+                  Props).
+%% When fabric returns {ok, []} for the requested revision, the entry is a
+%% not_found error echoing that rev.
+should_validate_missing_doc(Accumulator) ->
+    {DocId, Rev} = {<<"docudoc">>, <<"1-revorev">>},
+    mock_open_revs([{1, <<"revorev">>}], {ok, []}),
+    chttpd_db:db_req(fake_request(DocId, Rev), nil),
+    Props = get_results_from_response(Accumulator),
+    ?assertEqual(DocId, couch_util:get_value(<<"id">>, Props)),
+    ?_assertMatch([{<<"id">>, DocId}, {<<"rev">>, Rev},
+                   {<<"error">>, <<"not_found">>}, {<<"reason">>, _}],
+                  Props).
+%% A malformed atts_since (here the binary <<"badattsince">>) yields a
+%% bad_request entry whose "rev" field echoes the offending value.
+should_validate_bad_atts_since(Accumulator) ->
+    {DocId, Rev, BadAttsSince} = {<<"docudoc">>, <<"1-revorev">>, <<"badattsince">>},
+    mock_open_revs([{1, <<"revorev">>}], {ok, []}),
+    chttpd_db:db_req(fake_request(DocId, Rev, BadAttsSince), nil),
+    Props = get_results_from_response(Accumulator),
+    ?assertEqual(DocId, couch_util:get_value(<<"id">>, Props)),
+    ?_assertMatch([{<<"id">>, DocId}, {<<"rev">>, <<"badattsince">>},
+                   {<<"error">>, <<"bad_request">>}, {<<"reason">>, _}],
+                  Props).
+%% A valid atts_since is parsed into {Pos, RevId} tuples and makes the handler
+%% ask fabric:open_revs/4 for attachments.
+should_include_attachments_when_atts_since_specified(_) ->
+    DocId = <<"docudoc">>,
+    Req = fake_request(DocId, <<"1-revorev">>, [<<"1-abc">>]),
+    mock_open_revs([{1, <<"revorev">>}], {ok, []}),
+    chttpd_db:db_req(Req, nil),
+    ExpectedOpts = [{atts_since, [{1, <<"abc">>}]}, attachments,
+                    {user_ctx, undefined}],
+    ?_assert(meck:called(fabric, open_revs,
+                         [nil, DocId, [{1, <<"revorev">>}], ExpectedOpts])).
+%% helpers
+%% Builds a POST /db/_bulk_get #httpd{} whose decoded body is Payload (the
+%% mocked chttpd:json_body_obj/1 returns req_body verbatim). A binary argument
+%% is shorthand for a single doc entry with that "id".
+fake_request(Payload) when is_tuple(Payload) ->
+    #httpd{method = 'POST', path_parts = [<<"db">>, <<"_bulk_get">>],
+           mochi_req = mochireq, req_body = Payload};
+fake_request(DocId) when is_binary(DocId) ->
+    docs_request([{<<"id">>, DocId}]).
+%% Single doc entry with an explicit "rev".
+fake_request(DocId, Rev) ->
+    docs_request([{<<"id">>, DocId}, {<<"rev">>, Rev}]).
+%% Single doc entry with explicit "rev" and "atts_since".
+fake_request(DocId, Rev, AttsSince) ->
+    docs_request([{<<"id">>, DocId}, {<<"rev">>, Rev},
+                  {<<"atts_since">>, AttsSince}]).
+%% Wraps one doc entry's properties as the body {"docs": [{...}]}.
+docs_request(DocProps) ->
+    fake_request({[{<<"docs">>, [{DocProps}]}]}).
+%% Mocks fabric:open_revs/4 to return RevsResp, asserting that the caller asked
+%% for exactly ExpectedRevs.
+mock_open_revs(ExpectedRevs, RevsResp) ->
+    OpenRevs = fun(_Db, _DocId, Revs, _Opts) ->
+        ?assertEqual(ExpectedRevs, Revs),
+        RevsResp
+    end,
+    ok = meck:expect(fabric, open_revs, OpenRevs).
+%% Installs the meck mock for one module; every clause returns ok or crashes.
+%% mochireq is a stand-in module created with non_strict. It reports
+%% multipart/mixed and multipart/related as acceptable, but not
+%% application/json, which steers _bulk_get into its multipart branch. Both
+%% send_chunk mocks route output into the chunk accumulator via send_chunk/2.
+mock(mochireq) ->
+    ok = meck:new(mochireq, [non_strict]),
+    expect_all(mochireq, [
+        {parse_qs, fun() -> [] end},
+        {accepts_content_type,
+         fun(CType) ->
+             lists:member(CType, ["multipart/mixed", "multipart/related"])
+         end}
+    ]);
+mock(couch_httpd) ->
+    ok = meck:new(couch_httpd, [passthrough]),
+    expect_all(couch_httpd, [
+        {validate_ctype, fun(_, _) -> ok end},
+        {last_chunk, fun(_) -> {ok, nil} end},
+        {send_chunk, fun send_chunk/2}
+    ]);
+mock(chttpd) ->
+    ok = meck:new(chttpd, [passthrough]),
+    expect_all(chttpd, [
+        {start_json_response, fun(_, _) -> {ok, nil} end},
+        {start_chunked_response, fun(_, _, _) -> {ok, nil} end},
+        {end_json_response, fun(_) -> ok end},
+        {send_chunk, fun send_chunk/2},
+        {json_body_obj, fun(#httpd{req_body = Body}) -> Body end}
+    ]);
+mock(couch_epi) ->
+    ok = meck:new(couch_epi, [passthrough]),
+    expect_all(couch_epi, [{any, fun(_, _, _, _, _) -> false end}]);
+mock(couch_stats) ->
+    ok = meck:new(couch_stats, [passthrough]),
+    expect_all(couch_stats, [
+        {increment_counter, fun(_) -> ok end},
+        {increment_counter, fun(_, _) -> ok end},
+        {decrement_counter, fun(_) -> ok end},
+        {decrement_counter, fun(_, _) -> ok end},
+        {update_histogram, fun(_, _) -> ok end},
+        {update_gauge, fun(_, _) -> ok end}
+    ]);
+mock(fabric) ->
+    ok = meck:new(fabric, [passthrough]);
+mock(config) ->
+    ok = meck:new(config, [passthrough]),
+    expect_all(config, [{get, fun(_, _, Default) -> Default end}]).
+%% Registers each {Function, Implementation} expectation on Mod, in order.
+expect_all(Mod, Expectations) ->
+    lists:foreach(fun({Fun, Impl}) -> ok = meck:expect(Mod, Fun, Impl) end,
+                  Expectations).
+%% Starts the chunk accumulator and stores its pid in the calling process's
+%% dictionary under chunks_gather, where the send_chunk/2 mock looks it up.
+%% All accumulator replies go to the calling process. The process is linked
+%% so it cannot outlive the fixture process if teardown never stops it.
+spawn_accumulator() ->
+    Parent = self(),
+    Pid = spawn_link(fun() -> accumulator_loop(Parent, []) end),
+    erlang:put(chunks_gather, Pid),
+    Pid.
+%% Accumulator loop; Acc holds received chunks newest-first. Every reply is
+%% sent to Parent and tagged with the request's Ref:
+%%   {put, Ref, Chunk} -> prepend Chunk, reply {ok, Ref}
+%%   {get, Ref}        -> reply {ok, Ref, Acc}
+%%   {stop, Ref}       -> reply {ok, Ref} and return (process ends)
+accumulator_loop(Parent, Acc) ->
+    receive
+        {put, Ref, Chunk} ->
+            Parent ! {ok, Ref},
+            accumulator_loop(Parent, [Chunk | Acc]);
+        {get, Ref} ->
+            Parent ! {ok, Ref, Acc},
+            accumulator_loop(Parent, Acc);
+        {stop, Ref} ->
+            Parent ! {ok, Ref}
+    end.
+%% Stops the accumulator, returning ok once it acknowledges; throws
+%% {timeout, <<"process stop timeout">>} after ?TIMEOUT ms without a reply.
+stop_accumulator(Pid) ->
+    Ref = make_ref(),
+    accumulator_request(Pid, Ref, {stop, Ref}, <<"process stop timeout">>).
+%% Mock for chttpd:send_chunk/2 and couch_httpd:send_chunk/2. Lists are walked
+%% element by element so each binary is recorded as its own chunk; any other
+%% term is stored in the accumulator found under chunks_gather in this
+%% process's dictionary, so it must run in the process that called
+%% spawn_accumulator/0. Always returns {ok, nil}.
+send_chunk(_Resp, []) ->
+    {ok, nil};
+send_chunk(Resp, [Head | Tail]) ->
+    send_chunk(Resp, Head),
+    send_chunk(Resp, Tail);
+send_chunk(_Resp, Chunk) ->
+    Accumulator = erlang:get(chunks_gather),
+    Ref = make_ref(),
+    accumulator_request(Accumulator, Ref, {put, Ref, Chunk},
+                        <<"send chunk timeout">>),
+    {ok, nil}.
+%% Returns every chunk recorded so far, newest first.
+get_response(Pid) ->
+    Ref = make_ref(),
+    accumulator_request(Pid, Ref, {get, Ref}, <<"get response timeout">>).
+%% Sends Request to the accumulator and waits up to ?TIMEOUT ms for the reply
+%% tagged with Ref: a bare {ok, Ref} acknowledgement yields ok, and
+%% {ok, Ref, Payload} yields Payload. Throws {timeout, What} otherwise.
+accumulator_request(Pid, Ref, Request, What) ->
+    Pid ! Request,
+    receive
+        {ok, Ref} -> ok;
+        {ok, Ref, Payload} -> Payload
+    after ?TIMEOUT ->
+        throw({timeout, What})
+    end.
+%% Decodes the second most recent recorded chunk as a JSON object and returns
+%% its property list, or [] when nothing was recorded. In the multipart
+%% responses exercised here, the newest chunk is the closing delimiter and the
+%% one before it is the last JSON body.
+get_results_from_response(Pid) ->
+    decode_results(get_response(Pid)).
+decode_results([]) ->
+    [];
+decode_results(Chunks) ->
+    {Props} = ?JSON_DECODE(lists:nth(2, Chunks)),
+    Props.


