You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/18 18:39:57 UTC

[couchdb] 06/06: Add `_scheduler/{jobs,docs}` API endpoints

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 7e8708eb24c01a3718331ea4a0ca12c572ab8bd5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Apr 18 03:54:09 2017 -0400

    Add `_scheduler/{jobs,docs}` API endpoints
    
    The `_scheduler/jobs` endpoint provides a view of all replications managed by
    the scheduler. This endpoint includes more information on the replication than
    the `_scheduler/docs` endpoint, including the history of state transitions of
    the replication. This part was implemented by Benjamin Bastian.
    
    The `_scheduler/docs` endpoint provides a view of all replicator docs which
    have been seen by the scheduler. This endpoint includes useful information such
    as the state of the replication and the coordinator node. The implementation of
    `_scheduler/docs` mimics closely `_all_docs` behavior: similar pagination,
    HTTP request processing and fabric / rexi setup. The algorithm is roughly
    as follows:
    
     * http endpoint:
       - parses query args like it does for any view query
       - parses states to filter by, states are kept in the `extra` query arg
    
     * Call is made to couch_replicator_fabric. This is equivalent to
       fabric:all_docs. Here the typical fabric / rexi setup is happening.
    
     * Fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is
       similar to fabric_rpc's all_docs handler. However it is a bit more intricate
       to handle both replication document in terminal state as well as those which
       are active.
    
       - Before emitting it queries the state of the document to see if it is in a
         terminal state. If it is, it filters it and decides if it should be
         emitted or not.
    
       - If the document state cannot be determined from the document itself,
         it tries to fetch the active state from the local node's doc processor
         via a key-based lookup. If it finds it there, it can also filter it
         based on state and either emit it or skip it.
    
       - If the document cannot be found in the node's local doc processor ETS
         table, the row is emitted with a doc value of `undecided`. This will
         let the coordinator fetch the state by possibly querying other nodes'
         doc processors.
    
      * Coordinator then starts handling messages. This also mostly mimics all_docs.
        At this point the most interesting thing is handling `undecided` docs. If
        one is found, then `couch_replicator:active_doc/2` is queried. There, all
        nodes where document shards live are queried. This is better than a previous
        implementation where all nodes were queried all the time.
    
      * The final work happens in `couch_replicator_httpd` where the emitting
        callback is. There only the doc is emitted (not keys, rows, values).
        Another thing that happens is the `Total` value is decremented to
        account for the always-present _design doc.
    
    Because of this a bunch of stuff was removed, including an extra view which
    was built and managed by the previous implementation.
    
    As a bonus, other view-related parameters such as skip and limit seem to
    work out of the box and don't have to be implemented ad-hoc.
    
    Also, most importantly, many thanks to Paul Davis for suggesting this approach.
    
    Jira: COUCHDB-3324
---
 src/chttpd/src/chttpd_httpd_handlers.erl           |   1 +
 .../src/couch_replicator_fabric.erl                | 154 ++++++++++++++++
 .../src/couch_replicator_fabric_rpc.erl            |  97 ++++++++++
 .../src/couch_replicator_httpd.erl                 |  77 ++++++--
 .../src/couch_replicator_httpd_util.erl            | 201 +++++++++++++++++++++
 5 files changed, 516 insertions(+), 14 deletions(-)

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index b91aae9..9c30441 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,6 +19,7 @@ url_handler(<<"favicon.ico">>)     -> fun chttpd_misc:handle_favicon_req/1;
 url_handler(<<"_utils">>)          -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>)        -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_active_tasks">>)   -> fun chttpd_misc:handle_task_status_req/1;
+url_handler(<<"_scheduler">>)      -> fun couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>)           -> fun chttpd_misc:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
 url_handler(<<"_replicate">>)      -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
