You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2017/04/14 21:37:13 UTC
[couchdb] branch 63012-scheduler updated: [fixup] Move replicator
HTTP endpoint to replicator app
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch 63012-scheduler
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/63012-scheduler by this push:
new 33328fc [fixup] Move replicator HTTP endpoint to replicator app
33328fc is described below
commit 33328fc58ab69a75676e310a2f69ac40c23196e8
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Fri Apr 14 17:37:08 2017 -0400
[fixup] Move replicator HTTP endpoint to replicator app
---
src/chttpd/src/chttpd_httpd_handlers.erl | 2 +-
src/chttpd/src/chttpd_misc.erl | 103 +------------
.../src/couch_replicator_httpd.erl | 171 +++++++--------------
...r_httpd.erl => couch_replicator_httpd_util.erl} | 170 +++++++++++---------
4 files changed, 154 insertions(+), 292 deletions(-)
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index 8d2c280..9c30441 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -19,7 +19,7 @@ url_handler(<<"favicon.ico">>) -> fun chttpd_misc:handle_favicon_req/1;
url_handler(<<"_utils">>) -> fun chttpd_misc:handle_utils_dir_req/1;
url_handler(<<"_all_dbs">>) -> fun chttpd_misc:handle_all_dbs_req/1;
url_handler(<<"_active_tasks">>) -> fun chttpd_misc:handle_task_status_req/1;
-url_handler(<<"_scheduler">>) -> fun chttpd_misc:handle_scheduler_req/1;
+url_handler(<<"_scheduler">>) -> fun couch_replicator_httpd:handle_scheduler_req/1;
url_handler(<<"_node">>) -> fun chttpd_misc:handle_node_req/1;
url_handler(<<"_reload_query_servers">>) -> fun chttpd_misc:handle_reload_query_servers_req/1;
url_handler(<<"_replicate">>) -> fun chttpd_misc:handle_replicate_req/1;
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index c9e7a24..cfeeb3f 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -21,16 +21,13 @@
handle_reload_query_servers_req/1,
handle_system_req/1,
handle_task_status_req/1,
- handle_scheduler_req/1,
handle_up_req/1,
handle_utils_dir_req/1,
handle_utils_dir_req/2,
handle_uuids_req/1,
handle_welcome_req/1,
handle_welcome_req/2,
- get_stats/0,
- parse_int_param/5,
- parse_replication_state_filter/1
+ get_stats/0
]).
-include_lib("couch/include/couch_db.hrl").
@@ -40,11 +37,6 @@
[send_json/2,send_json/3,send_method_not_allowed/2,
send_chunk/2,start_chunked_response/3]).
-
--define(DEFAULT_TASK_LIMIT, 100).
--define(DEFAULT_DOCS_LIMIT, 100).
--define(REPDB, <<"_replicator">>).
-
% httpd global handlers
handle_welcome_req(Req) ->
@@ -158,52 +150,6 @@ handle_task_status_req(#httpd{method='GET'}=Req) ->
handle_task_status_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
- Limit = parse_int_param(Req, "limit", ?DEFAULT_TASK_LIMIT, 0, infinity),
- Skip = parse_int_param(Req, "skip", 0, 0, infinity),
- {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
- Flatlist = lists:concat(Replies),
- Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
- Total = length(Sorted),
- Offset = min(Skip, Total),
- Sublist = lists:sublist(Sorted, Offset+1, Limit),
- Sublist1 = [update_db_name(Task) || Task <- Sublist],
- send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
- case couch_replicator:job(JobId) of
- {ok, JobInfo} ->
- send_json(Req, update_db_name(JobInfo));
- {error, not_found} ->
- throw(not_found)
- end;
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
- VArgs0 = couch_mrview_http:parse_params(Req, undefined),
- States = parse_replication_state_filter(chttpd:qs_value(Req, "states")),
- VArgs1 = VArgs0#mrargs{
- view_type = map,
- include_docs = true,
- reduce = false,
- extra = [{filter_states, States}]
- },
- VArgs2 = couch_mrview_util:validate_args(VArgs1),
- Opts = [{user_ctx, Req#httpd.user_ctx}],
- Max = chttpd:chunked_response_buffer_size(),
- Db = ?REPDB,
- Acc = #vacc{db=Db, req=Req, threshold=Max},
- Cb = fun couch_replicator_httpd:docs_cb/2,
- {ok, Res} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
- {ok, Res#vacc.resp};
-handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
- UserCtx = Req#httpd.user_ctx,
- case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
- {ok, DocInfo} ->
- send_json(Req, update_db_name(DocInfo));
- {error, not_found} ->
- throw(not_found)
- end;
-handle_scheduler_req(Req) ->
- send_method_not_allowed(Req, "GET,HEAD").
-
handle_replicate_req(#httpd{method='POST', user_ctx=Ctx} = Req) ->
chttpd:validate_ctype(Req, "application/json"),
%% see HACK in chttpd.erl about replication
@@ -493,50 +439,3 @@ message_queues(Registered) ->
{Type, Length} = process_info(whereis(Name), Type),
{Name, Length}
end, Registered).
-
-update_db_name({Props}) ->
- {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
- {[{database, normalize_db_name(DbName)} | Props1]}.
-
-normalize_db_name(<<"shards/", _/binary>> = DbName) ->
- mem3:dbname(DbName);
-normalize_db_name(DbName) ->
- DbName.
-
-parse_replication_state_filter(undefined) ->
- []; % This is the default (wildcard) filter
-parse_replication_state_filter(States) when is_list(States) ->
- AllStates = couch_replicator:replication_states(),
- StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
- AtomStates = try
- [list_to_existing_atom(S) || S <- StrStates]
- catch error:badarg ->
- Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
- throw({query_parse_error, ?l2b(Msg1)})
- end,
- AllSet = sets:from_list(AllStates),
- StatesSet = sets:from_list(AtomStates),
- Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
- case Diff of
- [] ->
- AtomStates;
- _ ->
- Args = [Diff, AllStates],
- Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
- throw({query_parse_error, ?l2b(Msg2)})
- end.
-
-parse_int_param(Req, Param, Default, Min, Max) ->
- IntVal = try
- list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
- catch error:badarg ->
- Msg1 = io_lib:format("~s must be an integer", [Param]),
- throw({query_parse_error, ?l2b(Msg1)})
- end,
- case IntVal >= Min andalso IntVal =< Max of
- true ->
- IntVal;
- false ->
- Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
- throw({query_parse_error, ?l2b(Msg2)})
- end.
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd.erl
index 96a88cb..173033f 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd.erl
@@ -17,7 +17,7 @@
-export([
handle_req/1,
- docs_cb/2
+ handle_scheduler_req/1
]).
-import(couch_httpd, [
@@ -31,80 +31,65 @@
]).
-%% Handle replicator docs streaming
-
-docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
- {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
- {ok, Acc#vacc{resp=Resp}};
-
-
-docs_cb(complete, #vacc{resp=undefined}=Acc) ->
- % Nothing in view
- {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
- {ok, Acc#vacc{resp=Resp}};
-
-
-docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
- %% Start response
- Headers = [],
- {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
- docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
-
-
-docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
- {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
- {ok, Acc#vacc{resp=Resp1}};
-
-
-docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
- % Finish view output and possibly end the response
- {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
- case Acc#vacc.should_close of
- true ->
- {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
- {ok, Acc#vacc{resp=Resp2}};
- _ ->
- {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
- prepend=",\r\n", buffer=[], bufsize=0}}
+-define(DEFAULT_TASK_LIMIT, 100).
+-define(REPDB, <<"_replicator">>).
+
+
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+ Limit = couch_replicator_httpd_util:parse_int_param(Req, "limit",
+ ?DEFAULT_TASK_LIMIT, 0, infinity),
+ Skip = couch_replicator_httpd_util:parse_int_param(Req, "skip", 0, 0,
+ infinity),
+ {Replies, _BadNodes} = rpc:multicall(couch_replicator_scheduler, jobs, []),
+ Flatlist = lists:concat(Replies),
+ Sorted = lists:sort(fun({[{id,A}|_]},{[{id,B}|_]}) -> A =< B end, Flatlist),
+ Total = length(Sorted),
+ Offset = min(Skip, Total),
+ Sublist = lists:sublist(Sorted, Offset+1, Limit),
+ Sublist1 = [couch_replicator_httpd_util:update_db_name(Task)
+ || Task <- Sublist],
+ send_json(Req, {[{total_rows, Total}, {offset, Offset}, {jobs, Sublist1}]});
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+ case couch_replicator:job(JobId) of
+ {ok, JobInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(JobInfo));
+ {error, not_found} ->
+ throw(not_found)
end;
-
-
-docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
- % Sending metadata as we've not sent it or any row yet
- Parts = case couch_util:get_value(total, Meta) of
- undefined -> [];
- Total -> [io_lib:format("\"total_rows\":~p", [adjust_total(Total)])]
- end ++ case couch_util:get_value(offset, Meta) of
- undefined -> [];
- Offset -> [io_lib:format("\"offset\":~p", [Offset])]
- end ++ ["\"docs\":["],
- Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
- {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
- {ok, AccOut#vacc{prepend="", meta_sent=true}};
-
-
-docs_cb({meta, _Meta}, #vacc{}=Acc) ->
- %% ignore metadata
- {ok, Acc};
-
-
-docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
- %% sorted=false and row arrived before meta
- % Adding another row
- Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
- maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
-
-
-docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
- % Adding another row
- Chunk = [prepend_val(Acc), row_to_json(Row)],
- maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>]}=Req) ->
+ VArgs0 = couch_mrview_http:parse_params(Req, undefined),
+ StatesQs = chttpd:qs_value(Req, "states"),
+ States = couch_replicator_httpd_util:parse_replication_state_filter(StatesQs),
+ VArgs1 = VArgs0#mrargs{
+ view_type = map,
+ include_docs = true,
+ reduce = false,
+ extra = [{filter_states, States}]
+ },
+ VArgs2 = couch_mrview_util:validate_args(VArgs1),
+ Opts = [{user_ctx, Req#httpd.user_ctx}],
+ Db = ?REPDB,
+ Max = chttpd:chunked_response_buffer_size(),
+ Acc = couch_replicator_httpd_util:docs_acc_new(Req, Db, Max),
+ Cb = fun couch_replicator_httpd_util:docs_cb/2,
+ {ok, RAcc} = couch_replicator_fabric:docs(Db, Opts, VArgs2, Cb, Acc),
+ {ok, couch_replicator_httpd_util:docs_acc_response(RAcc)};
+handle_scheduler_req(#httpd{method='GET', path_parts=[_,<<"docs">>,DocId]}=Req) ->
+ UserCtx = Req#httpd.user_ctx,
+ case couch_replicator:doc(?REPDB, DocId, UserCtx#user_ctx.roles) of
+ {ok, DocInfo} ->
+ send_json(Req, couch_replicator_httpd_util:update_db_name(DocInfo));
+ {error, not_found} ->
+ throw(not_found)
+ end;
+handle_scheduler_req(Req) ->
+ send_method_not_allowed(Req, "GET,HEAD").
handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
RepDoc = {Props} = couch_httpd:json_body_obj(Req),
- validate_rep_props(Props),
+ couch_replicator_httpd_utils:validate_rep_props(Props),
case couch_replicator:replicate(RepDoc, UserCtx) of
{error, {Error, Reason}} ->
send_json(
@@ -125,51 +110,3 @@ handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
handle_req(Req) ->
send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
- ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
- lists:foreach(fun
- ({_,V}) when is_binary(V) -> ok;
- ({K,_}) -> throw({bad_request,
- <<K/binary," value must be a string.">>})
- end, Params),
- validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
- validate_rep_props(Rest).
-
-
-prepend_val(#vacc{prepend=Prepend}) ->
- case Prepend of
- undefined ->
- "";
- _ ->
- Prepend
- end.
-
-
-maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
- when Size > 0 andalso (Size + Len) > Max ->
- #vacc{buffer = Buffer, resp = Resp} = Acc,
- {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
- {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
-maybe_flush_response(Acc0, Data, Len) ->
- #vacc{buffer = Buf, bufsize = Size} = Acc0,
- Acc = Acc0#vacc{
- prepend = ",\r\n",
- buffer = [Buf | Data],
- bufsize = Size + Len
- },
- {ok, Acc}.
-
-
-row_to_json(Row) ->
- Doc = couch_util:get_value(doc, Row),
- ?JSON_ENCODE(Doc).
-
-
-%% Adjust Total as there is an automatically created validation design doc
-adjust_total(Total) when is_integer(Total), Total > 0 ->
- Total - 1;
-adjust_total(Total) when is_integer(Total) ->
- 0.
diff --git a/src/couch_replicator/src/couch_replicator_httpd.erl b/src/couch_replicator/src/couch_replicator_httpd_util.erl
similarity index 68%
copy from src/couch_replicator/src/couch_replicator_httpd.erl
copy to src/couch_replicator/src/couch_replicator_httpd_util.erl
index 96a88cb..624eddd 100644
--- a/src/couch_replicator/src/couch_replicator_httpd.erl
+++ b/src/couch_replicator/src/couch_replicator_httpd_util.erl
@@ -10,13 +10,18 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(couch_replicator_httpd).
+-module(couch_replicator_httpd_util).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-export([
- handle_req/1,
+ validate_rep_props/1,
+ parse_int_param/5,
+ parse_replication_state_filter/1,
+ update_db_name/1,
+ docs_acc_new/3,
+ docs_acc_response/1,
docs_cb/2
]).
@@ -31,31 +36,107 @@
]).
-%% Handle replicator docs streaming
+parse_replication_state_filter(undefined) ->
+ []; % This is the default (wildcard) filter
+parse_replication_state_filter(States) when is_list(States) ->
+ AllStates = couch_replicator:replication_states(),
+ StrStates = [string:to_lower(S) || S <- string:tokens(States, ",")],
+ AtomStates = try
+ [list_to_existing_atom(S) || S <- StrStates]
+ catch error:badarg ->
+ Msg1 = io_lib:format("States must be one or more of ~w", [AllStates]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ AllSet = sets:from_list(AllStates),
+ StatesSet = sets:from_list(AtomStates),
+ Diff = sets:to_list(sets:subtract(StatesSet, AllSet)),
+ case Diff of
+ [] ->
+ AtomStates;
+ _ ->
+ Args = [Diff, AllStates],
+ Msg2 = io_lib:format("Unknown states ~w. Choose from: ~w", Args),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+parse_int_param(Req, Param, Default, Min, Max) ->
+ IntVal = try
+ list_to_integer(chttpd:qs_value(Req, Param, integer_to_list(Default)))
+ catch error:badarg ->
+ Msg1 = io_lib:format("~s must be an integer", [Param]),
+ throw({query_parse_error, ?l2b(Msg1)})
+ end,
+ case IntVal >= Min andalso IntVal =< Max of
+ true ->
+ IntVal;
+ false ->
+ Msg2 = io_lib:format("~s not in range of [~w,~w]", [Param, Min, Max]),
+ throw({query_parse_error, ?l2b(Msg2)})
+ end.
+
+
+validate_rep_props([]) ->
+ ok;
+validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
+ lists:foreach(fun
+ ({_,V}) when is_binary(V) -> ok;
+ ({K,_}) -> throw({bad_request,
+ <<K/binary," value must be a string.">>})
+ end, Params),
+ validate_rep_props(Rest);
+validate_rep_props([_|Rest]) ->
+ validate_rep_props(Rest).
+
+
+prepend_val(#vacc{prepend=Prepend}) ->
+ case Prepend of
+ undefined ->
+ "";
+ _ ->
+ Prepend
+ end.
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when Size > 0 andalso (Size + Len) > Max ->
+ #vacc{buffer = Buffer, resp = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+ #vacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#vacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
+docs_acc_new(Req, Db, Threshold) ->
+ #vacc{db=Db, req=Req, threshold=Threshold}.
+
+docs_acc_response(#vacc{resp = Resp}) ->
+ Resp.
docs_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
{ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
{ok, Acc#vacc{resp=Resp}};
-
docs_cb(complete, #vacc{resp=undefined}=Acc) ->
% Nothing in view
{ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
{ok, Acc#vacc{resp=Resp}};
-
docs_cb(Msg, #vacc{resp=undefined}=Acc) ->
%% Start response
Headers = [],
{ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
docs_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
-
docs_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
{ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
{ok, Acc#vacc{resp=Resp1}};
-
docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
% Finish view output and possibly end the response
{ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
@@ -68,7 +149,6 @@ docs_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
prepend=",\r\n", buffer=[], bufsize=0}}
end;
-
docs_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
% Sending metadata as we've not sent it or any row yet
Parts = case couch_util:get_value(total, Meta) of
@@ -87,85 +167,31 @@ docs_cb({meta, _Meta}, #vacc{}=Acc) ->
%% ignore metadata
{ok, Acc};
-
docs_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
%% sorted=false and row arrived before meta
% Adding another row
Chunk = [prepend_val(Acc), "{\"docs\":[\r\n", row_to_json(Row)],
maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
-
docs_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
% Adding another row
Chunk = [prepend_val(Acc), row_to_json(Row)],
maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
-handle_req(#httpd{method = 'POST', user_ctx = UserCtx} = Req) ->
- couch_httpd:validate_ctype(Req, "application/json"),
- RepDoc = {Props} = couch_httpd:json_body_obj(Req),
- validate_rep_props(Props),
- case couch_replicator:replicate(RepDoc, UserCtx) of
- {error, {Error, Reason}} ->
- send_json(
- Req, 500,
- {[{error, to_binary(Error)}, {reason, to_binary(Reason)}]});
- {error, not_found} ->
- % Tried to cancel a replication that didn't exist.
- send_json(Req, 404, {[{error, <<"not found">>}]});
- {error, Reason} ->
- send_json(Req, 500, {[{error, to_binary(Reason)}]});
- {ok, {cancelled, RepId}} ->
- send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
- {ok, {continuous, RepId}} ->
- send_json(Req, 202, {[{ok, true}, {<<"_local_id">>, RepId}]});
- {ok, {HistoryResults}} ->
- send_json(Req, {[{ok, true} | HistoryResults]})
- end;
-
-handle_req(Req) ->
- send_method_not_allowed(Req, "POST").
-
-validate_rep_props([]) ->
- ok;
-validate_rep_props([{<<"query_params">>, {Params}}|Rest]) ->
- lists:foreach(fun
- ({_,V}) when is_binary(V) -> ok;
- ({K,_}) -> throw({bad_request,
- <<K/binary," value must be a string.">>})
- end, Params),
- validate_rep_props(Rest);
-validate_rep_props([_|Rest]) ->
- validate_rep_props(Rest).
-
-
-prepend_val(#vacc{prepend=Prepend}) ->
- case Prepend of
- undefined ->
- "";
- _ ->
- Prepend
- end.
-
-
-maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
- when Size > 0 andalso (Size + Len) > Max ->
- #vacc{buffer = Buffer, resp = Resp} = Acc,
- {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
- {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
-maybe_flush_response(Acc0, Data, Len) ->
- #vacc{buffer = Buf, bufsize = Size} = Acc0,
- Acc = Acc0#vacc{
- prepend = ",\r\n",
- buffer = [Buf | Data],
- bufsize = Size + Len
- },
- {ok, Acc}.
+update_db_name({Props}) ->
+ {value, {database, DbName}, Props1} = lists:keytake(database, 1, Props),
+ {[{database, normalize_db_name(DbName)} | Props1]}.
+normalize_db_name(<<"shards/", _/binary>> = DbName) ->
+ mem3:dbname(DbName);
+normalize_db_name(DbName) ->
+ DbName.
row_to_json(Row) ->
- Doc = couch_util:get_value(doc, Row),
- ?JSON_ENCODE(Doc).
+ Doc0 = couch_util:get_value(doc, Row),
+ Doc1 = update_db_name(Doc0),
+ ?JSON_ENCODE(Doc1).
%% Adjust Total as there is an automatically created validation design doc
--
To stop receiving notification emails like this one, please contact
['"commits@couchdb.apache.org" <co...@couchdb.apache.org>'].