Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/14 21:37:13 UTC

[couchdb] branch 63012-scheduler updated: [fixup] Move replicator HTTP endpoint to replicator app

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/63012-scheduler by this push:
       new  33328fc   [fixup] Move replicator HTTP endpoint to replicator app
33328fc is described below

commit 33328fc58ab69a75676e310a2f69ac40c23196e8
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Apr 14 17:37:08 2017 -0400

    [fixup] Move replicator HTTP endpoint to replicator app
---
 src/chttpd/src/chttpd_httpd_handlers.erl           |   2 +-
 src/chttpd/src/chttpd_misc.erl                     | 103 +------------
 .../src/couch_replicator_httpd.erl                 | 171 +++++++--------------
 ...r_httpd.erl => couch_replicator_httpd_util.erl} | 170 +++++++++++---------
 4 files changed, 154 insertions(+), 292 deletions(-)
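
For context, the handler moved by this commit serves the following requests; the paths and query parameters below are read off the handle_scheduler_req/1 clauses in the diff and are a summary only:

    GET /_scheduler/jobs?limit=N&skip=N    - list scheduler jobs across cluster nodes
    GET /_scheduler/jobs/<JobId>           - fetch one job via couch_replicator:job/1
    GET /_scheduler/docs?states=s1,s2      - stream _replicator docs filtered by state
    GET /_scheduler/docs/<DocId>           - fetch one doc via couch_replicator:doc/3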

diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index 8d2c280..9c30441 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,7 +19,7 @@ url_handler(<<"favicon.ico">>)     -> fun chttpd_misc:handle_favicon_req/1;
 url_handler(<<"_utils">>)          -> fun chttpd_misc:handle_utils_dir_req/1;
 url_handler(<<"_all_dbs">>)        -> fun chttpd_misc:handle_all_dbs_req/1;
 url_handler(<<"_active_tasks">>)   -> fun chttpd_misc:handle_task_status_req/1;
-url_handler(<<"_scheduler">>)      -> fun chttpd_misc:handle_scheduler_req/1;
+url_handler(<<"_scheduler">>)      -> fun couch_replicator_httpd:handle_scheduler_req/1;
 url_handler(<<"_node">>)           -> fun chttpd_misc:handle_node_req/1;
 url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
 url_handler(<<"_replicate">>)      -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index c9e7a24..cfeeb3f 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -21,16 +21,13 @@
     handle_reload_query_servers_req/1,
     handle_system_req/1,
     handle_task_status_req/1,
-    handle_scheduler_req/1,
     handle_up_req/1,
     handle_utils_dir_req/1,
     handle_utils_dir_req/2,
     handle_uuids_req/1,
     handle_welcome_req/1,
     handle_welcome_req/2,
-    get_stats/0,
-    parse_int_param/5,
-    parse_replication_state_filter/1
+    get_stats/0
 ]).
 
 -include_lib("couch/include/couch_db.hrl").
@@ -40,11 +37,6 @@
     [send_json/2,send_json/3,send_method_not_allowed/2,
     send_chunk/2,start_chunked_response/3]).
 
-
--define(DEFAULT_TASK_LIMIT, 100).
--define(DEFAULT_DOCS_LIMIT, 100).
--define(REPDB, <<"_replicator">>).
-
 % httpd global handlers
 
 handle_welcome_req(Req) ->
@@ -158,52 +150,6 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
 handle_task_status_req(Req) ->
     send_method_not_allowed(Req, "GET,HEAD").
 
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
-    Limit = parse_int_param(Req, "limit", ?DEFAULT_TASK_LIMIT, 0, infinity),
-    Skip = parse_int_param(Req, "skip", 0, 0, infinity),
-    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
-    Flatlist = lists:concat(Replies),
-    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
-    Total = length(Sorted),
-    Offset = min(Skip, Total),
-    Sublist = lists:sublist(Sorted, Offset+1, Limit),
-    Sublist1 = [update_db_name(Task) || Task <- Sublist],
-    send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
-    case couch_replicator:job(JobId) of
-        {ok, JobInfo} ->
-            send_json(Req, update_db_name(JobInfo));
-        {error, not_found} ->
-            throw(not_found)
-    end;
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
-    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
-    States = parse_replication_state_filter(chttpd:qs_value(Req, "states")),
-    VArgs1 = VArgs0#mrargs{
-        view_type = map,
-        include_docs = true,
-        reduce = false,
-        extra = [{filter_states, States}]
-    },
-    VArgs2 = couch_mrview_util:validate_args(VArgs1),
-    Opts = [{user_ctx, Req#httpd.user_ctx}],
-    Max = chttpd:chunked_response_buffer_size(),
-    Db = ?REPDB,
-    Acc = #vacc{db=Db, req=Req, threshold=Max},
-    Cb = fun couch_replicator_httpd:docs_cb/2,
-    {ok, Res} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
-    {ok, Res#vacc.resp};
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
-    UserCtx = Req#httpd.user_ctx,
-    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
-        {ok, DocInfo} ->
-            send_json(Req, update_db_name(DocInfo));
-        {error, not_found} ->
-            throw(not_found)
-    end;
-handle_scheduler_req(Req) ->
-    send_method_not_allowed(Req, "GET,HEAD").
-
 handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
     chttpd:validate_ctype(Req, "application/json"),
     %% see HACK in chttpd.erl about replication
@@ -493,50 +439,3 @@ message_queues(Registered) ->
         {Type, Length} = process_info(whereis(Name), Type),
         {Name, Length}
     end, Registered).
-
-update_db_name({Props}) ->
-    {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
-    {[{database, normalize_db_name(DbName)} | Props1]}.
-
-normalize_db_name(<<"shards/", _/binary>> = DbName) ->
-    mem3:dbname(DbName);
-normalize_db_name(DbName) ->
-    DbName.
-
-parse_replication_state_filter(undefined) ->
-    [];  % This is the default (wildcard) filter
-parse_replication_state_filter(States) when is_list(States) ->
-    AllStates = couch_replicator:replication_states(),
-    StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
-    AtomStates = try
-        [list_to_existing_atom(S) || S <- StrStates]
-    catch error:badarg ->
-        Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
-        throw({query_parse_error, ?l2b(Msg1)})
-    end,
-    AllSet = sets:from_list(AllStates),
-    StatesSet = sets:from_list(AtomStates),
-    Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
-    case Diff of
-    [] ->
-        AtomStates;
-    _ ->
-        Args = [Diff, AllStates],
-        Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
-        throw({query_parse_error, ?l2b(Msg2)})
-    end.
-
-parse_int_param(Req, Param, Default, Min, Max) ->
-    IntVal = try
-        list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
-    catch error:badarg ->
-        Msg1 = io_lib:format("~s must be an integer", [Param]),
-        throw({query_parse_error, ?l2b(Msg1)})
-    end,
-    case IntVal >= Min andalso IntVal =< Max of
-    true ->
-        IntVal;
-    false ->
-        Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
-        throw({query_parse_error, ?l2b(Msg2)})
-    end.
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 96a88cb..173033f 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -17,7 +17,7 @@
 
 -export([
     handle_req/1,
-    docs_cb/2
+    handle_scheduler_req/1
 ]).
 
 -import(couch_httpd, [
@@ -31,80 +31,65 @@
 ]).
 
 
-%% Handle replicator docs streaming
-
-docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
-    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
-    {ok, Acc#vacc{resp=Resp}};
-
-
-docs_cb(complete, #vacc{resp=undefined}=Acc) ->
-    % Nothing in view
-    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
-    {ok, Acc#vacc{resp=Resp}};
-
-
-docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
-    %% Start response
-    Headers = [],
-    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
-    docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
-
-
-docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
-    {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
-    {ok, Acc#vacc{resp=Resp1}};
-
-
-docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
-    % Finish view output and possibly end the response
-    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
-    case Acc#vacc.should_close of
-        true ->
-            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
-            {ok, Acc#vacc{resp=Resp2}};
-        _ ->
-            {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
-                prepend=",\r\n", buffer=[], bufsize=0}}
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+    Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+        ?DEFAULT_TASK_LIMIT, 0, infinity),
+    Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+        infinity),
+    {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+    Flatlist = lists:concat(Replies),
+    Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+    Total = length(Sorted),
+    Offset = min(Skip, Total),
+    Sublist = lists:sublist(Sorted, Offset+1, Limit),
+    Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+        || Task <- Sublist],
+    send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+    case couch_replicator:job(JobId) of
+        {ok, JobInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+        {error, not_found} ->
+            throw(not_found)
     end;
-
-
-docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
-    % Sending metadata as we've not sent it or any row yet
-    Parts = case couch_util:get_value(total, Meta) of
-        undefined -> [];
-        Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
-    end ++ case couch_util:get_value(offset, Meta) of
-        undefined -> [];
-        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
-    end ++ ["\"docs\":["],
-    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
-    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
-    {ok, AccOut#vacc{prepend="", meta_sent=true}};
-
-
-docs_cb({meta, _Meta}, #vacc{}=Acc) ->
-    %% ignore metadata
-    {ok, Acc};
-
-
-docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
-    %% sorted=false and row arrived before meta
-    % Adding another row
-    Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
-    maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
-
-
-docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
-    % Adding another row
-    Chunk = [prepend_val(Acc), row_to_json(Row)],
-    maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+    VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+    StatesQs = chttpd:qs_value(Req, "states"),
+    States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+    VArgs1 = VArgs0#mrargs{
+        view_type = map,
+        include_docs = true,
+        reduce = false,
+        extra = [{filter_states, States}]
+    },
+    VArgs2 = couch_mrview_util:validate_args(VArgs1),
+    Opts = [{user_ctx, Req#httpd.user_ctx}],
+    Db = ?REPDB,
+    Max = chttpd:chunked_response_buffer_size(),
+    Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+    Cb = fun couch_replicator_httpd_util:docs_cb/2,
+    {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+    {ok,  couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+    UserCtx = Req#httpd.user_ctx,
+    case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+        {ok, DocInfo} ->
+            send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+        {error, not_found} ->
+            throw(not_found)
+    end;
+handle_scheduler_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
 
 
 handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
     RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
+    couch_replicator_httpd_util:validate_rep_props(Props),
     case couch_replicator:replicate(RepDoc, UserCtx) of
     {error, {Error, Reason}} ->
         send_json(
@@ -125,51 +110,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
 
 handle_req(Req) ->
     send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-        end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).
-
-
-prepend_val(#vacc{prepend=Prepend}) ->
-    case Prepend of
-        undefined ->
-            "";
-        _ ->
-            Prepend
-    end.
-
-
-maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
-        when Size > 0 andalso (Size + Len) > Max ->
-    #vacc{buffer = Buffer, resp = Resp} = Acc,
-    {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
-    {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
-maybe_flush_response(Acc0, Data, Len) ->
-    #vacc{buffer = Buf, bufsize = Size} = Acc0,
-    Acc = Acc0#vacc{
-        prepend = ",\r\n",
-        buffer = [Buf | Data],
-        bufsize = Size + Len
-    },
-    {ok, Acc}.
-
-
-row_to_json(Row) ->
-    Doc = couch_util:get_value(doc, Row),
-    ?JSON_ENCODE(Doc).
-
-
-%% Adjust Total as there is an automatically created validation design doc
-adjust_total(Total) when is_integer(Total), Total > 0 ->
-    Total - 1;
-adjust_total(Total) when is_integer(Total) ->
-    0.
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
similarity index 68%
copy from src/couch_replicator/src/couch_replicator_httpd.erl
copy to src/couch_replicator/src/couch_replicator_httpd_util.erl
index 96a88cb..624eddd 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -10,13 +10,18 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(couch_replicator_httpd).
+-module(couch_replicator_httpd_util).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 -export([
-    handle_req/1,
+    validate_rep_props/1,
+    parse_int_param/5,
+    parse_replication_state_filter/1,
+    update_db_name/1,
+    docs_acc_new/3,
+    docs_acc_response/1,
     docs_cb/2
 ]).
 
@@ -31,31 +36,107 @@
 ]).
 
 
-%% Handle replicator docs streaming
+parse_replication_state_filter(undefined) ->
+    [];  % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+    AllStates = couch_replicator:replication_states(),
+    StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
+    AtomStates = try
+        [list_to_existing_atom(S) || S <- StrStates]
+    catch error:badarg ->
+        Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    AllSet = sets:from_list(AllStates),
+    StatesSet = sets:from_list(AtomStates),
+    Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
+    case Diff of
+    [] ->
+        AtomStates;
+    _ ->
+        Args = [Diff, AllStates],
+        Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+
+parse_int_param(Req, Param, Default, Min, Max) ->
+    IntVal = try
+        list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+    catch error:badarg ->
+        Msg1 = io_lib:format("~s must be an integer", [Param]),
+        throw({query_parse_error, ?l2b(Msg1)})
+    end,
+    case IntVal >= Min andalso IntVal =< Max of
+    true ->
+        IntVal;
+    false ->
+        Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+        throw({query_parse_error, ?l2b(Msg2)})
+    end.
+
+
+validate_rep_props([]) ->
+    ok;
+validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
+    lists:foreach(fun
+        ({_,V}) when is_binary(V) -> ok;
+        ({K,_}) -> throw({bad_request,
+            <<K/binary," value must be a string.">>})
+        end, Params),
+    validate_rep_props(Rest);
+validate_rep_props([_|Rest]) ->
+    validate_rep_props(Rest).
+
+
+prepend_val(#vacc{prepend=Prepend}) ->
+    case Prepend of
+        undefined ->
+            "";
+        _ ->
+            Prepend
+    end.
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+        when Size > 0 andalso (Size + Len) > Max ->
+    #vacc{buffer = Buffer, resp = Resp} = Acc,
+    {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+    {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+    #vacc{buffer = Buf, bufsize = Size} = Acc0,
+    Acc = Acc0#vacc{
+        prepend = ",\r\n",
+        buffer = [Buf | Data],
+        bufsize = Size + Len
+    },
+    {ok, Acc}.
+
+docs_acc_new(Req, Db, Threshold) ->
+     #vacc{db=Db, req=Req, threshold=Threshold}.
+
+docs_acc_response(#vacc{resp = Resp}) ->
+    Resp.
 
 docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
     {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
     {ok, Acc#vacc{resp=Resp}};
 
-
 docs_cb(complete, #vacc{resp=undefined}=Acc) ->
     % Nothing in view
     {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
     {ok, Acc#vacc{resp=Resp}};
 
-
 docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
     %% Start response
     Headers = [],
     {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
     docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
 
-
 docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
     {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
     {ok, Acc#vacc{resp=Resp1}};
 
-
 docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
     % Finish view output and possibly end the response
     {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
@@ -68,7 +149,6 @@ docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
                 prepend=",\r\n", buffer=[], bufsize=0}}
     end;
 
-
 docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
     % Sending metadata as we've not sent it or any row yet
     Parts = case couch_util:get_value(total, Meta) of
@@ -87,85 +167,31 @@ docs_cb({meta, _Meta}, #vacc{}=Acc) ->
     %% ignore metadata
     {ok, Acc};
 
-
 docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
     %% sorted=false and row arrived before meta
     % Adding another row
     Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
     maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
 
-
 docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
     % Adding another row
     Chunk = [prepend_val(Acc), row_to_json(Row)],
     maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
 
 
-handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
-    couch_httpd:validate_ctype(Req, "application/json"),
-    RepDoc = {Props} = couch_httpd:json_body_obj(Req),
-    validate_rep_props(Props),
-    case couch_replicator:replicate(RepDoc, UserCtx) of
-    {error, {Error, Reason}} ->
-        send_json(
-            Req, 500,
-            {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
-    {error, not_found} ->
-        % Tried to cancel a replication that didn't exist.
-        send_json(Req, 404, {[{error, <<"not found">>}]});
-    {error, Reason} ->
-        send_json(Req, 500, {[{error, to_binary(Reason)}]});
-    {ok, {cancelled, RepId}} ->
-        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {continuous, RepId}} ->
-        send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
-    {ok, {HistoryResults}} ->
-        send_json(Req, {[{ok, true} | HistoryResults]})
-    end;
-
-handle_req(Req) ->
-    send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
-    ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
-    lists:foreach(fun
-        ({_,V}) when is_binary(V) -> ok;
-        ({K,_}) -> throw({bad_request,
-            <<K/binary," value must be a string.">>})
-        end, Params),
-    validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
-    validate_rep_props(Rest).
-
-
-prepend_val(#vacc{prepend=Prepend}) ->
-    case Prepend of
-        undefined ->
-            "";
-        _ ->
-            Prepend
-    end.
-
-
-maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
-        when Size > 0 andalso (Size + Len) > Max ->
-    #vacc{buffer = Buffer, resp = Resp} = Acc,
-    {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
-    {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
-maybe_flush_response(Acc0, Data, Len) ->
-    #vacc{buffer = Buf, bufsize = Size} = Acc0,
-    Acc = Acc0#vacc{
-        prepend = ",\r\n",
-        buffer = [Buf | Data],
-        bufsize = Size + Len
-    },
-    {ok, Acc}.
+update_db_name({Props}) ->
+    {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
+    {[{database, normalize_db_name(DbName)} | Props1]}.
 
+normalize_db_name(<<"shards/", _/binary>> = DbName) ->
+    mem3:dbname(DbName);
+normalize_db_name(DbName) ->
+    DbName.
 
 row_to_json(Row) ->
-    Doc = couch_util:get_value(doc, Row),
-    ?JSON_ENCODE(Doc).
+    Doc0 = couch_util:get_value(doc, Row),
+    Doc1 = update_db_name(Doc0),
+    ?JSON_ENCODE(Doc1).
 
 
 %% Adjust Total as there is an automatically created validation design doc
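
For reviewers, a minimal usage sketch of the helpers extracted into couch_replicator_httpd_util above. The function names and their behavior come from the diff; the wrapper module, its name, and the concrete parameter values are illustrative assumptions, not part of the commit:

    -module(scheduler_params_example).
    -export([parse/1]).

    %% Illustrative only: exercises the query-parameter helpers that this
    %% commit moves out of chttpd_misc into couch_replicator_httpd_util.
    parse(Req) ->
        %% "states" is split on commas, lower-cased, and validated against
        %% couch_replicator:replication_states(); unknown or malformed
        %% values throw {query_parse_error, Msg}.
        States = couch_replicator_httpd_util:parse_replication_state_filter(
            chttpd:qs_value(Req, "states")),
        %% "limit" must be an integer in [0, infinity]; non-integer values
        %% also throw {query_parse_error, Msg}. 100 mirrors the
        %% ?DEFAULT_TASK_LIMIT define in couch_replicator_httpd.
        Limit = couch_replicator_httpd_util:parse_int_param(
            Req, "limit", 100, 0, infinity),
        {States, Limit}.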

-- 
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].