new file mode 100644
index 0000000..8634e76
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -0,0 +1,154 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric).
+
+-export([
+   docs/5
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+% Coordinator side of the replicator docs listing for DbName.
+%
+% Submits a `couch_replicator_fabric_rpc:docs` job (passing Options and
+% QueryArgs) for the shards returned by mem3:shards/1, waits for the worker
+% streams to start and then hands the started workers to docs_int/5, which
+% drives Callback with the user accumulator Acc.
+%
+% When stream start reports a timeout or an error, Callback is invoked with
+% `{error, timeout}` or `{error, Error}` respectively and its result is
+% returned. Started workers are cleaned up and the rexi monitor is stopped
+% in `after` sections, i.e. also when an exception is raised.
+docs(DbName, Options, QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+           Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    docs_int(DbName, Workers, QueryArgs, Callback, Acc)
+                after
+                    % Only the workers returned by stream_start are cleaned up
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, NewState} ->
+                % Log the workers that are still in the `waiting` state
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers, waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "replicator docs"
+                ),
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+
+% Collect and merge the streams of an already started set of Workers.
+%
+% Seeds a #collector{} with the query's skip/limit, the caller's Callback and
+% its accumulator, then lets rexi_utils:recv/6 dispatch worker messages to
+% handle_message/3.
+%
+% Returns `{ok, UserAcc}` when the receive loop finishes normally, the result
+% of `Callback({error, timeout}, UserAcc)` on timeout, and `{ok, Resp}` when
+% the receive loop reports `{error, Resp}`.
+docs_int(DbName, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        % One counter per worker, all starting at 0
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0,
+        update_seq = nil
+    },
+    % NOTE(review): `infinity, 5000` are presumably the overall and the
+    % per-message timeouts - confirm against rexi_utils:recv/6.
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    end.
+
+% Message handler handed to rexi_utils:recv/6 by docs_int/5. Each clause
+% handles one kind of message coming from (or about) a shard worker and
+% returns the next #collector{} state wrapped in a result tuple.
+
+% A node went down: let fabric_view decide whether the query can continue.
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
+
+% A worker exited: delegate to fabric_view.
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+% Per-worker metadata. Totals and offsets are summed up; once no worker
+% counter is 0 anymore (every worker has reported), a single combined
+% `{meta, ...}` is passed to the user callback.
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+    Tot = couch_util:get_value(total, Meta0, 0),
+    Off = couch_util:get_value(offset, Meta0, 0),
+    #collector{
+        callback = Callback,
+        counters = Counters0,
+        total_rows = Total0,
+        offset = Offset0,
+        user_acc = AccIn
+    } = State,
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        % Still waiting for metadata from at least one worker
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        % The reported offset includes the requested skip, capped at Total
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
+    end;
+
+% A row from a worker. Rows whose doc is `undecided` are resolved (and
+% possibly filtered out) by maybe_fetch_and_filter_doc/3 before being merged
+% into the sorted row buffer.
+handle_message(#view_row{id = Id, doc = Doc} = Row0, {Worker, From}, State) ->
+    #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+    case maybe_fetch_and_filter_doc(Id, Doc, State) of
+        {[_ | _]} = NewDoc ->
+            Row = Row0#view_row{doc = NewDoc},
+            Dir = Args#mrargs.direction,
+            % The worker reference travels with the buffered row
+            Rows = merge_row(Dir, Row#view_row{worker={Worker, From}}, Rows0),
+            Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+            State1 = State#collector{rows=Rows, counters=Counters1},
+            fabric_view:maybe_send_row(State1);
+        skip ->
+            % The row is dropped here and is never added to the row buffer,
+            % so no later step gets a chance to acknowledge it. Ack it now;
+            % otherwise the worker accumulates unacknowledged stream messages
+            % and can stall until it times out.
+            rexi:stream_ack(From),
+            {ok, State}
+    end;
+
+% A worker has streamed its last row.
+handle_message(complete, Worker, State) ->
+    Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+    fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+% Insert NewRow into Buffered, which is kept ordered by the `id` field of
+% #view_row{}: ascending for `fwd` queries, descending for `rev` queries.
+merge_row(fwd, NewRow, Buffered) ->
+    lists:keymerge(#view_row.id, [NewRow], Buffered);
+merge_row(rev, NewRow, Buffered) ->
+    lists:rkeymerge(#view_row.id, [NewRow], Buffered).
+
+
+% Resolve the doc value of a row received from a worker.
+%
+% A doc of `undecided` means the worker could not determine the replication
+% state itself, so it is looked up through couch_replicator:active_doc/2 and
+% then filtered by the `filter_states` entry of the query's `extra` args.
+% Returns whatever couch_replicator_utils:filter_state/3 returns, or `skip`
+% when the doc is not found. Any other doc value is returned unchanged.
+maybe_fetch_and_filter_doc(Id, undecided, State) ->
+    #collector{db_name = DbName, query_args = QueryArgs} = State,
+    #mrargs{extra = Extra} = QueryArgs,
+    Wanted = proplists:get_value(filter_states, Extra),
+    case couch_replicator:active_doc(DbName, Id) of
+        {error, not_found} ->
+            % could have been deleted
+            skip;
+        {ok, {Fields} = DocInfo} ->
+            Current = couch_util:get_value(state, Fields),
+            couch_replicator_utils:filter_state(Current, Wanted, DocInfo)
+    end;
+maybe_fetch_and_filter_doc(_Id, Resolved, _State) ->
+    Resolved.
diff --git a/src/couch_replicator/src/couch_replicator_fabric_rpc.erl b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
new file mode 100644
index 0000000..d67f875
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_fabric_rpc.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_fabric_rpc).
+
+-export([
+   docs/3
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+% Shard worker for the replicator docs listing.
+%
+% Opens the shard DbName and runs an all-docs query over it, streaming the
+% results back through docs_cb/2. The state filter is read from the
+% `filter_states` entry of the query's `extra` args.
+docs(DbName, Options, Args0) ->
+    set_io_priority(DbName, Options),
+    #mrargs{skip = Skip, limit = Limit, extra = Extra} = Args0,
+    States = proplists:get_value(filter_states, Extra),
+    % The worker does not skip; it returns up to Skip + Limit rows instead
+    WorkerArgs = Args0#mrargs{skip = 0, limit = Skip + Limit},
+    Threshold = couch_replicator_scheduler:health_threshold(),
+    {ok, Db} = couch_db:open_int(DbName, Options),
+    couch_mrview:query_all_docs(
+        Db, WorkerArgs, fun docs_cb/2, {DbName, States, Threshold}).
+
+
+% Callback for couch_mrview:query_all_docs/4 running inside a shard worker.
+% Forwards metadata, rows and the completion marker to the coordinator over
+% the rexi stream. The accumulator `{DbName, States, HealthThreshold}` is
+% passed through unchanged.
+docs_cb({meta, _} = MetaMsg, Acc) ->
+    ok = rexi:stream2(MetaMsg),
+    {ok, Acc};
+docs_cb({row, Row}, {DbName, States, HealthThreshold} = Acc) ->
+    Id = couch_util:get_value(id, Row),
+    Doc = couch_util:get_value(doc, Row),
+    Key = couch_util:get_value(key, Row),
+    Value = couch_util:get_value(value, Row),
+    case rep_doc_state(DbName, Id, Doc, States, HealthThreshold) of
+        skip ->
+            % Filtered out locally: nothing is streamed for this row
+            ok;
+        DocOrUndecided ->
+            ok = rexi:stream2(#view_row{
+                id = Id,
+                key = Key,
+                value = Value,
+                doc = DocOrUndecided
+            })
+    end,
+    {ok, Acc};
+docs_cb(complete, Acc) ->
+    ok = rexi:stream_last(complete),
+    {ok, Acc}.
+
+
+% Store the IO priority for this process in the process dictionary under
+% the `io_priority` key: the `io_priority` option when present in Options,
+% `{interactive, DbName}` otherwise. Returns what erlang:put/2 returns, i.e.
+% the previously stored value (or `undefined`).
+set_io_priority(DbName, Options) ->
+    Priority = case lists:keyfind(io_priority, 1, Options) of
+        {io_priority, Pri} -> Pri;
+        false -> {interactive, DbName}
+    end,
+    erlang:put(io_priority, Priority).
+
+
+%% Get the state of the replication document. If it is found and has a terminal
+%% state then it can be filtered and either included in the results or skipped.
+%% If it is not in a terminal state, look it up in the local doc processor ETS
+%% table. If it is there then filter by state. If it is not found there either
+%% then mark it as `undecided` and let the coordinator try to fetch it.
+%% The idea is to do as much work as possible locally and leave the minimum
+%% amount of work for the coordinator.
+rep_doc_state(_Shard, <<"_design/", _/binary>>, _, _, _) ->
+    % Design documents are never reported
+    skip;
+rep_doc_state(Shard, Id, {[_ | _]} = Doc, States, HealthThreshold) ->
+    DocInfo = couch_replicator:info_from_doc(mem3:dbname(Shard), Doc),
+    case get_doc_state(DocInfo) of
+        null ->
+            % No state recorded in the document itself
+            local_doc_state(Shard, Id, States, HealthThreshold);
+        DocState when is_atom(DocState) ->
+            couch_replicator_utils:filter_state(DocState, States, DocInfo)
+    end.
+
+
+% Look the document up in the local doc processor. If it is there, filter it
+% by state; if not, mark it as `undecided` and let the coordinator figure it
+% out.
+local_doc_state(Shard, Id, States, HealthThreshold) ->
+    Lookup = couch_replicator_doc_processor:doc_lookup(
+        Shard, Id, HealthThreshold),
+    case Lookup of
+        {error, not_found} ->
+            undecided;
+        {ok, EtsInfo} ->
+            couch_replicator_utils:filter_state(
+                get_doc_state(EtsInfo), States, EtsInfo)
+    end.
+
+
+% Read the `state` field out of an EJSON-style `{Fields}` doc info object.
+get_doc_state({Fields}) ->
+    couch_util:get_value(state, Fields).
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 6b43472..364d098 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -13,6 +13,12 @@
 -module(couch_replicator_httpd).
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    handle_req/1,
+    handle_scheduler_req/1
+]).
 
 -import(couch_httpd, [
     send_json/2,
@@ -24,13 +30,68 @@
     to_binary/1
 ]).
 
--export([handle_req/1]).
+
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+% HTTP handler for the `_scheduler` endpoint. Dispatches on the request path:
+%   GET .../jobs        - paginated list of the scheduler jobs of all nodes
+%   GET .../jobs/JobId  - a single job
+%   GET .../docs        - replicator docs, optionally filtered by `states`
+%   GET .../docs/DocId  - a single replicator doc
+% Every other request is answered through send_method_not_allowed/2.
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+    Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+        ?DEFAULT_TASK_LIMIT, 0, infinity),
+    Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+        infinity),
+    % Ask the scheduler on every node for its jobs; replies are concatenated
+    % and the list of nodes that could not be reached is ignored.
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+    Flatlist = lists:concat(Replies),
+    % couch_replicator_scheduler:job_ejson/1 guarantees {id, Id} to be
+    % the first item in the list
+    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+    Total = length(Sorted),
+    % Cap the offset so that Offset+1 stays a valid lists:sublist/3 start
+    Offset = min(Skip, Total),
+    Sublist = lists:sublist(Sorted, Offset+1, Limit),
+    Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+        || Task <- Sublist],
+    send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+    case couch_replicator:job(JobId) of
+        {ok, JobInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+    % Query args are parsed like for any view request; the state filter is
+    % carried to the workers in the `extra` field.
+    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+    StatesQs = chttpd:qs_value(Req, "states"),
+    States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+    VArgs1 = VArgs0#mrargs{
+        view_type = map,
+        include_docs = true,
+        reduce = false,
+        extra = [{filter_states, States}]
+    },
+    VArgs2 = couch_mrview_util:validate_args(VArgs1),
+    Opts = [{user_ctx, Req#httpd.user_ctx}],
+    Db = ?REPDB,
+    % The response is buffered up to the chunked response buffer size
+    Max = chttpd:chunked_response_buffer_size(),
+    Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+    Cb = fun couch_replicator_httpd_util:docs_cb/2,
+    {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+    {ok,  couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+    UserCtx = Req#httpd.user_ctx,
+    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+        {ok, DocInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
 
 
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
     RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
+    couch_replicator_httpd_util:validate_rep_props(Props),
     case couch_replicator:replicate(RepDoc, UserCtx) of
     {error, {Error, Reason}} ->
         send_json(
@@ -51,15 +112,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
 
 handle_req(Req) ->
     send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-        end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).
diff --git a/src/couch_replicator/src/couch_replicator_httpd_util.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
new file mode 100644
index 0000000..624eddd
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -0,0 +1,201 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_httpd_util).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+    validate_rep_props/1,
+    parse_int_param/5,
+    parse_replication_state_filter/1,
+    update_db_name/1,
+    docs_acc_new/3,
+    docs_acc_response/1,
+    docs_cb/2
+]).
+
+-import(couch_httpd, [
+    send_json/2,
+    send_json/3,
+    send_method_not_allowed/2
+]).
+
+-import(couch_util, [
+    to_binary/1
+]).
+
+
+% Parse the comma-separated `states` query-string value into a list of
+% replication state atoms.
+%
+% `undefined` (parameter absent) yields the empty list, which is the
+% wildcard filter. State names are matched case-insensitively. Throws
+% `{query_parse_error, Msg}` when a name is not an existing atom or is not
+% one of couch_replicator:replication_states/0.
+parse_replication_state_filter(undefined) ->
+    [];  % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+    Known = couch_replicator:replication_states(),
+    Names = lists:map(fun string:to_lower/1, string:tokens(States, ",")),
+    Requested = try
+        % Only existing atoms are accepted, so request input can never
+        % create new atoms.
+        lists:map(fun erlang:list_to_existing_atom/1, Names)
+    catch error:badarg ->
+        Msg1 = io_lib:format("States must be one or more of ~w", [Known]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    KnownSet = sets:from_list(Known),
+    RequestedSet = sets:from_list(Requested),
+    case sets:to_list(sets:subtract(RequestedSet, KnownSet)) of
+    [] ->
+        Requested;
+    Unknown ->
+        Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w",
+            [Unknown, Known]),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+
+% Read the query-string parameter Param of Req as an integer within
+% [Min, Max], falling back to Default when the parameter is absent.
+% Throws `{query_parse_error, Msg}` when the value is not an integer or is
+% out of range.
+parse_int_param(Req, Param, Default, Min, Max) ->
+    try
+        list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+    of
+        InRange when InRange >= Min, InRange =< Max ->
+            InRange;
+        _OutOfRange ->
+            Msg2 = io_lib:format("~s not in range of [~w,~w]",
+                [Param, Min, Max]),
+            throw({query_parse_error, ?l2b(Msg2)})
+    catch error:badarg ->
+        Msg1 = io_lib:format("~s must be an integer", [Param]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end.
+
+
+% Validate the properties of a replication request body. Only the
+% `query_params` object is inspected: each of its values must be a binary,
+% otherwise `{bad_request, Msg}` is thrown. Returns `ok` when nothing is
+% wrong.
+validate_rep_props(Props) ->
+    lists:foreach(fun validate_rep_prop/1, Props).
+
+
+% Check one top-level property; everything but `query_params` is accepted.
+validate_rep_prop({<<"query_params">>, {Params}}) ->
+    lists:foreach(fun validate_query_param/1, Params);
+validate_rep_prop(_Other) ->
+    ok.
+
+
+% A single query parameter must have a binary (string) value.
+validate_query_param({_Name, Value}) when is_binary(Value) ->
+    ok;
+validate_query_param({Name, _Value}) ->
+    throw({bad_request, <<Name/binary," value must be a string.">>}).
+
+
+% Separator to emit before the next chunk of output: the accumulator's
+% `prepend` value, or the empty string when none has been set yet.
+prepend_val(#vacc{prepend = undefined}) ->
+    "";
+prepend_val(#vacc{prepend = Separator}) ->
+    Separator.
+
+
+% Add Data (of size Len) to the buffered response output.
+%
+% If something is already buffered and adding Data would exceed the
+% threshold, the current buffer is sent as a delayed chunk first and Data
+% becomes the new buffer. Otherwise Data is appended to the buffer. In both
+% cases the next chunk will be prefixed with ",\r\n".
+maybe_flush_response(#vacc{bufsize = Buffered, threshold = Limit} = Acc, Data, Len)
+        when Buffered > 0, Buffered + Len > Limit ->
+    {ok, Resp} = chttpd:send_delayed_chunk(Acc#vacc.resp, Acc#vacc.buffer),
+    {ok, Acc#vacc{
+        prepend = ",\r\n",
+        buffer = Data,
+        bufsize = Len,
+        resp = Resp
+    }};
+maybe_flush_response(Acc, Data, Len) ->
+    #vacc{buffer = Pending, bufsize = Buffered} = Acc,
+    {ok, Acc#vacc{
+        prepend = ",\r\n",
+        buffer = [Pending | Data],
+        bufsize = Buffered + Len
+    }}.
+
+% Create the initial response accumulator for docs_cb/2. Threshold is the
+% buffer size above which buffered output is flushed.
+docs_acc_new(Req, Db, Threshold) ->
+    #vacc{
+        db = Db,
+        req = Req,
+        threshold = Threshold
+    }.
+
+% Extract the HTTP response object from a response accumulator.
+docs_acc_response(#vacc{} = Acc) ->
+    Acc#vacc.resp.
+
+% Callback that renders the replicator docs listing as a JSON response.
+% It receives `{meta, Meta}`, `{row, Row}`, `complete` and `{error, Reason}`
+% messages together with a #vacc{} accumulator and returns `{ok, NewAcc}`.
+% Clause order matters: the `resp=undefined` clauses handle the case where
+% no response has been started yet.
+
+% Error before anything was sent: a regular error response can be used.
+docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+    {ok, Acc#vacc{resp=Resp}};
+
+docs_cb(complete, #vacc{resp=undefined}=Acc) ->
+    % Nothing in view
+    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+    {ok, Acc#vacc{resp=Resp}};
+
+% First message that produces output: start the delayed JSON response and
+% re-dispatch the same message with the response in place.
+docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
+    %% Start response
+    Headers = [],
+    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+    docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+% Error after the response was started.
+docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+    {ok, Acc#vacc{resp=Resp1}};
+
+docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+    % Finish view output and possibly end the response
+    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+    case Acc#vacc.should_close of
+        true ->
+            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+            {ok, Acc#vacc{resp=Resp2}};
+        _ ->
+            % Response stays open: reset the per-listing state
+            {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+                prepend=",\r\n", buffer=[], bufsize=0}}
+    end;
+
+docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+    % Sending metadata as we've not sent it or any row yet
+    % Only the fields present in Meta are emitted; the rows go into "docs"
+    Parts = case couch_util:get_value(total, Meta) of
+        undefined -> [];
+        Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
+    end ++ case couch_util:get_value(offset, Meta) of
+        undefined -> [];
+        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+    end ++ ["\"docs\":["],
+    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+    % The first row must not be preceded by a separator
+    {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+
+docs_cb({meta, _Meta}, #vacc{}=Acc) ->
+    %% ignore metadata
+    {ok, Acc};
+
+docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+    %% sorted=false and row arrived before meta
+    % Open the "docs" array here, as no metadata chunk did it
+    Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+    % Adding another row
+    Chunk = [prepend_val(Acc), row_to_json(Row)],
+    maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+% Replace the `database` field of an EJSON-style `{Props}` object with its
+% normalized name and move it to the front of the property list. Crashes
+% with a badmatch when there is no `database` field.
+update_db_name({Props}) ->
+    {value, {database, RawName}, Others} = lists:keytake(database, 1, Props),
+    Normalized = normalize_db_name(RawName),
+    {[{database, Normalized} | Others]}.
+
+% Names starting with "shards/" are converted with mem3:dbname/1; any other
+% value is returned as it is.
+normalize_db_name(<<"shards/", _/binary>> = ShardName) ->
+    mem3:dbname(ShardName);
+normalize_db_name(Other) ->
+    Other.
+
+% Encode the `doc` value of a row as JSON, with its database name
+% normalized. Only the doc is emitted; other fields of the row are ignored.
+row_to_json(Row) ->
+    DocInfo = update_db_name(couch_util:get_value(doc, Row)),
+    ?JSON_ENCODE(DocInfo).
+
+
+%% The replicator db contains an automatically created validation design doc
+%% which must not be counted, so Total is lowered by one - but never below 0.
+adjust_total(Total) when is_integer(Total) ->
+    max(Total - 1, 0).

-- 
To stop receiving notification emails like this one, please contact
"commits@couchdb.apache.org" <co...@couchdb.apache.org>.