You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the word "here" in the original page) is not preserved in this copy.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2020/06/12 17:54:19 UTC
[couchdb] 09/10: fix: all_docs on partitioned dbs,
override partition requirement on faked access all docs query
This is an automated email from the ASF dual-hosted git repository.
jan pushed a commit to branch feat/access-3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 82007260d09f7626f441939edd72d77db9b9091f
Author: Jan Lehnardt <ja...@apache.org>
AuthorDate: Fri Jun 12 18:49:05 2020 +0200
fix: all_docs on partitioned dbs, override partition requirement on faked access all docs query
---
src/couch/src/couch_db.erl | 1 -
src/couch_mrview/include/couch_mrview.hrl.orig | 110 ++++
src/couch_mrview/src/couch_mrview.erl | 8 +-
.../{couch_mrview.erl => couch_mrview.erl.orig} | 121 +---
src/couch_mrview/src/couch_mrview_http.erl.orig | 640 +++++++++++++++++++++
src/couch_mrview/src/couch_mrview_updater.erl.orig | 380 ++++++++++++
src/couch_mrview/src/couch_mrview_updater.erl.rej | 52 ++
src/couch_mrview/src/couch_mrview_util.erl | 4 +-
..._mrview_util.erl => couch_mrview_util.erl.orig} | 5 -
src/couch_mrview/src/couch_mrview_util.erl.rej | 16 +
10 files changed, 1206 insertions(+), 131 deletions(-)
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index ecd456c..b315f07 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -1792,7 +1792,6 @@ open_doc_revs_int(Db, IdRevs, Options) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_db_engine:open_local_docs(Db, [Id]) of
[#doc{} = Doc] ->
- couch_log:info("~nopen_doc_int: Doc: ~p~n", [Doc]),
case Doc#doc.body of
{ Body } ->
Access = couch_util:get_value(<<"_access">>, Body),
diff --git a/src/couch_mrview/include/couch_mrview.hrl.orig b/src/couch_mrview/include/couch_mrview.hrl.orig
new file mode 100644
index 0000000..bb0ab0b
--- /dev/null
+++ b/src/couch_mrview/include/couch_mrview.hrl.orig
@@ -0,0 +1,110 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(mrst, { % NOTE(review): presumably the view-index state; field notes below are from couch_mrview_updater:start_update
+ sig=nil,
+ fd=nil,
+ fd_monitor,
+ db_name, % used in io_priority and as the task-status database by start_update
+ idx_name, % used in io_priority and as the task-status design_document by start_update
+ language,
+ design_opts=[],
+ partitioned=false,
+ lib,
+ views,
+ id_btree=nil,
+ update_seq=0, % start_update treats 0 as a first build
+ purge_seq=0,
+ first_build, % set by start_update to (update_seq == 0)
+ partial_resp_pid, % set by start_update from its Partial argument
+ doc_acc, % reset to [] by start_update
+ doc_queue, % couch_work_queue created by start_update
+ write_queue, % couch_work_queue created by start_update
+ qserver=nil
+}).
+
+
+-record(mrview, { % NOTE(review): presumably per-view state within an index — confirm against couch_mrview_util
+ id_num,
+ update_seq=0,
+ purge_seq=0,
+ map_names=[],
+ reduce_funs=[],
+ def,
+ btree=nil,
+ options=[]
+}).
+
+
+-record(mrheader, { % NOTE(review): presumably the persisted index header — confirm
+ seq=0,
+ purge_seq=0,
+ id_btree_state=nil,
+ view_states=nil
+}).
+
+-define(MAX_VIEW_LIMIT, 16#10000000). % default for #mrargs.limit when the request gives no limit
+
+-record(mrargs, { % parsed view / _all_docs query arguments (built by couch_mrview_http:parse_params)
+ view_type,
+ reduce, % set from the "reduce" parameter
+
+ preflight_fun, % the HTTP layer sets this to an ETag-check fun(Sig, Acc)
+
+ start_key,
+ start_key_docid,
+ end_key,
+ end_key_docid,
+ keys, % explicit key list (e.g. from a POST body) or undefined
+
+ direction = fwd, % fwd, or rev when descending=true
+ limit = ?MAX_VIEW_LIMIT,
+ skip = 0,
+ group_level = 0,
+ group = undefined, % the HTTP parser resets group/group_level to undefined to detect explicit user settings
+ stable = false,
+ update = true, % true | false | lazy
+ multi_get = false,
+ inclusive_end = true,
+ include_docs = false,
+ doc_options = [], % e.g. attachments, att_encoding_info
+ update_seq=false,
+ conflicts,
+ callback,
+ sorted = true,
+ extra = [] % {Key, Value} pairs: namespace, unrecognised params as binaries, and values set via set_extra
+}).
+
+-record(vacc, { % accumulator used by view_cb/2 to stream rows as a delayed JSON response
+ db,
+ req,
+ resp, % undefined until view_cb starts (or sends) the response
+ prepend, % separator emitted before the next chunk
+ etag, % set by check_view_etag
+ should_close = false, % true when view_cb started the response and must end it on complete
+ buffer = [], % iolist not yet sent
+ bufsize = 0, % byte size of buffer
+ threshold = 1490, % flush limit; the HTTP handlers override it with chttpd:chunked_response_buffer_size()
+ row_sent = false,
+ meta_sent = false % whether the "rows":[ preamble has been emitted
+}).
+
+-record(lacc, { % NOTE(review): presumably the accumulator for list-function responses; not used in the code shown here
+ db,
+ req,
+ resp,
+ qserver,
+ lname,
+ etag,
+ code,
+ headers
+}).
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 98bceae..0d41f2e 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -233,9 +233,9 @@ query_all_docs(Db, Args) ->
query_all_docs(Db, Args, Callback, Acc) when is_list(Args) ->
query_all_docs(Db, to_mrargs(Args), Callback, Acc);
query_all_docs(Db, Args0, Callback, Acc) ->
- case couch_db:is_admin(Db) of
- true -> query_all_docs_admin(Db, Args0, Callback, Acc);
- false -> query_all_docs_access(Db, Args0, Callback, Acc)
+ case couch_db:has_access_enabled(Db) andalso not couch_db:is_admin(Db) of % andalso: skip the admin lookup when access control is disabled
+ true -> query_all_docs_access(Db, Args0, Callback, Acc); % non-admin on an access-enabled db: per-user _access view
+ false -> query_all_docs_admin(Db, Args0, Callback, Acc) % admins, or dbs without access control: regular _all_docs
end.
access_ddoc() ->
@@ -305,7 +305,7 @@ query_all_docs_access(Db, Args0, Callback0, Acc) ->
UserCtx = couch_db:get_user_ctx(Db),
UserName = UserCtx#user_ctx.name,
Args1 = prefix_startkey_endkey(UserName, Args0, Args0#mrargs.direction),
- Args = Args1#mrargs{reduce=false},
+ Args = Args1#mrargs{reduce=false, extra=Args1#mrargs.extra ++ [{all_docs_access, true}]},
Callback = fun
({row, Props}, Acc0) ->
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl.orig
similarity index 83%
copy from src/couch_mrview/src/couch_mrview.erl
copy to src/couch_mrview/src/couch_mrview.erl.orig
index 98bceae..1cdc918 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl.orig
@@ -13,7 +13,7 @@
-module(couch_mrview).
-export([validate/2]).
--export([query_all_docs/2, query_all_docs/4, query_changes_access/5]).
+-export([query_all_docs/2, query_all_docs/4]).
-export([query_view/3, query_view/4, query_view/6, get_view_index_pid/4]).
-export([get_info/2]).
-export([trigger_update/2, trigger_update/3]).
@@ -233,123 +233,6 @@ query_all_docs(Db, Args) ->
query_all_docs(Db, Args, Callback, Acc) when is_list(Args) ->
query_all_docs(Db, to_mrargs(Args), Callback, Acc);
query_all_docs(Db, Args0, Callback, Acc) ->
- case couch_db:is_admin(Db) of
- true -> query_all_docs_admin(Db, Args0, Callback, Acc);
- false -> query_all_docs_access(Db, Args0, Callback, Acc)
- end.
-
-access_ddoc() ->
- #doc{
- id = <<"_design/_access">>,
- body = {[
- {<<"language">>,<<"_access">>},
- {<<"options">>, {[
- {<<"include_design">>, true}
- ]}},
- {<<"views">>, {[
- {<<"_access_by_id">>, {[
- {<<"map">>, <<"_access/by-id-map">>},
- {<<"reduce">>, <<"_count">>}
- ]}},
- {<<"_access_by_seq">>, {[
- {<<"map">>, <<"_access/by-seq-map">>},
- {<<"reduce">>, <<"_count">>}
- ]}}
- ]}}
- ]}
- }.
-
-query_changes_access(Db, StartSeq, Fun, Options, Acc) ->
- DDoc = access_ddoc(),
- UserCtx = couch_db:get_user_ctx(Db),
- UserName = UserCtx#user_ctx.name,
- %% % TODO: add roles
- Args1 = prefix_startkey_endkey(UserName, #mrargs{}, fwd),
- Args2 = Args1#mrargs{deleted=true},
- Args = Args2#mrargs{reduce=false},
- %% % filter out the user-prefix from the key, so _all_docs looks normal
- %% % this isn’t a separate function because I’m binding Callback0 and I don’t
- %% % know the Erlang equivalent of JS’s fun.bind(this, newarg)
- Callback = fun
- ({meta, _}, Acc0) ->
- {ok, Acc0}; % ignore for now
- ({row, Props}, Acc0) ->
- % turn row into FDI
- Value = couch_util:get_value(value, Props),
- [Owner, Seq] = couch_util:get_value(key, Props),
-
- Rev = couch_util:get_value(rev, Value),
- Deleted = couch_util:get_value(deleted, Value, false),
- BodySp = couch_util:get_value(body_sp, Value),
-
- [Pos, RevId] = string:split(?b2l(Rev), "-"),
- FDI = #full_doc_info{
- id = proplists:get_value(id, Props),
- rev_tree = [{list_to_integer(Pos), {?l2b(RevId), #leaf{deleted=Deleted, ptr=BodySp, seq=Seq, sizes=#size_info{}}, []}}],
- deleted = Deleted,
- update_seq = 0,
- sizes = #size_info{},
- access = [Owner]
- },
- Fun(FDI, Acc0);
- (_Else, Acc0) ->
- {ok, Acc0} % ignore for now
- end,
- VName = <<"_access_by_seq">>,
- query_view(Db, DDoc, VName, Args, Callback, Acc).
-
-query_all_docs_access(Db, Args0, Callback0, Acc) ->
- % query our not yest existing, home-grown _access view.
- % use query_view for this.
- DDoc = access_ddoc(),
- UserCtx = couch_db:get_user_ctx(Db),
- UserName = UserCtx#user_ctx.name,
- Args1 = prefix_startkey_endkey(UserName, Args0, Args0#mrargs.direction),
- Args = Args1#mrargs{reduce=false},
-
- Callback = fun
- ({row, Props}, Acc0) ->
-
- % filter out the user-prefix from the key, so _all_docs looks normal
- % this isn’t a separate function because I’m binding Callback0 and I
- % don’t know the Erlang equivalent of JS’s fun.bind(this, newarg)
- [_User, Key] = proplists:get_value(key, Props),
- Row0 = proplists:delete(key, Props),
- Row = [{key, Key} | Row0],
-
- Callback0({row, Row}, Acc0);
- (Row, Acc0) ->
- Callback0(Row, Acc0)
- end,
- VName = <<"_access_by_id">>,
- query_view(Db, DDoc, VName, Args, Callback, Acc).
-
-prefix_startkey_endkey(UserName, Args, fwd) ->
- #mrargs{start_key=StartKey, end_key=EndKey} = Args,
- Args#mrargs {
- start_key = case StartKey of
- undefined -> [UserName];
- StartKey -> [UserName, StartKey]
- end,
- end_key = case EndKey of
- undefined -> [UserName, {}];
- EndKey -> [UserName, EndKey, {}]
- end
- };
-prefix_startkey_endkey(UserName, Args, rev) ->
- #mrargs{start_key=StartKey, end_key=EndKey} = Args,
- Args#mrargs {
- end_key = case StartKey of
- undefined -> [UserName];
- StartKey -> [UserName, StartKey]
- end,
- start_key = case EndKey of
- undefined -> [UserName, {}];
- EndKey -> [UserName, EndKey, {}]
- end
- }.
-
-query_all_docs_admin(Db, Args0, Callback, Acc) ->
Sig = couch_util:with_db(Db, fun(WDb) ->
{ok, Info} = couch_db:get_db_info(WDb),
couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Info)))
@@ -376,7 +259,6 @@ query_view(Db, DDoc, VName, Args) ->
query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
- ok = couch_util:validate_design_access(Db, DDoc),
case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of
{ok, VInfo, Sig, Args} ->
{ok, Acc1} = case Args#mrargs.preflight_fun of
@@ -804,6 +686,7 @@ default_cb(ok, ddoc_updated) ->
default_cb(Row, Acc) ->
{ok, [Row | Acc]}.
+
to_mrargs(KeyList) ->
lists:foldl(fun({Key, Value}, Acc) ->
Index = lookup_index(couch_util:to_existing_atom(Key)),
diff --git a/src/couch_mrview/src/couch_mrview_http.erl.orig b/src/couch_mrview/src/couch_mrview_http.erl.orig
new file mode 100644
index 0000000..3cf8833
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_http.erl.orig
@@ -0,0 +1,640 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_http).
+
+-export([
+ handle_all_docs_req/2,
+ handle_local_docs_req/2,
+ handle_design_docs_req/2,
+ handle_reindex_req/3,
+ handle_view_req/3,
+ handle_temp_view_req/2,
+ handle_info_req/3,
+ handle_compact_req/3,
+ handle_cleanup_req/2
+]).
+
+-export([
+ parse_boolean/1,
+ parse_int/1,
+ parse_pos_int/1,
+ prepend_val/1,
+ parse_body_and_query/2,
+ parse_body_and_query/3,
+ parse_params/2,
+ parse_params/3,
+ parse_params/4,
+ view_cb/2,
+ row_to_json/1,
+ row_to_json/2,
+ check_view_etag/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+handle_all_docs_req(#httpd{method='GET'}=Req, Db) -> % GET: query all docs without a key list
+ all_docs_req(Req, Db, undefined);
+handle_all_docs_req(#httpd{method='POST'}=Req, Db) -> % POST: the JSON body supplies the keys
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys);
+handle_all_docs_req(Req, _Db) -> % any other method is rejected as not allowed
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_local_docs_req(#httpd{method='GET'}=Req, Db) -> % like handle_all_docs_req but in the _local namespace (admin-only via all_docs_req)
+ all_docs_req(Req, Db, undefined, <<"_local">>);
+handle_local_docs_req(#httpd{method='POST'}=Req, Db) -> % POST: the JSON body supplies the keys
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys, <<"_local">>);
+handle_local_docs_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_design_docs_req(#httpd{method='GET'}=Req, Db) -> % like handle_all_docs_req but in the _design namespace
+ all_docs_req(Req, Db, undefined, <<"_design">>);
+handle_design_docs_req(#httpd{method='POST'}=Req, Db) -> % POST: the JSON body supplies the keys
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys, <<"_design">>);
+handle_design_docs_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_reindex_req(#httpd{method='POST',
+ path_parts=[_, _, DName,<<"_reindex">>]}=Req,
+ Db, _DDoc) -> % admin-only: trigger an index update for _design/DName and reply 201
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ couch_mrview:trigger_update(Db, <<"_design/", DName/binary>>), % result is not checked
+ chttpd:send_json(Req, 201, {[{<<"ok">>, true}]});
+handle_reindex_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_view_req(#httpd{method='GET',
+ path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req,
+ Db, _DDoc) -> % GET on a view's _info sub-path: return that view's info
+ DbName = couch_db:name(Db),
+ DDocId = <<"_design/", DDocName/binary >>,
+ {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName),
+
+ FinalInfo = [{db_name, DbName},
+ {ddoc, DDocId},
+ {view, VName}] ++ Info,
+ chttpd:send_json(Req, 200, {FinalInfo});
+handle_view_req(#httpd{method='GET'}=Req, Db, DDoc) -> % GET: query the view with query-string parameters
+ [_, _, _, _, ViewName] = Req#httpd.path_parts,
+ couch_stats:increment_counter([couchdb, httpd, view_reads]),
+ design_doc_view(Req, Db, DDoc, ViewName, undefined);
+handle_view_req(#httpd{method='POST'}=Req, Db, DDoc) -> % POST: body must carry exactly one of keys or queries
+ chttpd:validate_ctype(Req, "application/json"),
+ [_, _, _, _, ViewName] = Req#httpd.path_parts,
+ Props = chttpd:json_body_obj(Req),
+ Keys = couch_mrview_util:get_view_keys(Props),
+ Queries = couch_mrview_util:get_view_queries(Props),
+ case {Queries, Keys} of
+ {Queries, undefined} when is_list(Queries) -> % multi-query: counts one view read per query
+ IncrBy = length(Queries),
+ couch_stats:increment_counter([couchdb, httpd, view_reads], IncrBy),
+ multi_query_view(Req, Db, DDoc, ViewName, Queries);
+ {undefined, Keys} when is_list(Keys) ->
+ couch_stats:increment_counter([couchdb, httpd, view_reads]),
+ design_doc_view(Req, Db, DDoc, ViewName, Keys);
+ {undefined, undefined} ->
+ throw({
+ bad_request,
+ "POST body must contain `keys` or `queries` field"
+ });
+ {_, _} ->
+ throw({bad_request, "`keys` and `queries` are mutually exclusive"})
+ end;
+handle_view_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+
+handle_temp_view_req(#httpd{method='POST'}=Req, Db) -> % admin-only: query an ad-hoc view defined in the request body
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ {Body} = chttpd:json_body_obj(Req),
+ DDoc = couch_mrview_util:temp_view_to_ddoc({Body}),
+ Keys = couch_mrview_util:get_view_keys({Body}),
+ couch_stats:increment_counter([couchdb, httpd, temporary_view_reads]),
+ design_doc_view(Req, Db, DDoc, <<"temp">>, Keys);
+handle_temp_view_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_info_req(#httpd{method='GET'}=Req, Db, DDoc) -> % GET: report couch_mrview:get_info for the design doc under "view_index"
+ [_, _, Name, _] = Req#httpd.path_parts,
+ {ok, Info} = couch_mrview:get_info(Db, DDoc),
+ chttpd:send_json(Req, 200, {[
+ {name, Name},
+ {view_index, {Info}}
+ ]});
+handle_info_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "GET").
+
+
+handle_compact_req(#httpd{method='POST'}=Req, Db, DDoc) -> % admin-only: compact the design doc's index, reply 202
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ ok = couch_mrview:compact(Db, DDoc),
+ chttpd:send_json(Req, 202, {[{ok, true}]});
+handle_compact_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_cleanup_req(#httpd{method='POST'}=Req, Db) -> % admin-only: run couch_mrview:cleanup on the db, reply 202
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ ok = couch_mrview:cleanup(Db),
+ chttpd:send_json(Req, 202, {[{ok, true}]});
+handle_cleanup_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+all_docs_req(Req, Db, Keys) -> % no namespace restriction
+ all_docs_req(Req, Db, Keys, undefined).
+
+all_docs_req(Req, Db, Keys, NS) -> % gate restricted listings (_local docs, system dbs) behind an admin check
+ case is_restricted(Db, NS) of
+ true ->
+ case (catch couch_db:check_is_admin(Db)) of % any exception from the admin check is treated as "not admin"
+ ok ->
+ do_all_docs_req(Req, Db, Keys, NS);
+ _ when NS == <<"_local">> ->
+ throw({forbidden, <<"Only admins can access _local_docs">>});
+ _ ->
+ case is_public_fields_configured(Db) of % non-admins may list the users db only with public fields enabled
+ true ->
+ do_all_docs_req(Req, Db, Keys, NS);
+ false ->
+ throw({forbidden, <<"Only admins can access _all_docs",
+ " of system databases.">>})
+ end
+ end;
+ false ->
+ do_all_docs_req(Req, Db, Keys, NS)
+ end.
+
+is_restricted(_Db, <<"_local">>) -> % _local docs listings are always restricted
+ true;
+is_restricted(Db, _) -> % otherwise only system databases are restricted
+ couch_db:is_system_db(Db).
+
+is_public_fields_configured(Db) -> % true only for the auth db when users_db_public is "true" and public_fields is set
+ DbName = ?b2l(couch_db:name(Db)),
+ case config:get("couch_httpd_auth", "authentication_db", "_users") of
+ DbName -> % DbName is already bound: matches only if the configured auth db is this db
+ UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"),
+ PublicFields = config:get("couch_httpd_auth", "public_fields"),
+ case {UsersDbPublic, PublicFields} of
+ {"true", PublicFields} when PublicFields =/= undefined ->
+ true;
+ {_, _} ->
+ false
+ end;
+ _ ->
+ false
+ end.
+
+do_all_docs_req(Req, Db, Keys, NS) -> % parse args, add an ETag preflight, then run couch_mrview:query_all_docs
+ Args0 = couch_mrview_http:parse_body_and_query(Req, Keys),
+ Args1 = set_namespace(NS, Args0),
+ ETagFun = fun(Sig, Acc0) ->
+ check_view_etag(Sig, Acc0, Req)
+ end,
+ Args = Args1#mrargs{preflight_fun=ETagFun},
+ {ok, Resp} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, threshold=Max},
+ DbName = ?b2l(couch_db:name(Db)),
+ UsersDbName = config:get("couch_httpd_auth",
+ "authentication_db",
+ "_users"),
+ IsAdmin = is_admin(Db),
+ Callback = get_view_callback(DbName, UsersDbName, IsAdmin), % non-admins on the users db get filtered docs
+ couch_mrview:query_all_docs(Db, Args, Callback, VAcc0)
+ end),
+ case is_record(Resp, vacc) of % NOTE(review): etag_maybe may yield a bare response rather than a #vacc{} — confirm
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end.
+
+set_namespace(NS, #mrargs{extra = Extra} = Args) -> % prepend {namespace, NS} to extra (NS may be undefined)
+ Args#mrargs{extra = [{namespace, NS} | Extra]}.
+
+is_admin(Db) -> % boolean wrapper around couch_db:check_is_admin/1
+ case catch couch_db:check_is_admin(Db) of
+ {unauthorized, _} ->
+ false;
+ ok ->
+ true
+ end. % NOTE(review): any other caught value raises case_clause
+
+
+% admin users always get all fields
+get_view_callback(_, _, true) -> % args: DbName, UsersDbName, IsAdmin
+ fun view_cb/2;
+% if we are operating on the users db and we aren't
+% admin, filter the view
+get_view_callback(_DbName, _DbName, false) -> % same variable twice: matches only when DbName equals UsersDbName
+ fun filtered_view_cb/2;
+% non _users databases get all fields
+get_view_callback(_, _, _) ->
+ fun view_cb/2.
+
+
+design_doc_view(Req, Db, DDoc, ViewName, Keys) -> % query one view, streaming rows via view_cb/2 with an ETag preflight
+ Args0 = parse_params(Req, Keys),
+ ETagFun = fun(Sig, Acc0) ->
+ check_view_etag(Sig, Acc0, Req)
+ end,
+ Args = Args0#mrargs{preflight_fun=ETagFun},
+ {ok, Resp} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, threshold=Max},
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0)
+ end),
+ case is_record(Resp, vacc) of
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end.
+
+
+multi_query_view(Req, Db, DDoc, ViewName, Queries) -> % run each query against one view; wrap the outputs in {"results":[...]}
+ Args0 = parse_params(Req, undefined),
+ {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0),
+ ArgQueries = lists:map(fun({Query}) ->
+ QueryArg = parse_params(Query, undefined, Args1), % each query's params are applied on top of Args1
+ couch_mrview_util:validate_args(Db, DDoc, QueryArg)
+ end, Queries),
+ {ok, Resp2} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, prepend="\r\n", threshold=Max},
+ %% TODO: proper calculation of etag
+ Etag = [$", couch_uuids:new(), $"],
+ Headers = [{"ETag", Etag}],
+ FirstChunk = "{\"results\":[",
+ {ok, Resp0} = chttpd:start_delayed_json_response(VAcc0#vacc.req, 200, Headers, FirstChunk),
+ VAcc1 = VAcc0#vacc{resp=Resp0}, % response already started, so view_cb will not end it after each query
+ VAcc2 = lists:foldl(fun(Args, Acc0) ->
+ {ok, Acc1} = couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, Acc0),
+ Acc1
+ end, VAcc1, ArgQueries),
+ {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, VAcc2#vacc{resp=Resp2}}
+ end),
+ case is_record(Resp2, vacc) of
+ true -> {ok, Resp2#vacc.resp};
+ _ -> {ok, Resp2}
+ end.
+
+filtered_view_cb({row, Row0}, Acc) -> % strip non-public fields from included docs, then delegate to view_cb/2
+ Row1 = lists:map(fun({doc, null}) ->
+ {doc, null};
+ ({doc, Body}) ->
+ Doc = couch_users_db:strip_non_public_fields(#doc{body=Body}),
+ {doc, Doc#doc.body};
+ (KV) ->
+ KV
+ end, Row0),
+ view_cb({row, Row1}, Acc);
+filtered_view_cb(Obj, Acc) -> % non-row messages pass through unchanged
+ view_cb(Obj, Acc).
+
+
+%% these clauses start (and possibly end) the response
+view_cb({error, Reason}, #vacc{resp=undefined}=Acc) -> % error before any output: send a plain error response
+ {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+ {ok, Acc#vacc{resp=Resp}};
+
+view_cb(complete, #vacc{resp=undefined}=Acc) ->
+ % Nothing in view
+ {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+ {ok, Acc#vacc{resp=Resp}};
+
+view_cb(Msg, #vacc{resp=undefined}=Acc) -> % first message: start a delayed response that this callback must also end
+ %% Start response
+ Headers = [],
+ {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+ view_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+%% ---------------------------------------------------
+
+%% From here on down, the response has been started.
+
+view_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+ {ok, Acc#vacc{resp=Resp1}};
+
+view_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+ % Finish view output and possibly end the response
+ {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+ case Acc#vacc.should_close of
+ true ->
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+ _ -> % caller owns the response (e.g. multi-query): reset per-view state and keep it open
+ {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+ prepend=",\r\n", buffer=[], bufsize=0}}
+ end;
+
+view_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+ % Sending metadata as we've not sent it or any row yet
+ Parts = case couch_util:get_value(total, Meta) of
+ undefined -> [];
+ Total -> [io_lib:format("\"total_rows\":~p", [Total])]
+ end ++ case couch_util:get_value(offset, Meta) of
+ undefined -> [];
+ Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+ end ++ case couch_util:get_value(update_seq, Meta) of
+ undefined -> [];
+ null ->
+ ["\"update_seq\":null"];
+ UpdateSeq when is_integer(UpdateSeq) ->
+ [io_lib:format("\"update_seq\":~B", [UpdateSeq])];
+ UpdateSeq when is_binary(UpdateSeq) ->
+ [io_lib:format("\"update_seq\":\"~s\"", [UpdateSeq])]
+ end ++ ["\"rows\":["],
+ Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+ {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+ {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+view_cb({meta, _Meta}, #vacc{}=Acc) -> % meta already sent, or rows already started
+ %% ignore metadata
+ {ok, Acc};
+
+view_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+ %% sorted=false and row arrived before meta
+ % Adding another row
+ Chunk = [prepend_val(Acc), "{\"rows\":[\r\n", row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+view_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+ % Adding another row
+ Chunk = [prepend_val(Acc), row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when Size > 0 andalso (Size + Len) > Max -> % non-empty buffer would exceed threshold: send it, start a new one with Data
+ #vacc{buffer = Buffer, resp = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) -> % otherwise append Data to the buffer
+ #vacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#vacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
+prepend_val(#vacc{prepend=Prepend}) -> % separator to emit before the next chunk; "" when unset
+ case Prepend of
+ undefined ->
+ "";
+ _ ->
+ Prepend
+ end.
+
+
+row_to_json(Row) -> % encode a row using its own id value
+ Id = couch_util:get_value(id, Row),
+ row_to_json(Id, Row).
+
+
+row_to_json(error, Row) -> % error row: {"key":K,"error":V} plus "reason" when present
+ % Special case for _all_docs request with KEYS to
+ % match prior behavior.
+ Key = couch_util:get_value(key, Row),
+ Val = couch_util:get_value(value, Row),
+ Reason = couch_util:get_value(reason, Row),
+ ReasonProp = if Reason == undefined -> []; true ->
+ [{reason, Reason}]
+ end,
+ Obj = {[{key, Key}, {error, Val}] ++ ReasonProp},
+ ?JSON_ENCODE(Obj);
+row_to_json(Id0, Row) -> % normal row: optional id, key (null if absent), value, optional doc
+ Id = case Id0 of
+ undefined -> [];
+ Id0 -> [{id, Id0}]
+ end,
+ Key = couch_util:get_value(key, Row, null),
+ Val = couch_util:get_value(value, Row),
+ Doc = case couch_util:get_value(doc, Row) of
+ undefined -> [];
+ Doc0 -> [{doc, Doc0}]
+ end,
+ Obj = {Id ++ [{key, Key}, {value, Val}] ++ Doc},
+ ?JSON_ENCODE(Obj).
+
+
+parse_params(#httpd{}=Req, Keys) -> % from a request: parse its query string
+ parse_params(chttpd:qs(Req), Keys);
+parse_params(Props, Keys) -> % from a property list: start from a default #mrargs{}
+ Args = #mrargs{},
+ parse_params(Props, Keys, Args).
+
+
+parse_params(Props, Keys, Args) -> % parse Props on top of Args with no options
+ parse_params(Props, Keys, Args, []).
+
+parse_params(Props, Keys, #mrargs{}=Args0, Options) -> % Options: decoded (values are JSON terms), keep_group_level (no reset)
+ IsDecoded = lists:member(decoded, Options),
+ Args1 = case lists:member(keep_group_level, Options) of
+ true ->
+ Args0;
+ _ ->
+ % group_level set to undefined to detect if explicitly set by user
+ Args0#mrargs{keys=Keys, group=undefined, group_level=undefined}
+ end,
+ lists:foldl(fun({K, V}, Acc) ->
+ parse_param(K, V, Acc, IsDecoded)
+ end, Args1, Props).
+
+
+parse_body_and_query(#httpd{method='POST'} = Req, Keys) -> % POST: body params first, query-string params applied on top
+ Props = chttpd:json_body_obj(Req),
+ parse_body_and_query(Req, Props, Keys);
+
+parse_body_and_query(Req, Keys) -> % other methods: query-string params only
+ parse_params(chttpd:qs(Req), Keys, #mrargs{keys=Keys, group=undefined,
+ group_level=undefined}, [keep_group_level]).
+
+parse_body_and_query(Req, {Props}, Keys) -> % body values are already decoded JSON, hence [decoded]
+ Args = #mrargs{keys=Keys, group=undefined, group_level=undefined},
+ BodyArgs = parse_params(Props, Keys, Args, [decoded]),
+ parse_params(chttpd:qs(Req), Keys, BodyArgs, [keep_group_level]).
+
+parse_param(Key, Val, Args, IsDecoded) when is_binary(Key) -> % binary keys (e.g. from a JSON body) are converted to strings first
+ parse_param(binary_to_list(Key), Val, Args, IsDecoded);
+parse_param(Key, Val, Args, IsDecoded) -> % map one parameter onto #mrargs{}; IsDecoded means Val is already a JSON term
+ case Key of
+ "" ->
+ Args;
+ "reduce" ->
+ Args#mrargs{reduce=parse_boolean(Val)};
+ "key" when IsDecoded ->
+ Args#mrargs{start_key=Val, end_key=Val};
+ "key" ->
+ JsonKey = ?JSON_DECODE(Val),
+ Args#mrargs{start_key=JsonKey, end_key=JsonKey};
+ "keys" when IsDecoded ->
+ Args#mrargs{keys=Val};
+ "keys" ->
+ Args#mrargs{keys=?JSON_DECODE(Val)};
+ "startkey" when IsDecoded ->
+ Args#mrargs{start_key=Val};
+ "start_key" when IsDecoded ->
+ Args#mrargs{start_key=Val};
+ "startkey" ->
+ Args#mrargs{start_key=?JSON_DECODE(Val)};
+ "start_key" ->
+ Args#mrargs{start_key=?JSON_DECODE(Val)};
+ "startkey_docid" ->
+ Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
+ "start_key_doc_id" ->
+ Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
+ "endkey" when IsDecoded ->
+ Args#mrargs{end_key=Val};
+ "end_key" when IsDecoded ->
+ Args#mrargs{end_key=Val};
+ "endkey" ->
+ Args#mrargs{end_key=?JSON_DECODE(Val)};
+ "end_key" ->
+ Args#mrargs{end_key=?JSON_DECODE(Val)};
+ "endkey_docid" ->
+ Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
+ "end_key_doc_id" ->
+ Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
+ "limit" ->
+ Args#mrargs{limit=parse_pos_int(Val)};
+ "stale" when Val == "ok" orelse Val == <<"ok">> -> % stale=ok: stable=true, update=false
+ Args#mrargs{stable=true, update=false};
+ "stale" when Val == "update_after" orelse Val == <<"update_after">> -> % stale=update_after: stable=true, update=lazy
+ Args#mrargs{stable=true, update=lazy};
+ "stale" ->
+ throw({query_parse_error, <<"Invalid value for `stale`.">>});
+ "stable" when Val == "true" orelse Val == <<"true">> ->
+ Args#mrargs{stable=true};
+ "stable" when Val == "false" orelse Val == <<"false">> ->
+ Args#mrargs{stable=false};
+ "stable" ->
+ throw({query_parse_error, <<"Invalid value for `stable`.">>});
+ "update" when Val == "true" orelse Val == <<"true">> ->
+ Args#mrargs{update=true};
+ "update" when Val == "false" orelse Val == <<"false">> ->
+ Args#mrargs{update=false};
+ "update" when Val == "lazy" orelse Val == <<"lazy">> ->
+ Args#mrargs{update=lazy};
+ "update" ->
+ throw({query_parse_error, <<"Invalid value for `update`.">>});
+ "descending" ->
+ case parse_boolean(Val) of
+ true -> Args#mrargs{direction=rev};
+ _ -> Args#mrargs{direction=fwd}
+ end;
+ "skip" ->
+ Args#mrargs{skip=parse_pos_int(Val)};
+ "group" ->
+ Args#mrargs{group=parse_boolean(Val)};
+ "group_level" ->
+ Args#mrargs{group_level=parse_pos_int(Val)};
+ "inclusive_end" ->
+ Args#mrargs{inclusive_end=parse_boolean(Val)};
+ "include_docs" ->
+ Args#mrargs{include_docs=parse_boolean(Val)};
+ "attachments" ->
+ case parse_boolean(Val) of
+ true ->
+ Opts = Args#mrargs.doc_options,
+ Args#mrargs{doc_options=[attachments|Opts]};
+ false ->
+ Args
+ end;
+ "att_encoding_info" ->
+ case parse_boolean(Val) of
+ true ->
+ Opts = Args#mrargs.doc_options,
+ Args#mrargs{doc_options=[att_encoding_info|Opts]};
+ false ->
+ Args
+ end;
+ "update_seq" ->
+ Args#mrargs{update_seq=parse_boolean(Val)};
+ "conflicts" ->
+ Args#mrargs{conflicts=parse_boolean(Val)};
+ "callback" ->
+ Args#mrargs{callback=couch_util:to_binary(Val)};
+ "sorted" ->
+ Args#mrargs{sorted=parse_boolean(Val)};
+ "partition" ->
+ Partition = couch_util:to_binary(Val),
+ couch_partition:validate_partition(Partition),
+ couch_mrview_util:set_extra(Args, partition, Partition);
+ _ -> % unrecognised parameter: kept in extra as a {binary, binary} pair
+ BKey = couch_util:to_binary(Key),
+ BVal = couch_util:to_binary(Val),
+ Args#mrargs{extra=[{BKey, BVal} | Args#mrargs.extra]}
+ end.
+
+
+parse_boolean(true) -> % booleans pass through unchanged
+ true;
+parse_boolean(false) ->
+ false;
+
+parse_boolean(Val) when is_binary(Val) -> % binaries are converted to strings and re-parsed
+ parse_boolean(?b2l(Val));
+
+parse_boolean(Val) -> % case-insensitive "true"/"false"; anything else throws query_parse_error
+ case string:to_lower(Val) of
+ "true" -> true;
+ "false" -> false;
+ _ ->
+ Msg = io_lib:format("Invalid boolean parameter: ~p", [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+parse_int(Val) when is_integer(Val) -> % integers (e.g. from a decoded JSON body) pass through
+ Val;
+parse_int(Val) -> % values list_to_integer cannot parse throw query_parse_error
+ case (catch list_to_integer(Val)) of
+ IntVal when is_integer(IntVal) ->
+ IntVal;
+ _ ->
+ Msg = io_lib:format("Invalid value for integer: ~p", [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+parse_pos_int(Val) -> % parse_int plus a >= 0 check (zero is accepted despite the name)
+ case parse_int(Val) of
+ IntVal when IntVal >= 0 ->
+ IntVal;
+ _ ->
+ Fmt = "Invalid value for positive integer: ~p",
+ Msg = io_lib:format(Fmt, [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+
+check_view_etag(Sig, Acc0, Req) -> % throw {etag_match, ETag} on a match; otherwise store the ETag in the #vacc{}
+ ETag = chttpd:make_etag(Sig),
+ case chttpd:etag_match(Req, ETag) of
+ true -> throw({etag_match, ETag});
+ false -> {ok, Acc0#vacc{etag=ETag}}
+ end.
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.orig b/src/couch_mrview/src/couch_mrview_updater.erl.orig
new file mode 100644
index 0000000..7d6823e
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_updater.erl.orig
@@ -0,0 +1,380 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_updater).
+
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(REM_VAL, removed).
+
+% Begin an index update.
+% - Creates a doc queue and a write queue with max_size/max_items taken from
+%   the [view_updater] queue_memory_cap / queue_item_cap settings.
+% - Records the update context on the state.
+% - Spawn-links two workers: a mapper (map_docs/2) and a writer
+%   (write_results/2). Both get the calling process (Self) as their parent.
+% Returns {ok, InitState}.
+start_update(Partial, State, NumChanges, NumChangesDone) ->
+ MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
+ MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
+ QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
+ {ok, DocQueue} = couch_work_queue:new(QueueOpts),
+ {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
+ InitState = State#mrst{
+ % true when the index has no update_seq yet (nothing built so far)
+ first_build=State#mrst.update_seq==0,
+ partial_resp_pid=Partial,
+ doc_acc=[],
+ doc_queue=DocQueue,
+ write_queue=WriteQueue
+ },
+
+ Self = self(),
+
+ % Mapper worker: registers the indexer task, then runs map_docs/2.
+ % Initial progress is a percentage of NumChanges; the 0 clause avoids
+ % dividing by zero.
+ MapFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
+ Progress = case NumChanges of
+ 0 -> 0;
+ _ -> (NumChangesDone * 100) div NumChanges
+ end,
+ couch_task_status:add_task([
+ {indexer_pid, ?l2b(pid_to_list(Partial))},
+ {type, indexer},
+ {database, State#mrst.db_name},
+ {design_document, State#mrst.idx_name},
+ {progress, Progress},
+ {changes_done, NumChangesDone},
+ {total_changes, NumChanges}
+ ]),
+ couch_task_status:set_update_frequency(500),
+ map_docs(Self, InitState)
+ end,
+ % Writer worker: applies mapped results via write_results/2.
+ WriteFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
+ write_results(Self, InitState)
+ end,
+ spawn_link(MapFun),
+ spawn_link(WriteFun),
+
+ {ok, InitState}.
+
+
+% Remove purged documents from the index.
+% - Only the DocIds from PurgedIdRevs ([{DocId, Revs}]) are used.
+% - Their entries are removed from id_btree; the looked-up row keys are
+%   grouped per view number into a dict.
+% - Those rows are then removed from each view btree.
+% Returns {ok, State} with the new btrees and purge_seq=PurgeSeq.
+purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
+ #mrst{
+ id_btree=IdBtree,
+ views=Views,
+ partitioned=Partitioned
+ } = State,
+
+ Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
+ % look up and remove the same ids in one pass
+ {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+
+ MakeDictFun = fun
+ ({ok, {DocId, ViewNumRowKeys}}, DictAcc) ->
+ FoldFun = fun
+ % NOTE(review): this 3-tuple shape appears to be an alternate
+ % id_btree entry format; confirm where it is produced.
+ ({ViewNum, {Key, Seq, _Op}}, DictAcc2) ->
+ dict:append(ViewNum, {Key, Seq, DocId}, DictAcc2);
+ ({ViewNum, RowKey0}, DictAcc2) ->
+ % partitioned views store {p, Partition, Key}; rebuild that form
+ RowKey = if not Partitioned -> RowKey0; true ->
+ [{RK, _}] = inject_partition([{RowKey0, DocId}]),
+ RK
+ end,
+ dict:append(ViewNum, {RowKey, DocId}, DictAcc2)
+ end,
+ lists:foldl(FoldFun, DictAcc, ViewNumRowKeys);
+ ({not_found, _}, DictAcc) ->
+ DictAcc
+ end,
+ KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups),
+
+ % a view's purge_seq only advances when its btree actually changed
+ RemKeysFun = fun(#mrview{id_num=ViewId}=View) ->
+ ToRem = couch_util:dict_find(ViewId, KeysToRemove, []),
+ {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, [], ToRem),
+ NewPurgeSeq = case VBtree2 =/= View#mrview.btree of
+ true -> PurgeSeq;
+ _ -> View#mrview.purge_seq
+ end,
+ View#mrview{btree=VBtree2, purge_seq=NewPurgeSeq}
+ end,
+
+ Views2 = lists:map(RemKeysFun, Views),
+ {ok, State#mrst{
+ id_btree=IdBtree2,
+ views=Views2,
+ purge_seq=PurgeSeq
+ }}.
+
+
+% Buffer one change for the mapper.
+% - Entries are prepended to doc_acc as {Id, Seq, Doc}.
+% - Once more than 100 are pending, the buffer is flushed to the doc queue,
+%   reversed back into arrival order.
+% - A `nil` doc only contributes its Seq.
+% - Deleted docs are not special-cased: they are buffered as full #doc{}
+%   records, like any other document.
+% The commented-out `deleted` clause was removed; it was dead code.
+process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 ->
+    couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)),
+    process_doc(Doc, Seq, State#mrst{doc_acc=[]});
+process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) ->
+    {ok, State#mrst{doc_acc=[{nil, Seq, nil} | Acc]}};
+process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Acc}=State) ->
+    {ok, State#mrst{doc_acc=[{Id, Seq, Doc} | Acc]}}.
+
+
+% Flush any still-buffered entries and close the doc queue so the mapper can
+% drain and stop. Then block until a {new_state, _} message arrives
+% (write_results/2 sends it to the process that called start_update/4).
+% Returns {ok, State} with the transient updater fields cleared.
+% NOTE(review): unlike the overflow flush in process_doc/3, this final batch
+% is queued without lists:reverse/1; confirm ordering is irrelevant downstream.
+finish_update(#mrst{doc_acc=Acc, doc_queue=DocQueue}=State) ->
+    case Acc of
+        [] -> ok;
+        _ -> couch_work_queue:queue(DocQueue, Acc)
+    end,
+    couch_work_queue:close(DocQueue),
+    receive
+        {new_state, NewState} ->
+            Cleared = NewState#mrst{
+                first_build=undefined,
+                partial_resp_pid=undefined,
+                doc_acc=undefined,
+                doc_queue=undefined,
+                write_queue=undefined,
+                qserver=nil
+            },
+            {ok, Cleared}
+    end.
+
+% Build a minimal body proplist for a deleted doc: _seq and _body_sp (read
+% from Meta's body_sp), preceded by the doc's _access value when it has one.
+% NOTE(review): nothing in this module calls it; it looks intended for the
+% _access index's deleted-doc mapping. Confirm, or expect an unused-function
+% warning.
+make_deleted_body({Props}, Meta, Seq) ->
+    Base = [{<<"_seq">>, Seq}, {<<"_body_sp">>, couch_util:get_value(body_sp, Meta)}],
+    case couch_util:get_value(<<"_access">>, Props) of
+        undefined ->
+            Base;
+        Access ->
+            [{<<"_access">>, Access} | Base]
+    end.
+
+% Mapper loop (runs in the worker spawned by start_update/4).
+% - Repeatedly dequeues batches of {Id, Seq, Doc} entries.
+% - Maps each doc through the query server, which is started lazily on the
+%   first batch.
+% - Queues {MaxSeq, [{Id, RawMapResults}]} on the write queue.
+% When the doc queue is closed, it stops the query server and closes the
+% write queue. Parent is only threaded through the recursion.
+map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) ->
+ % NOTE(review): the spawning fun in start_update/4 already sets this.
+ erlang:put(io_priority, {view_update, DbName, IdxName}),
+ case couch_work_queue:dequeue(State0#mrst.doc_queue) of
+ closed ->
+ couch_query_servers:stop_doc_map(State0#mrst.qserver),
+ couch_work_queue:close(State0#mrst.write_queue);
+ {ok, Dequeued} ->
+ % Run all the non deleted docs through the view engine and
+ % then pass the results on to the writer process.
+ State1 = case State0#mrst.qserver of
+ nil -> start_query_server(State0);
+ _ -> State0
+ end,
+ QServer = State1#mrst.qserver,
+ DocFun = fun
+ % nil entries only advance the seq
+ ({nil, Seq, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
+ % NOTE(review): process_doc/3 in this module never produces the
+ % `deleted` marker, so this clause looks unreachable.
+ ({Id, Seq, deleted}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
+ ({Id, Seq, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
+ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+ {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
+ end,
+ % Dequeued is a list of batches; each batch also advances task progress
+ FoldFun = fun(Docs, Acc) ->
+ update_task(length(Docs)),
+ lists:foldl(DocFun, Acc, Docs)
+ end,
+ Results = lists:foldl(FoldFun, {0, []}, Dequeued),
+ couch_work_queue:queue(State1#mrst.write_queue, Results),
+ map_docs(Parent, State1)
+ end.
+
+
+% Writer loop.
+% - Gathers mapped results with accumulate_writes/3 and applies each batch
+%   with write_kvs/4.
+% - Between batches, the intermediate state goes to the partial-result pid.
+% - Once the write queue is closed, the final state is sent to Parent as
+%   {new_state, State}.
+write_results(Parent, #mrst{} = State) ->
+    case accumulate_writes(State, State#mrst.write_queue, nil) of
+        stop ->
+            Parent ! {new_state, State};
+        {stop, {Seq, ViewKVs, DocIdKeys}} ->
+            Parent ! {new_state, write_kvs(State, Seq, ViewKVs, DocIdKeys)};
+        {_More, {Seq, ViewKVs, DocIdKeys}} ->
+            NextState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
+            send_partial(NextState#mrst.partial_resp_pid, NextState),
+            write_results(Parent, NextState)
+    end.
+
+
+% Start a map query server for the index language, view definitions and lib,
+% and store its handle in the state's qserver field.
+start_query_server(State) ->
+    #mrst{views=Views, language=Lang, lib=Lib} = State,
+    ViewDefs = [V#mrview.def || V <- Views],
+    {ok, QServer} = couch_query_servers:start_doc_map(Lang, ViewDefs, Lib),
+    State#mrst{qserver=QServer}.
+
+
+% Drain the write queue W into a {Seq, ViewKVs, DocIdKeys} accumulator.
+% - nil starts a fresh accumulator: seq 0 and an empty KV list per view.
+% - Keeps dequeuing while accumulate_more/2 says the batch is still small,
+%   then returns {ok, Acc}.
+% - On queue close: stop if nothing was gathered, else {stop, Acc}.
+accumulate_writes(State, W, nil) ->
+    EmptyViewKVs = [{V#mrview.id_num, []} || V <- State#mrst.views],
+    accumulate_writes(State, W, {0, EmptyViewKVs, []});
+accumulate_writes(State, W, {Seq, ViewKVs, DocIdKVs}) ->
+    case couch_work_queue:dequeue(W) of
+        closed when Seq == 0 ->
+            stop;
+        closed ->
+            {stop, {Seq, ViewKVs, DocIdKVs}};
+        {ok, Info} ->
+            Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs),
+            {_, _, NewIds} = Acc,
+            case accumulate_more(length(NewIds), Acc) of
+                true -> accumulate_writes(State, W, Acc);
+                false -> {ok, Acc}
+            end
+    end.
+
+
+% Return true while the pending write batch is still below both thresholds:
+% the item count ([view_updater] min_writer_items) and the term size
+% ([view_updater] min_writer_size).
+% Settings are read with config:get_integer/3, as start_update/4 already
+% does, instead of config:get/3 plus list_to_integer/1.
+% The size check sits on the right of andalso, so ?term_size(Acc) is only
+% computed when the item threshold has not yet been reached.
+accumulate_more(NumDocIds, Acc) ->
+    MinItems = config:get_integer("view_updater", "min_writer_items", 100),
+    MinSize = config:get_integer("view_updater", "min_writer_size", 16777216),
+    NumDocIds < MinItems andalso ?term_size(Acc) < MinSize.
+
+
+% Merge a list of mapper batches {Seq, [{DocId, RawResults}]} into the
+% accumulators. Tracks the highest Seq seen; each per-doc result goes
+% through merge_results/3.
+merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
+    {SeqAcc, ViewKVs, DocIdKeys};
+merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs0, DocIdKeys0) ->
+    MergeDoc = fun(Raw, {VKVAcc, DIKAcc}) ->
+        merge_results(Raw, VKVAcc, DIKAcc)
+    end,
+    {ViewKVs, DocIdKeys} = lists:foldl(MergeDoc, {ViewKVs0, DocIdKeys0}, Results),
+    NextSeq = erlang:max(Seq, SeqAcc),
+    merge_results(Rest, NextSeq, ViewKVs, DocIdKeys).
+
+
+% Merge one document's raw map output into the accumulators.
+% - A doc that emitted nothing in any view is recorded as {DocId, []}.
+% - Otherwise its rows are added to the per-view KVs by insert_results/5.
+merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
+    {ViewKVs, [{DocId, []} | DocIdKeys]};
+merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
+    PerView = [
+        [list_to_tuple(Row) || Row <- FunRows]
+     || FunRows <- couch_query_servers:raw_to_ejson(RawResults)
+    ],
+    NothingEmitted = lists:all(fun(FunRows) -> FunRows =:= [] end, PerView),
+    case NothingEmitted of
+        true ->
+            {ViewKVs, [{DocId, []} | DocIdKeys]};
+        false ->
+            {ViewKVs1, ViewIdKeys} = insert_results(DocId, PerView, ViewKVs, [], []),
+            {ViewKVs1, [ViewIdKeys | DocIdKeys]}
+    end.
+
+
+% Walk one doc's per-view emitted rows in lockstep with the per-view KV
+% accumulators ([{ViewIdNum, KVs}]).
+% - Within a view, rows are sorted, and rows sharing a Key are collapsed
+%   into a single {Key, {dups, Values}}.
+% - Each distinct key is recorded as {ViewIdNum, Key} for the id btree.
+% Returns {ViewKVs, {DocId, ViewIdKeys}}, with ViewKVs back in view order.
+insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
+ {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
+insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
+ % rows arrive sorted, so equal keys are adjacent to the accumulator head
+ CombineDupesFun = fun
+ ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) ->
+ {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys};
+ ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) ->
+ {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys};
+ ({Key, Value}, {Rest, IdKeys}) ->
+ {[{Key, Value} | Rest], [{Id, Key} | IdKeys]}
+ end,
+ InitAcc = {[], VIdKeys},
+ couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)),
+ {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc,
+ lists:sort(KVs)),
+ % view btree keys are {EmittedKey, DocId}
+ FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
+ insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).
+
+
+% Apply one accumulated batch to the index.
+% - Updates id_btree via update_id_btree/3; on a first build no lookup is
+%   done, so there are no old rows to remove.
+% - Adds and removes rows in every view btree. For partitioned indexes, keys
+%   are first wrapped by inject_partition/1.
+% - A view's update_seq only advances when its btree changed.
+% Returns State with the new views, update_seq and id_btree.
+write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
+ #mrst{
+ id_btree=IdBtree,
+ first_build=FirstBuild,
+ partitioned=Partitioned
+ } = State,
+
+ {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
+ ToRemByView = collapse_rem_keys(ToRemove, dict:new()),
+
+ % the fun head asserts each view lines up with its KV entry by id_num
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs0}) ->
+ ToRem0 = couch_util:dict_find(ViewId, ToRemByView, []),
+ {KVs, ToRem} = case Partitioned of
+ true ->
+ KVs1 = inject_partition(KVs0),
+ ToRem1 = inject_partition(ToRem0),
+ {KVs1, ToRem1};
+ false ->
+ {KVs0, ToRem0}
+ end,
+ {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
+ NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
+ true -> UpdateSeq;
+ _ -> View#mrview.update_seq
+ end,
+
+ View2 = View#mrview{btree=VBtree2, update_seq=NewUpdateSeq},
+ maybe_notify(State, View2, KVs, ToRem),
+ View2
+ end,
+
+ State#mrst{
+ views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
+ update_seq=UpdateSeq,
+ id_btree=IdBtree2
+ }.
+
+
+% Rewrite view keys into partitioned form {p, Partition, Key}. The partition
+% is taken from each row's DocId via couch_partition:extract/1.
+% Handles both row shapes:
+% - {{Key, DocId}, Value} for rows being added;
+% - {Key, DocId} for rows being removed.
+inject_partition(Rows) ->
+    PartitionKey = fun(Key, DocId) ->
+        {Partition, _} = couch_partition:extract(DocId),
+        {p, Partition, Key}
+    end,
+    Inject = fun
+        ({{Key, DocId}, Value}) ->
+            % Adding a row to the view
+            {{PartitionKey(Key, DocId), DocId}, Value};
+        ({Key, DocId}) ->
+            % Removing a row based on values in id_tree
+            {PartitionKey(Key, DocId), DocId}
+    end,
+    [Inject(Row) || Row <- Rows].
+
+
+% Update the id btree from [{DocId, ViewIdKeys}].
+% - Docs with keys are (re)inserted; docs with [] are removed.
+% - On a first build (true) nothing is looked up or removed.
+% - Otherwise every DocId is looked up, so callers get the previous entries.
+update_id_btree(Btree, DocIdKeys, FirstBuild) ->
+    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
+    case FirstBuild of
+        true ->
+            couch_btree:query_modify(Btree, [], ToAdd, []);
+        _ ->
+            ToFind = [Id || {Id, _} <- DocIdKeys],
+            ToRem = [Id || {Id, []} <- DocIdKeys],
+            couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem)
+    end.
+
+
+% Turn id btree lookup results into a dict of ViewId => [{Key, DocId}] rows
+% to remove. not_found lookups contribute nothing.
+collapse_rem_keys(Lookups, Acc0) ->
+    lists:foldl(fun
+        ({ok, {DocId, ViewIdKeys}}, Acc) ->
+            lists:foldl(fun({ViewId, Key}, Acc2) ->
+                dict:append(ViewId, {Key, DocId}, Acc2)
+            end, Acc, ViewIdKeys);
+        ({not_found, _}, Acc) ->
+            Acc
+    end, Acc0, Lookups).
+
+
+% If a partial-result pid was supplied, cast it the intermediate state as
+% {new_state, State}. Otherwise do nothing and return ok.
+send_partial(Pid, State) when is_pid(Pid) ->
+    gen_server:cast(Pid, {new_state, State});
+send_partial(_NoPid, _State) ->
+    ok.
+
+
+% Add NumChanges to the task's changes_done and recompute the progress
+% percentage against total_changes. When total_changes is 0, progress is
+% reported as 0.
+update_task(NumChanges) ->
+    [DoneBefore, Total] = couch_task_status:get([changes_done, total_changes]),
+    Done = DoneBefore + NumChanges,
+    Progress =
+        if
+            Total =:= 0 ->
+                % updater restart after compaction finishes
+                0;
+            true ->
+                (Done * 100) div Total
+        end,
+    couch_task_status:update([{progress, Progress}, {changes_done, Done}]).
+
+
+% Hand the updated and removed view keys to couch_index_plugin. The key
+% lists are passed as thunks, so they are only built if the plugin calls
+% them.
+maybe_notify(State, View, KVs, ToRem) ->
+    UpdatedKeys = fun() -> [K || {{K, _DocId}, _Val} <- KVs] end,
+    RemovedKeys = fun() -> [K || {K, _DocId} <- ToRem] end,
+    couch_index_plugin:index_update(State, View, UpdatedKeys, RemovedKeys).
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.rej b/src/couch_mrview/src/couch_mrview_updater.erl.rej
new file mode 100644
index 0000000..81a2ce1
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_updater.erl.rej
@@ -0,0 +1,52 @@
+***************
+*** 192,202 ****
+ DocFun = fun
+ ({nil, Seq, _, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
+- ({Id, Seq, Rev, deleted}, {SeqAcc, Results}) ->
+- {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]};
+ ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
+- {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]}
+ end,
+
+--- 199,236 ----
+ DocFun = fun
+ ({nil, Seq, _, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
++ ({Id, Seq, Rev, #doc{deleted=true, body=Body, meta=Meta}}, {SeqAcc, Results}) ->
++ % _access needs deleted docs
++ case IdxName of
++ <<"_design/_access">> ->
++ % splice in seq
++ {Start, Rev1} = Rev,
++ Doc = #doc{
++ id = Id,
++ revs = {Start, [Rev1]},
++ body = {make_deleted_body(Body, Meta, Seq)}, %% todo: only keep _access and add _seq
++ deleted = true
++ },
++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]};
++ _Else ->
++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]}
++ end;
+ ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
++ % couch_log:info("~nIdxName: ~p, Doc: ~p~n~n", [IdxName, Doc]),
++ Doc0 = case IdxName of
++ <<"_design/_access">> ->
++ % splice in seq
++ {Props} = Doc#doc.body,
++ BodySp = couch_util:get_value(body_sp, Doc#doc.meta),
++ Doc#doc{
++ body = {Props++[{<<"_seq">>, Seq}, {<<"_body_sp">>, BodySp}]}
++ };
++ _Else ->
++ Doc
++ end,
++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc0),
+ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]}
+ end,
+
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index be75dd5..2bf1680 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -409,11 +409,11 @@ validate_args(Db, DDoc, Args0) ->
validate_args(#mrst{} = State, Args0) ->
Args = validate_args(Args0),
-
ViewPartitioned = State#mrst.partitioned,
Partition = get_extra(Args, partition),
+ AllDocsAccess = get_extra(Args, all_docs_access, false),
- case {ViewPartitioned, Partition} of
+ case {ViewPartitioned and not AllDocsAccess, Partition} of
{true, undefined} ->
Msg1 = <<"`partition` parameter is mandatory "
"for queries to this view.">>,
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl.orig
similarity index 99%
copy from src/couch_mrview/src/couch_mrview_util.erl
copy to src/couch_mrview/src/couch_mrview_util.erl.orig
index be75dd5..e971720 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl.orig
@@ -20,7 +20,6 @@
-export([index_file/2, compaction_file/2, open_file/1]).
-export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
-export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
--export([get_access_row_count/2]).
-export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
-export([fold/4, fold_reduce/4]).
-export([temp_view_to_ddoc/1]).
@@ -341,10 +340,6 @@ temp_view_to_ddoc({Props}) ->
]},
couch_doc:from_json_obj(DDoc).
-get_access_row_count(#mrview{btree=Bt}, UserName) ->
- couch_btree:full_reduce_with_options(Bt, [
- {start_key, UserName}
- ]).
get_row_count(#mrview{btree=Bt}) ->
Count = case couch_btree:full_reduce(Bt) of
diff --git a/src/couch_mrview/src/couch_mrview_util.erl.rej b/src/couch_mrview/src/couch_mrview_util.erl.rej
new file mode 100644
index 0000000..2bcf126
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_util.erl.rej
@@ -0,0 +1,16 @@
+***************
+*** 20,25 ****
+ -export([index_file/2, compaction_file/2, open_file/1]).
+ -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
+ -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
+ -export([get_view_changes_count/1]).
+ -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
+ -export([fold/4, fold_reduce/4]).
+--- 20,26 ----
+ -export([index_file/2, compaction_file/2, open_file/1]).
+ -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
+ -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
++ -export([get_access_row_count/2]).
+ -export([get_view_changes_count/1]).
+ -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
+ -export([fold/4, fold_reduce/4]).