You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/11 09:07:11 UTC
[09/41] inital move to rebar compilation
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/json_stream_parse.erl
----------------------------------------------------------------------
diff --git a/json_stream_parse.erl b/json_stream_parse.erl
deleted file mode 100644
index b63e011..0000000
--- a/json_stream_parse.erl
+++ /dev/null
@@ -1,432 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(json_stream_parse).
-
-
--export([events/2, to_ejson/1, collect_object/2]).
-
--define(IS_WS(X), (X == $\ orelse X == $\t orelse X == $\n orelse X == $\r)).
--define(IS_DELIM(X), (X == $} orelse X == $] orelse X == $,)).
--define(IS_DIGIT(X), (X >= $0 andalso X =< $9)).
-
-
-
-% Parses the json into events.
-%
-% The DataFun param is a function that produces the data for parsing. When
-% called it must yield a tuple, or the atom done. The first element in the
-% tuple is the data itself, and the second element is a function to be called
-% next to get the next chunk of data in the stream.
-%
-% The EventFun is called everytime a json element is parsed. It must produce
-% a new function to be called for the next event.
-%
-% Events happen each time a new element in the json string is parsed.
-% For simple value types, the data itself is returned:
-% Strings
-% Integers
-% Floats
-% true
-% false
-% null
-%
-% For arrays, the start of the array is signaled by the event array_start
-% atom. The end is signaled by array_end. The events before the end are the
-% values, or nested values.
-%
-% For objects, the start of the object is signaled by the event object_start
-% atom. The end is signaled by object_end. Each key is signaled by
-% {key, KeyString}, and the following event is the value, or start of the
-% value (array_start, object_start).
-%
-events(Data,EventFun) when is_list(Data)->
- events(list_to_binary(Data),EventFun);
-events(Data,EventFun) when is_binary(Data)->
- events(fun() -> {Data, fun() -> done end} end,EventFun);
-events(DataFun,EventFun) ->
- parse_one(DataFun, EventFun, <<>>).
-
-% converts the JSON directly to the erlang represention of Json
-to_ejson(DF) ->
- {_DF2, EF, _Rest} = events(DF, fun(Ev) -> collect_events(Ev, []) end),
- [[EJson]] = make_ejson(EF(get_results), [[]]),
- EJson.
-
-
-% This function is used to return complete objects while parsing streams.
-%
-% Return this function from inside an event function right after getting an
-% object_start event. It then collects the remaining events for that object
-% and converts it to the erlang represention of Json.
-%
-% It then calls your ReturnControl function with the erlang object. Your
-% return control function then should yield another event function.
-%
-% This example stream parses an array of objects, calling
-% fun do_something_with_the_object/1 for each object.
-%
-% ev_array(array_start) ->
-% fun(Ev) -> ev_object_loop(Ev) end.
-%
-% ev_object_loop(object_start) ->
-% fun(Ev) ->
-% json_stream_parse:collect_object(Ev,
-% fun(Obj) ->
-% do_something_with_the_object(Obj),
-% fun(Ev2) -> ev_object_loop(Ev2) end
-% end)
-% end;
-% ev_object_loop(array_end) ->
-% ok
-% end.
-%
-% % invoke the parse
-% main() ->
-% ...
-% events(Data, fun(Ev) -> ev_array(Ev) end).
-
-collect_object(Ev, ReturnControl) ->
- collect_object(Ev, 0, ReturnControl, [object_start]).
-
-
-
-% internal methods
-
-parse_one(DF,EF,Acc) ->
- case toke(DF, Acc) of
- none ->
- none;
- {Token, DF2, Rest} ->
- case Token of
- "{" ->
- EF2 = EF(object_start),
- {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
- {DF3, EF3(object_end), Rest2};
- "[" ->
- EF2 = EF(array_start),
- {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
- {DF3, EF3(array_end), Rest2};
- Int when is_integer(Int)->
- {DF2, EF(Int), Rest};
- Float when is_float(Float)->
- {DF2, EF(Float), Rest};
- Atom when is_atom(Atom)->
- {DF2, EF(Atom), Rest};
- String when is_binary(String)->
- {DF2, EF(String), Rest};
- _OtherToken ->
- err(unexpected_token)
- end
- end.
-
-must_parse_one(DF,EF,Acc,Error)->
- case parse_one(DF, EF, Acc) of
- none ->
- err(Error);
- Else ->
- Else
- end.
-
-must_toke(DF, Data, Error) ->
- case toke(DF, Data) of
- none ->
- err(Error);
- Result ->
- Result
- end.
-
-toke(DF, <<>>) ->
- case DF() of
- done ->
- none;
- {Data, DF2} ->
- toke(DF2, Data)
- end;
-toke(DF, <<C,Rest/binary>>) when ?IS_WS(C)->
- toke(DF, Rest);
-toke(DF, <<${,Rest/binary>>) ->
- {"{", DF, Rest};
-toke(DF, <<$},Rest/binary>>) ->
- {"}", DF, Rest};
-toke(DF, <<$[,Rest/binary>>) ->
- {"[", DF, Rest};
-toke(DF, <<$],Rest/binary>>) ->
- {"]", DF, Rest};
-toke(DF, <<$",Rest/binary>>) ->
- toke_string(DF,Rest,[]);
-toke(DF, <<$,,Rest/binary>>) ->
- {",", DF, Rest};
-toke(DF, <<$:,Rest/binary>>) ->
- {":", DF, Rest};
-toke(DF, <<$-,Rest/binary>>) ->
- {<<C,_/binary>> = Data, DF2} = must_df(DF,1,Rest,expected_number),
- case ?IS_DIGIT(C) of
- true ->
- toke_number_leading(DF2, Data, "-");
- false ->
- err(expected_number)
- end;
-toke(DF, <<C,_/binary>> = Data) when ?IS_DIGIT(C) ->
- toke_number_leading(DF, Data, []);
-toke(DF, <<$t,Rest/binary>>) ->
- {Data, DF2} = must_match(<<"rue">>, DF, Rest),
- {true, DF2, Data};
-toke(DF, <<$f,Rest/binary>>) ->
- {Data, DF2} = must_match(<<"alse">>, DF, Rest),
- {false, DF2, Data};
-toke(DF, <<$n,Rest/binary>>) ->
- {Data, DF2} = must_match(<<"ull">>, DF, Rest),
- {null, DF2, Data};
-toke(_, _) ->
- err(bad_token).
-
-
-must_match(Pattern, DF, Data) ->
- Size = size(Pattern),
- case must_df(DF, Size, Data, bad_token) of
- {<<Pattern:Size/binary,Data2/binary>>, DF2} ->
- {Data2, DF2};
- {_, _} ->
- err(bad_token)
- end.
-
-must_df(DF,Error)->
- case DF() of
- done ->
- err(Error);
- {Data, DF2} ->
- {Data, DF2}
- end.
-
-
-must_df(DF,NeedLen,Acc,Error)->
- if size(Acc) >= NeedLen ->
- {Acc, DF};
- true ->
- case DF() of
- done ->
- err(Error);
- {Data, DF2} ->
- must_df(DF2, NeedLen, <<Acc/binary, Data/binary>>, Error)
- end
- end.
-
-
-parse_object(DF,EF,Acc) ->
- case must_toke(DF, Acc, unterminated_object) of
- {String, DF2, Rest} when is_binary(String)->
- EF2 = EF({key,String}),
- case must_toke(DF2,Rest,unterminated_object) of
- {":", DF3, Rest2} ->
- {DF4, EF3, Rest3} = must_parse_one(DF3, EF2, Rest2, expected_value),
- case must_toke(DF4,Rest3, unterminated_object) of
- {",", DF5, Rest4} ->
- parse_object(DF5, EF3, Rest4);
- {"}", DF5, Rest4} ->
- {DF5, EF3, Rest4};
- {_, _, _} ->
- err(unexpected_token)
- end;
- _Else ->
- err(expected_colon)
- end;
- {"}", DF2, Rest} ->
- {DF2, EF, Rest};
- {_, _, _} ->
- err(unexpected_token)
- end.
-
-parse_array0(DF,EF,Acc) ->
- case toke(DF, Acc) of
- none ->
- err(unterminated_array);
- {",", DF2, Rest} ->
- parse_array(DF2,EF,Rest);
- {"]", DF2, Rest} ->
- {DF2,EF,Rest};
- _ ->
- err(unexpected_token)
- end.
-
-parse_array(DF,EF,Acc) ->
- case toke(DF, Acc) of
- none ->
- err(unterminated_array);
- {Token, DF2, Rest} ->
- case Token of
- "{" ->
- EF2 = EF(object_start),
- {DF3, EF3, Rest2} = parse_object(DF2, EF2, Rest),
- parse_array0(DF3, EF3(object_end), Rest2);
- "[" ->
- EF2 = EF(array_start),
- {DF3, EF3, Rest2} = parse_array(DF2, EF2, Rest),
- parse_array0(DF3, EF3(array_end), Rest2);
- Int when is_integer(Int)->
- parse_array0(DF2, EF(Int), Rest);
- Float when is_float(Float)->
- parse_array0(DF2, EF(Float), Rest);
- Atom when is_atom(Atom)->
- parse_array0(DF2, EF(Atom), Rest);
- String when is_binary(String)->
- parse_array0(DF2, EF(String), Rest);
- "]" ->
- {DF2, EF, Rest};
- _ ->
- err(unexpected_token)
- end
- end.
-
-
-toke_string(DF, <<>>, Acc) ->
- {Data, DF2} = must_df(DF, unterminated_string),
- toke_string(DF2, Data, Acc);
-toke_string(DF, <<$\\,$",Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$" | Acc]);
-toke_string(DF, <<$\\,$\\,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\\ | Acc]);
-toke_string(DF, <<$\\,$/,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$/ | Acc]);
-toke_string(DF, <<$\\,$b,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\b | Acc]);
-toke_string(DF, <<$\\,$f,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\f | Acc]);
-toke_string(DF, <<$\\,$n,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\n | Acc]);
-toke_string(DF, <<$\\,$r,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\r | Acc]);
-toke_string(DF, <<$\\,$t,Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [$\t | Acc]);
-toke_string(DF, <<$\\,$u,Rest/binary>>, Acc) ->
- {<<A,B,C,D,Data/binary>>, DF2} = must_df(DF,4,Rest,missing_hex),
- UTFChar = erlang:list_to_integer([A, B, C, D], 16),
- if UTFChar == 16#FFFF orelse UTFChar == 16#FFFE ->
- err(invalid_utf_char);
- true ->
- ok
- end,
- Chars = xmerl_ucs:to_utf8(UTFChar),
- toke_string(DF2, Data, lists:reverse(Chars) ++ Acc);
-toke_string(DF, <<$\\>>, Acc) ->
- {Data, DF2} = must_df(DF, unterminated_string),
- toke_string(DF2, <<$\\,Data/binary>>, Acc);
-toke_string(_DF, <<$\\, _/binary>>, _Acc) ->
- err(bad_escape);
-toke_string(DF, <<$", Rest/binary>>, Acc) ->
- {list_to_binary(lists:reverse(Acc)), DF, Rest};
-toke_string(DF, <<C, Rest/binary>>, Acc) ->
- toke_string(DF, Rest, [C | Acc]).
-
-
-toke_number_leading(DF, <<Digit,Rest/binary>>, Acc)
- when ?IS_DIGIT(Digit) ->
- toke_number_leading(DF, Rest, [Digit | Acc]);
-toke_number_leading(DF, <<C,_/binary>>=Rest, Acc)
- when ?IS_WS(C) orelse ?IS_DELIM(C) ->
- {list_to_integer(lists:reverse(Acc)), DF, Rest};
-toke_number_leading(DF, <<>>, Acc) ->
- case DF() of
- done ->
- {list_to_integer(lists:reverse(Acc)), fun() -> done end, <<>>};
- {Data, DF2} ->
- toke_number_leading(DF2, Data, Acc)
- end;
-toke_number_leading(DF, <<$., Rest/binary>>, Acc) ->
- toke_number_trailing(DF, Rest, [$.|Acc]);
-toke_number_leading(DF, <<$e, Rest/binary>>, Acc) ->
- toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
-toke_number_leading(DF, <<$E, Rest/binary>>, Acc) ->
- toke_number_exponent(DF, Rest, [$e, $0, $.|Acc]);
-toke_number_leading(_, _, _) ->
- err(unexpected_character_in_number).
-
-toke_number_trailing(DF, <<Digit,Rest/binary>>, Acc)
- when ?IS_DIGIT(Digit) ->
- toke_number_trailing(DF, Rest, [Digit | Acc]);
-toke_number_trailing(DF, <<C,_/binary>>=Rest, Acc)
- when ?IS_WS(C) orelse ?IS_DELIM(C) ->
- {list_to_float(lists:reverse(Acc)), DF, Rest};
-toke_number_trailing(DF, <<>>, Acc) ->
- case DF() of
- done ->
- {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
- {Data, DF2} ->
- toke_number_trailing(DF2, Data, Acc)
- end;
-toke_number_trailing(DF, <<"e", Rest/binary>>, [C|_]=Acc) when C /= $. ->
- toke_number_exponent(DF, Rest, [$e|Acc]);
-toke_number_trailing(DF, <<"E", Rest/binary>>, [C|_]=Acc) when C /= $. ->
- toke_number_exponent(DF, Rest, [$e|Acc]);
-toke_number_trailing(_, _, _) ->
- err(unexpected_character_in_number).
-
-
-toke_number_exponent(DF, <<Digit,Rest/binary>>, Acc) when ?IS_DIGIT(Digit) ->
- toke_number_exponent(DF, Rest, [Digit | Acc]);
-toke_number_exponent(DF, <<Sign,Rest/binary>>, [$e|_]=Acc)
- when Sign == $+ orelse Sign == $- ->
- toke_number_exponent(DF, Rest, [Sign | Acc]);
-toke_number_exponent(DF, <<C,_/binary>>=Rest, Acc)
- when ?IS_WS(C) orelse ?IS_DELIM(C) ->
- {list_to_float(lists:reverse(Acc)), DF, Rest};
-toke_number_exponent(DF, <<>>, Acc) ->
- case DF() of
- done ->
- {list_to_float(lists:reverse(Acc)), fun() -> done end, <<>>};
- {Data, DF2} ->
- toke_number_exponent(DF2, Data, Acc)
- end;
-toke_number_exponent(_, _, _) ->
- err(unexpected_character_in_number).
-
-
-err(Error)->
- throw({parse_error,Error}).
-
-
-make_ejson([], Stack) ->
- Stack;
-make_ejson([array_start | RevEvs], [ArrayValues, PrevValues | RestStack]) ->
- make_ejson(RevEvs, [[ArrayValues | PrevValues] | RestStack]);
-make_ejson([array_end | RevEvs], Stack) ->
- make_ejson(RevEvs, [[] | Stack]);
-make_ejson([object_start | RevEvs], [ObjValues, PrevValues | RestStack]) ->
- make_ejson(RevEvs, [[{ObjValues} | PrevValues] | RestStack]);
-make_ejson([object_end | RevEvs], Stack) ->
- make_ejson(RevEvs, [[] | Stack]);
-make_ejson([{key, String} | RevEvs], [[PrevValue|RestObject] | RestStack] = _Stack) ->
- make_ejson(RevEvs, [[{String, PrevValue}|RestObject] | RestStack]);
-make_ejson([Value | RevEvs], [Vals | RestStack] = _Stack) ->
- make_ejson(RevEvs, [[Value | Vals] | RestStack]).
-
-collect_events(get_results, Acc) ->
- Acc;
-collect_events(Ev, Acc) ->
- fun(NextEv) -> collect_events(NextEv, [Ev | Acc]) end.
-
-
-collect_object(object_end, 0, ReturnControl, Acc) ->
- [[Obj]] = make_ejson([object_end | Acc], [[]]),
- ReturnControl(Obj);
-collect_object(object_end, NestCount, ReturnControl, Acc) ->
- fun(Ev) ->
- collect_object(Ev, NestCount - 1, ReturnControl, [object_end | Acc])
- end;
-collect_object(object_start, NestCount, ReturnControl, Acc) ->
- fun(Ev) ->
- collect_object(Ev, NestCount + 1, ReturnControl, [object_start | Acc])
- end;
-collect_object(Ev, NestCount, ReturnControl, Acc) ->
- fun(Ev2) ->
- collect_object(Ev2, NestCount, ReturnControl, [Ev | Acc])
- end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
new file mode 100644
index 0000000..9fe19bc
--- /dev/null
+++ b/src/Makefile.am
@@ -0,0 +1,198 @@
+## Licensed under the Apache License, Version 2.0 (the "License"); you may not
+## use this file except in compliance with the License. You may obtain a copy of
+## the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+## License for the specific language governing permissions and limitations under
+## the License.
+
+SUBDIRS = priv
+
+# devdocdir = $(localdocdir)/developer/couchdb
+couchlibdir = $(localerlanglibdir)/couch-$(version)
+couchincludedir = $(couchlibdir)/include
+couchebindir = $(couchlibdir)/ebin
+
+couchinclude_DATA = couch_db.hrl couch_js_functions.hrl
+couchebin_DATA = $(compiled_files)
+
+# dist_devdoc_DATA = $(doc_base) $(doc_modules)
+
+CLEANFILES = $(compiled_files) $(doc_base)
+
+# CLEANFILES = $(doc_modules) edoc-info
+
+source_files = \
+ couch.erl \
+ couch_app.erl \
+ couch_auth_cache.erl \
+ couch_btree.erl \
+ couch_changes.erl \
+ couch_compaction_daemon.erl \
+ couch_compress.erl \
+ couch_config.erl \
+ couch_config_writer.erl \
+ couch_db.erl \
+ couch_db_update_notifier.erl \
+ couch_db_update_notifier_sup.erl \
+ couch_doc.erl \
+ couch_drv.erl \
+ couch_ejson_compare.erl \
+ couch_event_sup.erl \
+ couch_external_manager.erl \
+ couch_external_server.erl \
+ couch_file.erl \
+ couch_httpd.erl \
+ couch_httpd_db.erl \
+ couch_httpd_auth.erl \
+ couch_httpd_cors.erl \
+ couch_httpd_oauth.erl \
+ couch_httpd_external.erl \
+ couch_httpd_misc_handlers.erl \
+ couch_httpd_proxy.erl \
+ couch_httpd_rewrite.erl \
+ couch_httpd_stats_handlers.erl \
+ couch_httpd_vhost.erl \
+ couch_key_tree.erl \
+ couch_log.erl \
+ couch_native_process.erl \
+ couch_os_daemons.erl \
+ couch_os_process.erl \
+ couch_passwords.erl \
+ couch_primary_sup.erl \
+ couch_query_servers.erl \
+ couch_ref_counter.erl \
+ couch_secondary_sup.erl \
+ couch_server.erl \
+ couch_server_sup.erl \
+ couch_stats_aggregator.erl \
+ couch_stats_collector.erl \
+ couch_stream.erl \
+ couch_task_status.erl \
+ couch_users_db.erl \
+ couch_util.erl \
+ couch_uuids.erl \
+ couch_db_updater.erl \
+ couch_work_queue.erl \
+ json_stream_parse.erl
+
+EXTRA_DIST = $(source_files) couch_db.hrl couch_js_functions.hrl
+
+compiled_files = \
+ couch.app \
+ couch.beam \
+ couch_app.beam \
+ couch_auth_cache.beam \
+ couch_btree.beam \
+ couch_changes.beam \
+ couch_compaction_daemon.beam \
+ couch_compress.beam \
+ couch_config.beam \
+ couch_config_writer.beam \
+ couch_db.beam \
+ couch_db_update_notifier.beam \
+ couch_db_update_notifier_sup.beam \
+ couch_doc.beam \
+ couch_drv.beam \
+ couch_ejson_compare.beam \
+ couch_event_sup.beam \
+ couch_external_manager.beam \
+ couch_external_server.beam \
+ couch_file.beam \
+ couch_httpd.beam \
+ couch_httpd_db.beam \
+ couch_httpd_auth.beam \
+ couch_httpd_oauth.beam \
+ couch_httpd_cors.beam \
+ couch_httpd_proxy.beam \
+ couch_httpd_external.beam \
+ couch_httpd_misc_handlers.beam \
+ couch_httpd_rewrite.beam \
+ couch_httpd_stats_handlers.beam \
+ couch_httpd_vhost.beam \
+ couch_key_tree.beam \
+ couch_log.beam \
+ couch_native_process.beam \
+ couch_os_daemons.beam \
+ couch_os_process.beam \
+ couch_passwords.beam \
+ couch_primary_sup.beam \
+ couch_query_servers.beam \
+ couch_ref_counter.beam \
+ couch_secondary_sup.beam \
+ couch_server.beam \
+ couch_server_sup.beam \
+ couch_stats_aggregator.beam \
+ couch_stats_collector.beam \
+ couch_stream.beam \
+ couch_task_status.beam \
+ couch_users_db.beam \
+ couch_util.beam \
+ couch_uuids.beam \
+ couch_db_updater.beam \
+ couch_work_queue.beam \
+ json_stream_parse.beam
+
+# doc_base = \
+# erlang.png \
+# index.html \
+# modules-frame.html \
+# overview-summary.html \
+# packages-frame.html \
+# stylesheet.css
+
+# doc_modules = \
+# couch_btree.html \
+# couch_config.html \
+# couch_config_writer.html \
+# couch_db.html \
+# couch_db_update_notifier.html \
+# couch_db_update_notifier_sup.html \
+# couch_doc.html \
+# couch_event_sup.html \
+# couch_file.html \
+# couch_httpd.html \
+# couch_key_tree.html \
+# couch_log.html \
+# couch_query_servers.html \
+# couch_rep.html \
+# couch_rep_sup.html \
+# couch_server.html \
+# couch_server_sup.html \
+# couch_stream.html \
+# couch_util.html
+
+if WINDOWS
+couch.app: couch.app.tpl
+ modules=`find . -name "*.erl" \! -name ".*" -exec basename {} .erl \; | tr '\n' ',' | sed "s/,$$//"`; \
+ sed -e "s|%package_name%|@package_name@|g" \
+ -e "s|%version%|@version@|g" \
+ -e "s|@modules@|$$modules|g" \
+ -e "s|%localconfdir%|../etc/couchdb|g" \
+ -e "s|@defaultini@|default.ini|g" \
+ -e "s|@localini@|local.ini|g" > \
+ $@ < $<
+else
+couch.app: couch.app.tpl
+ modules=`{ find . -name "*.erl" \! -name ".*" -exec basename {} .erl \; | tr '\n' ','; echo ''; } | sed "s/,$$//"`; \
+ sed -e "s|%package_name%|@package_name@|g" \
+ -e "s|%version%|@version@|g" \
+ -e "s|@modules@|$$modules|g" \
+ -e "s|%localconfdir%|@localconfdir@|g" \
+ -e "s|@defaultini@|default.ini|g" \
+ -e "s|@localini@|local.ini|g" > \
+ $@ < $<
+ chmod +x $@
+endif
+
+# $(dist_devdoc_DATA): edoc-info
+
+# $(ERL) -noshell -run edoc_run files [\"$<\"]
+
+%.beam: %.erl couch_db.hrl couch_js_functions.hrl
+ $(ERLC) $(ERLC_FLAGS) ${TEST} $<;
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch.app.src
----------------------------------------------------------------------
diff --git a/src/couch.app.src b/src/couch.app.src
new file mode 100644
index 0000000..2d14148
--- /dev/null
+++ b/src/couch.app.src
@@ -0,0 +1,23 @@
+{application, couch, [
+ {description, "@package_name@"},
+ {vsn, "@version@"},
+ {registered, [
+ couch_config,
+ couch_db_update,
+ couch_db_update_notifier_sup,
+ couch_external_manager,
+ couch_httpd,
+ couch_log,
+ couch_primary_services,
+ couch_query_servers,
+ couch_secondary_services,
+ couch_server,
+ couch_server_sup,
+ couch_stats_aggregator,
+ couch_stats_collector,
+ couch_task_status
+ ]},
+ {mod, {couch_app, []}},
+ {applications, [kernel, stdlib, crypto, sasl, public_key, ssl,
+ inets, oauth, ibrowse, mochiweb, os_mon]}
+]}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch.erl
----------------------------------------------------------------------
diff --git a/src/couch.erl b/src/couch.erl
new file mode 100644
index 0000000..80e3261
--- /dev/null
+++ b/src/couch.erl
@@ -0,0 +1,58 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch).
+
+-export([get_app_env/2,
+ version/0,
+ start/0,
+ stop/0,
+ restart/0,
+ reload/0]).
+
+get_app_env(Env, Default) ->
+ case application:get_env(couch, Env) of
+ {ok, Val} -> Val;
+ undefined -> Default
+ end.
+
+version() ->
+ case application:get_key(couch, vsn) of
+ {ok, FullVersion} ->
+ hd(string:tokens(FullVersion, "-"));
+ _ ->
+ "0.0.0"
+ end.
+
+start() ->
+ application:start(couch).
+
+stop() ->
+ application:stop(couch).
+
+restart() ->
+ case stop() of
+ ok ->
+ start();
+ {error, {not_started,couch}} ->
+ start();
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+reload() ->
+ case supervisor:terminate_child(couch_server_sup, couch_config) of
+ ok ->
+ supervisor:restart_child(couch_server_sup, couch_config);
+ {error, Reason} ->
+ {error, Reason}
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_app.erl b/src/couch_app.erl
new file mode 100644
index 0000000..a8d215e
--- /dev/null
+++ b/src/couch_app.erl
@@ -0,0 +1,36 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_app).
+
+-behaviour(application).
+
+-include("couch_db.hrl").
+
+-export([start/2, stop/1]).
+
+-define(CONF_FILES, ["couch.ini", "couch_httpd.ini", "local.ini"]).
+
+start(_Type, _Args) ->
+ couch_util:start_app_deps(couch),
+ IniFiles = get_ini_files(),
+ couch_server_sup:start_link(IniFiles).
+
+stop(_) ->
+ ok.
+
+get_ini_files() ->
+ DefaultConfDir = filename:join([code:root_dir(), "./etc"]),
+ Defaults = lists:map(fun(FName) ->
+ filename:join(DefaultConfDir, FName)
+ end, ?CONF_FILES),
+ couch:get_app_env(config_files, Defaults).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_auth_cache.erl
----------------------------------------------------------------------
diff --git a/src/couch_auth_cache.erl b/src/couch_auth_cache.erl
new file mode 100644
index 0000000..42ccd44
--- /dev/null
+++ b/src/couch_auth_cache.erl
@@ -0,0 +1,425 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_auth_cache).
+-behaviour(gen_server).
+
+% public API
+-export([get_user_creds/1]).
+
+% gen_server API
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
+
+-define(STATE, auth_state_ets).
+-define(BY_USER, auth_by_user_ets).
+-define(BY_ATIME, auth_by_atime_ets).
+
+-record(state, {
+ max_cache_size = 0,
+ cache_size = 0,
+ db_notifier = nil,
+ db_mon_ref = nil
+}).
+
+
+-spec get_user_creds(UserName::string() | binary()) ->
+ Credentials::list() | nil.
+
+get_user_creds(UserName) when is_list(UserName) ->
+ get_user_creds(?l2b(UserName));
+
+get_user_creds(UserName) ->
+ UserCreds = case couch_config:get("admins", ?b2l(UserName)) of
+ "-hashed-" ++ HashedPwdAndSalt ->
+ % the name is an admin, now check to see if there is a user doc
+ % which has a matching name, salt, and password_sha
+ [HashedPwd, Salt] = string:tokens(HashedPwdAndSalt, ","),
+ case get_from_cache(UserName) of
+ nil ->
+ make_admin_doc(HashedPwd, Salt, []);
+ UserProps when is_list(UserProps) ->
+ make_admin_doc(HashedPwd, Salt, couch_util:get_value(<<"roles">>, UserProps))
+ end;
+ "-pbkdf2-" ++ HashedPwdSaltAndIterations ->
+ [HashedPwd, Salt, Iterations] = string:tokens(HashedPwdSaltAndIterations, ","),
+ case get_from_cache(UserName) of
+ nil ->
+ make_admin_doc(HashedPwd, Salt, Iterations, []);
+ UserProps when is_list(UserProps) ->
+ make_admin_doc(HashedPwd, Salt, Iterations, couch_util:get_value(<<"roles">>, UserProps))
+ end;
+ _Else ->
+ get_from_cache(UserName)
+ end,
+ validate_user_creds(UserCreds).
+
+make_admin_doc(HashedPwd, Salt, ExtraRoles) ->
+ [{<<"roles">>, [<<"_admin">>|ExtraRoles]},
+ {<<"salt">>, ?l2b(Salt)},
+ {<<"password_scheme">>, <<"simple">>},
+ {<<"password_sha">>, ?l2b(HashedPwd)}].
+
+make_admin_doc(DerivedKey, Salt, Iterations, ExtraRoles) ->
+ [{<<"roles">>, [<<"_admin">>|ExtraRoles]},
+ {<<"salt">>, ?l2b(Salt)},
+ {<<"iterations">>, list_to_integer(Iterations)},
+ {<<"password_scheme">>, <<"pbkdf2">>},
+ {<<"derived_key">>, ?l2b(DerivedKey)}].
+
+get_from_cache(UserName) ->
+ exec_if_auth_db(
+ fun(_AuthDb) ->
+ maybe_refresh_cache(),
+ case ets:lookup(?BY_USER, UserName) of
+ [] ->
+ gen_server:call(?MODULE, {fetch, UserName}, infinity);
+ [{UserName, {Credentials, _ATime}}] ->
+ couch_stats_collector:increment({couchdb, auth_cache_hits}),
+ gen_server:cast(?MODULE, {cache_hit, UserName}),
+ Credentials
+ end
+ end,
+ nil
+ ).
+
+
+validate_user_creds(nil) ->
+ nil;
+validate_user_creds(UserCreds) ->
+ case couch_util:get_value(<<"_conflicts">>, UserCreds) of
+ undefined ->
+ ok;
+ _ConflictList ->
+ throw({unauthorized,
+ <<"User document conflicts must be resolved before the document",
+ " is used for authentication purposes.">>
+ })
+ end,
+ UserCreds.
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ ?STATE = ets:new(?STATE, [set, protected, named_table]),
+ ?BY_USER = ets:new(?BY_USER, [set, protected, named_table]),
+ ?BY_ATIME = ets:new(?BY_ATIME, [ordered_set, private, named_table]),
+ process_flag(trap_exit, true),
+ ok = couch_config:register(
+ fun("couch_httpd_auth", "auth_cache_size", SizeList) ->
+ Size = list_to_integer(SizeList),
+ ok = gen_server:call(?MODULE, {new_max_cache_size, Size}, infinity);
+ ("couch_httpd_auth", "authentication_db", _DbName) ->
+ ok = gen_server:call(?MODULE, reinit_cache, infinity)
+ end
+ ),
+ {ok, Notifier} = couch_db_update_notifier:start_link(fun handle_db_event/1),
+ State = #state{
+ db_notifier = Notifier,
+ max_cache_size = list_to_integer(
+ couch_config:get("couch_httpd_auth", "auth_cache_size", "50")
+ )
+ },
+ {ok, reinit_cache(State)}.
+
+
+handle_db_event({Event, DbName}) ->
+ [{auth_db_name, AuthDbName}] = ets:lookup(?STATE, auth_db_name),
+ case DbName =:= AuthDbName of
+ true ->
+ case Event of
+ created -> gen_server:call(?MODULE, reinit_cache, infinity);
+ compacted -> gen_server:call(?MODULE, auth_db_compacted, infinity);
+ _Else -> ok
+ end;
+ false ->
+ ok
+ end.
+
+
+handle_call(reinit_cache, _From, State) ->
+ catch erlang:demonitor(State#state.db_mon_ref, [flush]),
+ exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+ {reply, ok, reinit_cache(State)};
+
+handle_call(auth_db_compacted, _From, State) ->
+ exec_if_auth_db(
+ fun(AuthDb) ->
+ true = ets:insert(?STATE, {auth_db, reopen_auth_db(AuthDb)})
+ end
+ ),
+ {reply, ok, State};
+
+handle_call({new_max_cache_size, NewSize},
+ _From, #state{cache_size = Size} = State) when NewSize >= Size ->
+ {reply, ok, State#state{max_cache_size = NewSize}};
+
+handle_call({new_max_cache_size, NewSize}, _From, State) ->
+ free_mru_cache_entries(State#state.cache_size - NewSize),
+ {reply, ok, State#state{max_cache_size = NewSize, cache_size = NewSize}};
+
+handle_call({fetch, UserName}, _From, State) ->
+ {Credentials, NewState} = case ets:lookup(?BY_USER, UserName) of
+ [{UserName, {Creds, ATime}}] ->
+ couch_stats_collector:increment({couchdb, auth_cache_hits}),
+ cache_hit(UserName, Creds, ATime),
+ {Creds, State};
+ [] ->
+ couch_stats_collector:increment({couchdb, auth_cache_misses}),
+ Creds = get_user_props_from_db(UserName),
+ State1 = add_cache_entry(UserName, Creds, erlang:now(), State),
+ {Creds, State1}
+ end,
+ {reply, Credentials, NewState};
+
+handle_call(refresh, _From, State) ->
+ exec_if_auth_db(fun refresh_entries/1),
+ {reply, ok, State}.
+
+
+handle_cast({cache_hit, UserName}, State) ->
+ case ets:lookup(?BY_USER, UserName) of
+ [{UserName, {Credentials, ATime}}] ->
+ cache_hit(UserName, Credentials, ATime);
+ _ ->
+ ok
+ end,
+ {noreply, State}.
+
+
+handle_info({'DOWN', Ref, _, _, _Reason}, #state{db_mon_ref = Ref} = State) ->
+ {noreply, reinit_cache(State)}.
+
+
+terminate(_Reason, #state{db_notifier = Notifier}) ->
+ couch_db_update_notifier:stop(Notifier),
+ exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+ true = ets:delete(?BY_USER),
+ true = ets:delete(?BY_ATIME),
+ true = ets:delete(?STATE).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+clear_cache(State) ->
+ exec_if_auth_db(fun(AuthDb) -> catch couch_db:close(AuthDb) end),
+ true = ets:delete_all_objects(?BY_USER),
+ true = ets:delete_all_objects(?BY_ATIME),
+ State#state{cache_size = 0}.
+
+
+reinit_cache(State) ->
+ NewState = clear_cache(State),
+ AuthDbName = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db")),
+ true = ets:insert(?STATE, {auth_db_name, AuthDbName}),
+ AuthDb = open_auth_db(),
+ true = ets:insert(?STATE, {auth_db, AuthDb}),
+ NewState#state{db_mon_ref = couch_db:monitor(AuthDb)}.
+
+
+add_cache_entry(_, _, _, #state{max_cache_size = 0} = State) ->
+ State;
+add_cache_entry(UserName, Credentials, ATime, State) ->
+ case State#state.cache_size >= State#state.max_cache_size of
+ true ->
+ free_mru_cache_entry();
+ false ->
+ ok
+ end,
+ true = ets:insert(?BY_ATIME, {ATime, UserName}),
+ true = ets:insert(?BY_USER, {UserName, {Credentials, ATime}}),
+ State#state{cache_size = couch_util:get_value(size, ets:info(?BY_USER))}.
+
+free_mru_cache_entries(0) ->
+ ok;
+free_mru_cache_entries(N) when N > 0 ->
+ free_mru_cache_entry(),
+ free_mru_cache_entries(N - 1).
+
+free_mru_cache_entry() ->
+ MruTime = ets:last(?BY_ATIME),
+ [{MruTime, UserName}] = ets:lookup(?BY_ATIME, MruTime),
+ true = ets:delete(?BY_ATIME, MruTime),
+ true = ets:delete(?BY_USER, UserName).
+
+
+cache_hit(UserName, Credentials, ATime) ->
+ NewATime = erlang:now(),
+ true = ets:delete(?BY_ATIME, ATime),
+ true = ets:insert(?BY_ATIME, {NewATime, UserName}),
+ true = ets:insert(?BY_USER, {UserName, {Credentials, NewATime}}).
+
+
+refresh_entries(AuthDb) ->
+ case reopen_auth_db(AuthDb) of
+ nil ->
+ ok;
+ AuthDb2 ->
+ case AuthDb2#db.update_seq > AuthDb#db.update_seq of
+ true ->
+ {ok, _, _} = couch_db:enum_docs_since(
+ AuthDb2,
+ AuthDb#db.update_seq,
+ fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
+ AuthDb#db.update_seq,
+ []
+ ),
+ true = ets:insert(?STATE, {auth_db, AuthDb2});
+ false ->
+ ok
+ end
+ end.
+
+
+refresh_entry(Db, #doc_info{high_seq = DocSeq} = DocInfo) ->
+ case is_user_doc(DocInfo) of
+ {true, UserName} ->
+ case ets:lookup(?BY_USER, UserName) of
+ [] ->
+ ok;
+ [{UserName, {_OldCreds, ATime}}] ->
+ {ok, Doc} = couch_db:open_doc(Db, DocInfo, [conflicts, deleted]),
+ NewCreds = user_creds(Doc),
+ true = ets:insert(?BY_USER, {UserName, {NewCreds, ATime}})
+ end;
+ false ->
+ ok
+ end,
+ {ok, DocSeq}.
+
+
+user_creds(#doc{deleted = true}) ->
+ nil;
+user_creds(#doc{} = Doc) ->
+ {Creds} = couch_doc:to_json_obj(Doc, []),
+ Creds.
+
+
+is_user_doc(#doc_info{id = <<"org.couchdb.user:", UserName/binary>>}) ->
+ {true, UserName};
+is_user_doc(_) ->
+ false.
+
+
+maybe_refresh_cache() ->
+ case cache_needs_refresh() of
+ true ->
+ ok = gen_server:call(?MODULE, refresh, infinity);
+ false ->
+ ok
+ end.
+
+
+cache_needs_refresh() ->
+ exec_if_auth_db(
+ fun(AuthDb) ->
+ case reopen_auth_db(AuthDb) of
+ nil ->
+ false;
+ AuthDb2 ->
+ AuthDb2#db.update_seq > AuthDb#db.update_seq
+ end
+ end,
+ false
+ ).
+
+
+reopen_auth_db(AuthDb) ->
+ case (catch couch_db:reopen(AuthDb)) of
+ {ok, AuthDb2} ->
+ AuthDb2;
+ _ ->
+ nil
+ end.
+
+
+exec_if_auth_db(Fun) ->
+ exec_if_auth_db(Fun, ok).
+
+exec_if_auth_db(Fun, DefRes) ->
+ case ets:lookup(?STATE, auth_db) of
+ [{auth_db, #db{} = AuthDb}] ->
+ Fun(AuthDb);
+ _ ->
+ DefRes
+ end.
+
+
+open_auth_db() ->
+ [{auth_db_name, DbName}] = ets:lookup(?STATE, auth_db_name),
+ {ok, AuthDb} = ensure_users_db_exists(DbName, [sys_db]),
+ AuthDb.
+
+
+get_user_props_from_db(UserName) ->
+ exec_if_auth_db(
+ fun(AuthDb) ->
+ Db = reopen_auth_db(AuthDb),
+ DocId = <<"org.couchdb.user:", UserName/binary>>,
+ try
+ {ok, Doc} = couch_db:open_doc(Db, DocId, [conflicts]),
+ {DocProps} = couch_doc:to_json_obj(Doc, []),
+ DocProps
+ catch
+ _:_Error ->
+ nil
+ end
+ end,
+ nil
+ ).
+
+ensure_users_db_exists(DbName, Options) ->
+ Options1 = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}, nologifmissing | Options],
+ case couch_db:open(DbName, Options1) of
+ {ok, Db} ->
+ ensure_auth_ddoc_exists(Db, <<"_design/_auth">>),
+ {ok, Db};
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Options1),
+ ok = ensure_auth_ddoc_exists(Db, <<"_design/_auth">>),
+ {ok, Db}
+ end.
+
+ensure_auth_ddoc_exists(Db, DDocId) ->
+ case couch_db:open_doc(Db, DDocId) of
+ {not_found, _Reason} ->
+ {ok, AuthDesign} = auth_design_doc(DDocId),
+ {ok, _Rev} = couch_db:update_doc(Db, AuthDesign, []);
+ {ok, Doc} ->
+ {Props} = couch_doc:to_json_obj(Doc, []),
+ case couch_util:get_value(<<"validate_doc_update">>, Props, []) of
+ ?AUTH_DB_DOC_VALIDATE_FUNCTION ->
+ ok;
+ _ ->
+ Props1 = lists:keyreplace(<<"validate_doc_update">>, 1, Props,
+ {<<"validate_doc_update">>,
+ ?AUTH_DB_DOC_VALIDATE_FUNCTION}),
+ couch_db:update_doc(Db, couch_doc:from_json_obj({Props1}), [])
+ end
+ end,
+ ok.
+
+auth_design_doc(DocId) ->
+ DocProps = [
+ {<<"_id">>, DocId},
+ {<<"language">>,<<"javascript">>},
+ {<<"validate_doc_update">>, ?AUTH_DB_DOC_VALIDATE_FUNCTION}
+ ],
+ {ok, couch_doc:from_json_obj({DocProps})}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_btree.erl
----------------------------------------------------------------------
diff --git a/src/couch_btree.erl b/src/couch_btree.erl
new file mode 100644
index 0000000..789819e
--- /dev/null
+++ b/src/couch_btree.erl
@@ -0,0 +1,714 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_btree).
+
+-export([open/2, open/3, query_modify/4, add/2, add_remove/3]).
+-export([fold/4, full_reduce/1, final_reduce/2, size/1, foldl/3, foldl/4]).
+-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
+-export([less/3]).
+
+-include("couch_db.hrl").
+-define(CHUNK_THRESHOLD, 16#4ff).
+
+extract(#btree{extract_kv=Extract}, Value) ->
+ Extract(Value).
+
+assemble(#btree{assemble_kv=Assemble}, Key, Value) ->
+ Assemble(Key, Value).
+
+less(#btree{less=Less}, A, B) ->
+ Less(A, B).
+
+% pass in 'nil' for State if a new Btree.
+open(State, Fd) ->
+ {ok, #btree{root=State, fd=Fd}}.
+
+set_options(Bt, []) ->
+ Bt;
+set_options(Bt, [{split, Extract}|Rest]) ->
+ set_options(Bt#btree{extract_kv=Extract}, Rest);
+set_options(Bt, [{join, Assemble}|Rest]) ->
+ set_options(Bt#btree{assemble_kv=Assemble}, Rest);
+set_options(Bt, [{less, Less}|Rest]) ->
+ set_options(Bt#btree{less=Less}, Rest);
+set_options(Bt, [{reduce, Reduce}|Rest]) ->
+ set_options(Bt#btree{reduce=Reduce}, Rest);
+set_options(Bt, [{compression, Comp}|Rest]) ->
+ set_options(Bt#btree{compression=Comp}, Rest).
+
+open(State, Fd, Options) ->
+ {ok, set_options(#btree{root=State, fd=Fd}, Options)}.
+
+get_state(#btree{root=Root}) ->
+ Root.
+
+final_reduce(#btree{reduce=Reduce}, Val) ->
+ final_reduce(Reduce, Val);
+final_reduce(Reduce, {[], []}) ->
+ Reduce(reduce, []);
+final_reduce(_Bt, {[], [Red]}) ->
+ Red;
+final_reduce(Reduce, {[], Reductions}) ->
+ Reduce(rereduce, Reductions);
+final_reduce(Reduce, {KVs, Reductions}) ->
+ Red = Reduce(reduce, KVs),
+ final_reduce(Reduce, {[], [Red | Reductions]}).
+
+fold_reduce(#btree{root=Root}=Bt, Fun, Acc, Options) ->
+ Dir = couch_util:get_value(dir, Options, fwd),
+ StartKey = couch_util:get_value(start_key, Options),
+ InEndRangeFun = make_key_in_end_range_function(Bt, Dir, Options),
+ KeyGroupFun = couch_util:get_value(key_group_fun, Options, fun(_,_) -> true end),
+ try
+ {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
+ reduce_stream_node(Bt, Dir, Root, StartKey, InEndRangeFun, undefined, [], [],
+ KeyGroupFun, Fun, Acc),
+ if GroupedKey2 == undefined ->
+ {ok, Acc2};
+ true ->
+ case Fun(GroupedKey2, {GroupedKVsAcc2, GroupedRedsAcc2}, Acc2) of
+ {ok, Acc3} -> {ok, Acc3};
+ {stop, Acc3} -> {ok, Acc3}
+ end
+ end
+ catch
+ throw:{stop, AccDone} -> {ok, AccDone}
+ end.
+
+full_reduce(#btree{root=nil,reduce=Reduce}) ->
+ {ok, Reduce(reduce, [])};
+full_reduce(#btree{root=Root}) ->
+ {ok, element(2, Root)}.
+
+size(#btree{root = nil}) ->
+ 0;
+size(#btree{root = {_P, _Red}}) ->
+ % pre 1.2 format
+ nil;
+size(#btree{root = {_P, _Red, Size}}) ->
+ Size.
+
+% wraps a 2 arity function with the proper 3 arity function
+convert_fun_arity(Fun) when is_function(Fun, 2) ->
+ fun
+ (visit, KV, _Reds, AccIn) -> Fun(KV, AccIn);
+ (traverse, _K, _Red, AccIn) -> {ok, AccIn}
+ end;
+convert_fun_arity(Fun) when is_function(Fun, 3) ->
+ fun
+ (visit, KV, Reds, AccIn) -> Fun(KV, Reds, AccIn);
+ (traverse, _K, _Red, AccIn) -> {ok, AccIn}
+ end;
+convert_fun_arity(Fun) when is_function(Fun, 4) ->
+ Fun. % Already arity 4
+
+make_key_in_end_range_function(#btree{less=Less}, fwd, Options) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ case couch_util:get_value(end_key, Options) of
+ undefined ->
+ fun(_Key) -> true end;
+ LastKey ->
+ fun(Key) -> not Less(LastKey, Key) end
+ end;
+ EndKey ->
+ fun(Key) -> Less(Key, EndKey) end
+ end;
+make_key_in_end_range_function(#btree{less=Less}, rev, Options) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ case couch_util:get_value(end_key, Options) of
+ undefined ->
+ fun(_Key) -> true end;
+ LastKey ->
+ fun(Key) -> not Less(Key, LastKey) end
+ end;
+ EndKey ->
+ fun(Key) -> Less(EndKey, Key) end
+ end.
+
+
+foldl(Bt, Fun, Acc) ->
+ fold(Bt, Fun, Acc, []).
+
+foldl(Bt, Fun, Acc, Options) ->
+ fold(Bt, Fun, Acc, Options).
+
+
+fold(#btree{root=nil}, _Fun, Acc, _Options) ->
+ {ok, {[], []}, Acc};
+fold(#btree{root=Root}=Bt, Fun, Acc, Options) ->
+ Dir = couch_util:get_value(dir, Options, fwd),
+ InRange = make_key_in_end_range_function(Bt, Dir, Options),
+ Result =
+ case couch_util:get_value(start_key, Options) of
+ undefined ->
+ stream_node(Bt, [], Bt#btree.root, InRange, Dir,
+ convert_fun_arity(Fun), Acc);
+ StartKey ->
+ stream_node(Bt, [], Bt#btree.root, StartKey, InRange, Dir,
+ convert_fun_arity(Fun), Acc)
+ end,
+ case Result of
+ {ok, Acc2}->
+ FullReduction = element(2, Root),
+ {ok, {[], [FullReduction]}, Acc2};
+ {stop, LastReduction, Acc2} ->
+ {ok, LastReduction, Acc2}
+ end.
+
+add(Bt, InsertKeyValues) ->
+ add_remove(Bt, InsertKeyValues, []).
+
+add_remove(Bt, InsertKeyValues, RemoveKeys) ->
+ {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys),
+ {ok, Bt2}.
+
+query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) ->
+ #btree{root=Root} = Bt,
+ InsertActions = lists:map(
+ fun(KeyValue) ->
+ {Key, Value} = extract(Bt, KeyValue),
+ {insert, Key, Value}
+ end, InsertValues),
+ RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys],
+ FetchActions = [{fetch, Key, nil} || Key <- LookupKeys],
+ SortFun =
+ fun({OpA, A, _}, {OpB, B, _}) ->
+ case A == B of
+ % A and B are equal, sort by op.
+ true -> op_order(OpA) < op_order(OpB);
+ false ->
+ less(Bt, A, B)
+ end
+ end,
+ Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])),
+ {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []),
+ {ok, NewRoot} = complete_root(Bt, KeyPointers),
+ {ok, QueryResults, Bt#btree{root=NewRoot}}.
+
+% for ordering different operations with the same key.
+% fetch < remove < insert
+op_order(fetch) -> 1;
+op_order(remove) -> 2;
+op_order(insert) -> 3.
+
+lookup(#btree{root=Root, less=Less}=Bt, Keys) ->
+ SortedKeys = lists:sort(Less, Keys),
+ {ok, SortedResults} = lookup(Bt, Root, SortedKeys),
+ % We want to return the results in the same order as the keys were input
+ % but we may have changed the order when we sorted. So we need to put the
+ % order back into the results.
+ couch_util:reorder_results(Keys, SortedResults).
+
+lookup(_Bt, nil, Keys) ->
+ {ok, [{Key, not_found} || Key <- Keys]};
+lookup(Bt, Node, Keys) ->
+ Pointer = element(1, Node),
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []);
+ kv_node ->
+ lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, [])
+ end.
+
+lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
+ {ok, lists:reverse(Output)};
+lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound ->
+ {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) ->
+ N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey),
+ {Key, PointerInfo} = element(N, NodeTuple),
+ SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end,
+ case lists:splitwith(SplitFun, LookupKeys) of
+ {[], GreaterQueries} ->
+ lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output);
+ {LessEqQueries, GreaterQueries} ->
+ {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries),
+ lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output))
+ end.
+
+
+lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) ->
+ {ok, lists:reverse(Output)};
+lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound ->
+ % keys not found
+ {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])};
+lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) ->
+ N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey),
+ {Key, Value} = element(N, NodeTuple),
+ case less(Bt, LookupKey, Key) of
+ true ->
+ % LookupKey is less than Key
+ lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]);
+ false ->
+ case less(Bt, Key, LookupKey) of
+ true ->
+ % LookupKey is greater than Key
+ lookup_kvnode(Bt, NodeTuple, N+1, RestLookupKeys, [{LookupKey, not_found} | Output]);
+ false ->
+ % LookupKey is equal to Key
+ lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output])
+ end
+ end.
+
+
+complete_root(_Bt, []) ->
+ {ok, nil};
+complete_root(_Bt, [{_Key, PointerInfo}])->
+ {ok, PointerInfo};
+complete_root(Bt, KPs) ->
+ {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs),
+ complete_root(Bt, ResultKeyPointers).
+
+%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%%
+% It is inaccurate as it does not account for compression when blocks are
+% written. Plus with the "case byte_size(term_to_binary(InList)) of" code
+% it's probably really inefficient.
+
+chunkify(InList) ->
+ case ?term_size(InList) of
+ Size when Size > ?CHUNK_THRESHOLD ->
+ NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
+ ChunkThreshold = Size div NumberOfChunksLikely,
+ chunkify(InList, ChunkThreshold, [], 0, []);
+ _Else ->
+ [InList]
+ end.
+
+chunkify([], _ChunkThreshold, [], 0, OutputChunks) ->
+ lists:reverse(OutputChunks);
+chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) ->
+ lists:reverse([lists:reverse(OutList) | OutputChunks]);
+chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) ->
+ case ?term_size(InElement) of
+ Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] ->
+ chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]);
+ Size ->
+ chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks)
+ end.
+
+modify_node(Bt, RootPointerInfo, Actions, QueryOutput) ->
+ case RootPointerInfo of
+ nil ->
+ NodeType = kv_node,
+ NodeList = [];
+ _Tuple ->
+ Pointer = element(1, RootPointerInfo),
+ {NodeType, NodeList} = get_node(Bt, Pointer)
+ end,
+ NodeTuple = list_to_tuple(NodeList),
+
+ {ok, NewNodeList, QueryOutput2} =
+ case NodeType of
+ kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput);
+ kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput)
+ end,
+ case NewNodeList of
+ [] -> % no nodes remain
+ {ok, [], QueryOutput2};
+ NodeList -> % nothing changed
+ {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple),
+ {ok, [{LastKey, RootPointerInfo}], QueryOutput2};
+ _Else2 ->
+ {ok, ResultList} = write_node(Bt, NodeType, NewNodeList),
+ {ok, ResultList, QueryOutput2}
+ end.
+
+reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) ->
+ [];
+reduce_node(#btree{reduce=R}, kp_node, NodeList) ->
+ R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]);
+reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) ->
+ R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]).
+
+reduce_tree_size(kv_node, NodeSize, _KvList) ->
+ NodeSize;
+reduce_tree_size(kp_node, NodeSize, []) ->
+ NodeSize;
+reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red}} | _]) ->
+ % pre 1.2 format
+ nil;
+reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) ->
+ nil;
+reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) ->
+ reduce_tree_size(kp_node, NodeSize + Sz, NodeList).
+
+get_node(#btree{fd = Fd}, NodePos) ->
+ {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos),
+ {NodeType, NodeList}.
+
+write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) ->
+ % split up nodes into smaller sizes
+ NodeListList = chunkify(NodeList),
+ % now write out each chunk and return the KeyPointer pairs for those nodes
+ ResultList = [
+ begin
+ {ok, Pointer, Size} = couch_file:append_term(
+ Fd, {NodeType, ANodeList}, [{compression, Comp}]),
+ {LastKey, _} = lists:last(ANodeList),
+ SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList),
+ {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}}
+ end
+ ||
+ ANodeList <- NodeListList
+ ],
+ {ok, ResultList}.
+
+modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) ->
+ modify_node(Bt, nil, Actions, QueryOutput);
+modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
+ {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
+ tuple_size(NodeTuple), [])), QueryOutput};
+modify_kpnode(Bt, NodeTuple, LowerBound,
+ [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) ->
+ Sz = tuple_size(NodeTuple),
+ N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey),
+ case N =:= Sz of
+ true ->
+ % perform remaining actions on last node
+ {_, PointerInfo} = element(Sz, NodeTuple),
+ {ok, ChildKPs, QueryOutput2} =
+ modify_node(Bt, PointerInfo, Actions, QueryOutput),
+ NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound,
+ Sz - 1, ChildKPs)),
+ {ok, NodeList, QueryOutput2};
+ false ->
+ {NodeKey, PointerInfo} = element(N, NodeTuple),
+ SplitFun = fun({_ActionType, ActionKey, _ActionValue}) ->
+ not less(Bt, NodeKey, ActionKey)
+ end,
+ {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions),
+ {ok, ChildKPs, QueryOutput2} =
+ modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput),
+ ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple,
+ LowerBound, N - 1, ResultNode)),
+ modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2)
+ end.
+
+bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End ->
+ Tail;
+bounded_tuple_to_revlist(Tuple, Start, End, Tail) ->
+ bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]).
+
+bounded_tuple_to_list(Tuple, Start, End, Tail) ->
+ bounded_tuple_to_list2(Tuple, Start, End, [], Tail).
+
+bounded_tuple_to_list2(_Tuple, Start, End, Acc, Tail) when Start > End ->
+ lists:reverse(Acc, Tail);
+bounded_tuple_to_list2(Tuple, Start, End, Acc, Tail) ->
+ bounded_tuple_to_list2(Tuple, Start + 1, End, [element(Start, Tuple) | Acc], Tail).
+
+find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End ->
+ End;
+find_first_gteq(Bt, Tuple, Start, End, Key) ->
+ Mid = Start + ((End - Start) div 2),
+ {TupleKey, _} = element(Mid, Tuple),
+ case less(Bt, TupleKey, Key) of
+ true ->
+ find_first_gteq(Bt, Tuple, Mid+1, End, Key);
+ false ->
+ find_first_gteq(Bt, Tuple, Start, Mid, Key)
+ end.
+
+modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) ->
+ {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput};
+modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) ->
+ case ActionType of
+ insert ->
+ modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ % just drop the action
+ modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % the key/value must not exist in the tree
+ modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+ end;
+modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) ->
+ N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey),
+ {Key, Value} = element(N, NodeTuple),
+ ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode),
+ case less(Bt, ActionKey, Key) of
+ true ->
+ case ActionType of
+ insert ->
+ % ActionKey is less than the Key, so insert
+ modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ % ActionKey is less than the Key, just drop the action
+ modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % ActionKey is less than the Key, the key/value must not exist in the tree
+ modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput])
+ end;
+ false ->
+ % ActionKey and Key are maybe equal.
+ case less(Bt, Key, ActionKey) of
+ false ->
+ case ActionType of
+ insert ->
+ modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput);
+ remove ->
+ modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput);
+ fetch ->
+ % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node
+ % since an identical action key can follow it.
+ modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput])
+ end;
+ true ->
+ modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput)
+ end
+ end.
+
+
+reduce_stream_node(_Bt, _Dir, nil, _KeyStart, _InEndRangeFun, GroupedKey, GroupedKVsAcc,
+ GroupedRedsAcc, _KeyGroupFun, _Fun, Acc) ->
+ {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
+reduce_stream_node(Bt, Dir, Node, KeyStart, InEndRangeFun, GroupedKey, GroupedKVsAcc,
+ GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+ P = element(1, Node),
+ case get_node(Bt, P) of
+ {kp_node, NodeList} ->
+ NodeList2 = adjust_dir(Dir, NodeList),
+ reduce_stream_kp_node(Bt, Dir, NodeList2, KeyStart, InEndRangeFun, GroupedKey,
+ GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc);
+ {kv_node, KVs} ->
+ KVs2 = adjust_dir(Dir, KVs),
+ reduce_stream_kv_node(Bt, Dir, KVs2, KeyStart, InEndRangeFun, GroupedKey,
+ GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc)
+ end.
+
+reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, InEndRangeFun,
+ GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+ KeyGroupFun, Fun, Acc) ->
+
+ GTEKeyStartKVs =
+ case KeyStart of
+ undefined ->
+ KVs;
+ _ ->
+ DropFun = case Dir of
+ fwd ->
+ fun({Key, _}) -> less(Bt, Key, KeyStart) end;
+ rev ->
+ fun({Key, _}) -> less(Bt, KeyStart, Key) end
+ end,
+ lists:dropwhile(DropFun, KVs)
+ end,
+ KVs2 = lists:takewhile(
+ fun({Key, _}) -> InEndRangeFun(Key) end, GTEKeyStartKVs),
+ reduce_stream_kv_node2(Bt, KVs2, GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+ KeyGroupFun, Fun, Acc).
+
+
+reduce_stream_kv_node2(_Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+ _KeyGroupFun, _Fun, Acc) ->
+ {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey};
+reduce_stream_kv_node2(Bt, [{Key, Value}| RestKVs], GroupedKey, GroupedKVsAcc,
+ GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+ case GroupedKey of
+ undefined ->
+ reduce_stream_kv_node2(Bt, RestKVs, Key,
+ [assemble(Bt,Key,Value)], [], KeyGroupFun, Fun, Acc);
+ _ ->
+
+ case KeyGroupFun(GroupedKey, Key) of
+ true ->
+ reduce_stream_kv_node2(Bt, RestKVs, GroupedKey,
+ [assemble(Bt,Key,Value)|GroupedKVsAcc], GroupedRedsAcc, KeyGroupFun,
+ Fun, Acc);
+ false ->
+ case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of
+ {ok, Acc2} ->
+ reduce_stream_kv_node2(Bt, RestKVs, Key, [assemble(Bt,Key,Value)],
+ [], KeyGroupFun, Fun, Acc2);
+ {stop, Acc2} ->
+ throw({stop, Acc2})
+ end
+ end
+ end.
+
+reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, InEndRangeFun,
+ GroupedKey, GroupedKVsAcc, GroupedRedsAcc,
+ KeyGroupFun, Fun, Acc) ->
+ Nodes =
+ case KeyStart of
+ undefined ->
+ NodeList;
+ _ ->
+ case Dir of
+ fwd ->
+ lists:dropwhile(fun({Key, _}) -> less(Bt, Key, KeyStart) end, NodeList);
+ rev ->
+ RevKPs = lists:reverse(NodeList),
+ case lists:splitwith(fun({Key, _}) -> less(Bt, Key, KeyStart) end, RevKPs) of
+ {_Before, []} ->
+ NodeList;
+ {Before, [FirstAfter | _]} ->
+ [FirstAfter | lists:reverse(Before)]
+ end
+ end
+ end,
+ {InRange, MaybeInRange} = lists:splitwith(
+ fun({Key, _}) -> InEndRangeFun(Key) end, Nodes),
+ NodesInRange = case MaybeInRange of
+ [FirstMaybeInRange | _] when Dir =:= fwd ->
+ InRange ++ [FirstMaybeInRange];
+ _ ->
+ InRange
+ end,
+ reduce_stream_kp_node2(Bt, Dir, NodesInRange, KeyStart, InEndRangeFun,
+ GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc).
+
+
+reduce_stream_kp_node2(Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, InEndRangeFun,
+ undefined, [], [], KeyGroupFun, Fun, Acc) ->
+ {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
+ reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, undefined,
+ [], [], KeyGroupFun, Fun, Acc),
+ reduce_stream_kp_node2(Bt, Dir, RestNodeList, KeyStart, InEndRangeFun, GroupedKey2,
+ GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2);
+reduce_stream_kp_node2(Bt, Dir, NodeList, KeyStart, InEndRangeFun,
+ GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) ->
+ {Grouped0, Ungrouped0} = lists:splitwith(fun({Key,_}) ->
+ KeyGroupFun(GroupedKey, Key) end, NodeList),
+ {GroupedNodes, UngroupedNodes} =
+ case Grouped0 of
+ [] ->
+ {Grouped0, Ungrouped0};
+ _ ->
+ [FirstGrouped | RestGrouped] = lists:reverse(Grouped0),
+ {RestGrouped, [FirstGrouped | Ungrouped0]}
+ end,
+ GroupedReds = [element(2, Node) || {_, Node} <- GroupedNodes],
+ case UngroupedNodes of
+ [{_Key, NodeInfo}|RestNodes] ->
+ {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} =
+ reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, GroupedKey,
+ GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc),
+ reduce_stream_kp_node2(Bt, Dir, RestNodes, KeyStart, InEndRangeFun, GroupedKey2,
+ GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2);
+ [] ->
+ {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey}
+ end.
+
+adjust_dir(fwd, List) ->
+ List;
+adjust_dir(rev, List) ->
+ lists:reverse(List).
+
+stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) ->
+ Pointer = element(1, Node),
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc);
+ kv_node ->
+ stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc)
+ end.
+
+stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) ->
+ Pointer = element(1, Node),
+ {NodeType, NodeList} = get_node(Bt, Pointer),
+ case NodeType of
+ kp_node ->
+ stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc);
+ kv_node ->
+ stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc)
+ end.
+
+stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc) ->
+ Red = element(2, Node),
+ case Fun(traverse, Key, Red, Acc) of
+ {ok, Acc2} ->
+ case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of
+ {ok, Acc3} ->
+ stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3);
+ {stop, LastReds, Acc3} ->
+ {stop, LastReds, Acc3}
+ end;
+ {skip, Acc2} ->
+ stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2)
+ end.
+
+drop_nodes(_Bt, Reds, _StartKey, []) ->
+ {Reds, []};
+drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | RestKPs]) ->
+ case less(Bt, NodeKey, StartKey) of
+ true ->
+ drop_nodes(Bt, [element(2, Node) | Reds], StartKey, RestKPs);
+ false ->
+ {Reds, [{NodeKey, Node} | RestKPs]}
+ end.
+
+stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) ->
+ {NewReds, NodesToStream} =
+ case Dir of
+ fwd ->
+ % drop all nodes sorting before the key
+ drop_nodes(Bt, Reds, StartKey, KPs);
+ rev ->
+ % keep all nodes sorting before the key, AND the first node to sort after
+ RevKPs = lists:reverse(KPs),
+ case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of
+ {_RevsBefore, []} ->
+ % everything sorts before it
+ {Reds, KPs};
+ {RevBefore, [FirstAfter | Drop]} ->
+ {[element(2, Node) || {_K, Node} <- Drop] ++ Reds,
+ [FirstAfter | lists:reverse(RevBefore)]}
+ end
+ end,
+ case NodesToStream of
+ [] ->
+ {ok, Acc};
+ [{_Key, Node} | Rest] ->
+ case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of
+ {ok, Acc2} ->
+ Red = element(2, Node),
+ stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2);
+ {stop, LastReds, Acc2} ->
+ {stop, LastReds, Acc2}
+ end
+ end.
+
+stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) ->
+ DropFun =
+ case Dir of
+ fwd ->
+ fun({Key, _}) -> less(Bt, Key, StartKey) end;
+ rev ->
+ fun({Key, _}) -> less(Bt, StartKey, Key) end
+ end,
+ {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs),
+ AssembleLTKVs = [assemble(Bt,K,V) || {K,V} <- LTKVs],
+ stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc).
+
+stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) ->
+ {ok, Acc};
+stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) ->
+ case InRange(K) of
+ false ->
+ {stop, {PrevKVs, Reds}, Acc};
+ true ->
+ AssembledKV = assemble(Bt, K, V),
+ case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of
+ {ok, Acc2} ->
+ stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2);
+ {stop, Acc2} ->
+ {stop, {PrevKVs, Reds}, Acc2}
+ end
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
new file mode 100644
index 0000000..6edde32
--- /dev/null
+++ b/src/couch_changes.erl
@@ -0,0 +1,577 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_changes).
+-include("couch_db.hrl").
+
+-export([handle_changes/3]).
+
+% For the builtin filter _docs_ids, this is the maximum number
+% of documents for which we trigger the optimized code path.
+-define(MAX_DOC_IDS, 100).
+
+-record(changes_acc, {
+ db,
+ seq,
+ prepend,
+ filter,
+ callback,
+ user_acc,
+ resp_type,
+ limit,
+ include_docs,
+ doc_options,
+ conflicts,
+ timeout,
+ timeout_fun
+}).
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+handle_changes(Args1, Req, Db0) ->
+ #changes_args{
+ style = Style,
+ filter = FilterName,
+ feed = Feed,
+ dir = Dir,
+ since = Since
+ } = Args1,
+ {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
+ Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+ Start = fun() ->
+ {ok, Db} = couch_db:reopen(Db0),
+ StartSeq = case Dir of
+ rev ->
+ couch_db:get_update_seq(Db);
+ fwd ->
+ Since
+ end,
+ {Db, StartSeq}
+ end,
+ % begin timer to deal with heartbeat when filter function fails
+ case Args#changes_args.heartbeat of
+ undefined ->
+ erlang:erase(last_changes_heartbeat);
+ Val when is_integer(Val); Val =:= true ->
+ put(last_changes_heartbeat, now())
+ end,
+
+ case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
+ true ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ Self = self(),
+ {ok, Notify} = couch_db_update_notifier:start_link(
+ fun({_, DbName}) when Db0#db.name == DbName ->
+ Self ! db_updated;
+ (_) ->
+ ok
+ end
+ ),
+ {Db, StartSeq} = Start(),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
+ <<"">>, Timeout, TimeoutFun),
+ try
+ keep_sending_changes(
+ Args#changes_args{dir=fwd},
+ Acc0,
+ true)
+ after
+ couch_db_update_notifier:stop(Notify),
+ get_rest_db_updated(ok) % clean out any remaining update messages
+ end
+ end;
+ false ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ {Db, StartSeq} = Start(),
+ Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
+ UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun),
+ {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
+ send_changes(
+ Args#changes_args{feed="normal"},
+ Acc0,
+ true),
+ end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+ end
+ end.
+
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+ Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+ {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+make_filter_fun([$_ | _] = FilterName, Style, Req, Db) ->
+ builtin_filter_fun(FilterName, Style, Req, Db);
+make_filter_fun(FilterName, Style, Req, Db) ->
+ {os_filter_fun(FilterName, Style, Req, Db), []}.
+
+os_filter_fun(FilterName, Style, Req, Db) ->
+ case [list_to_binary(couch_httpd:unquote(Part))
+ || Part <- string:tokens(FilterName, "/")] of
+ [] ->
+ fun(_Db2, #doc_info{revs=Revs}) ->
+ builtin_results(Style, Revs)
+ end;
+ [DName, FName] ->
+ DesignId = <<"_design/", DName/binary>>,
+ DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+ % validate that the ddoc has the filter fun
+ #doc{body={Props}} = DDoc,
+ couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
+ fun(Db2, DocInfo) ->
+ DocInfos =
+ case Style of
+ main_only ->
+ [DocInfo];
+ all_docs ->
+ [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
+ end,
+ Docs = [Doc || {ok, Doc} <- [
+ couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
+ || DocInfo2 <- DocInfos]],
+ {ok, Passes} = couch_query_servers:filter_docs(
+ Req, Db2, DDoc, FName, Docs
+ ),
+ [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+ || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+ <- lists:zip(Passes, Docs), Pass == true]
+ end;
+ _Else ->
+ throw({bad_request,
+ "filter parameter must be of the form `designname/filtername`"})
+ end.
+
+builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
+ DocIds = couch_util:get_value(<<"doc_ids">>, Props),
+ {filter_docids(DocIds, Style), DocIds};
+builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
+ {Props} = couch_httpd:json_body_obj(Req),
+ DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil),
+ {filter_docids(DocIds, Style), DocIds};
+builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
+ DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
+ {filter_docids(DocIds, Style), DocIds};
+builtin_filter_fun("_design", Style, _Req, _Db) ->
+ {filter_designdoc(Style), []};
+builtin_filter_fun("_view", Style, Req, Db) ->
+ ViewName = couch_httpd:qs_value(Req, "view", ""),
+ {filter_view(ViewName, Style, Db), []};
+builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
+ throw({bad_request, "unknown builtin filter name"}).
+
+filter_docids(DocIds, Style) when is_list(DocIds)->
+ fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
+ case lists:member(DocId, DocIds) of
+ true ->
+ builtin_results(Style, Revs);
+ _ -> []
+ end
+ end;
+filter_docids(_, _) ->
+ throw({bad_request, "`doc_ids` filter parameter is not a list."}).
+
+filter_designdoc(Style) ->
+ fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
+ case DocId of
+ <<"_design", _/binary>> ->
+ builtin_results(Style, Revs);
+ _ -> []
+ end
+ end.
+
+filter_view("", _Style, _Db) ->
+ throw({bad_request, "`view` filter parameter is not provided."});
+filter_view(ViewName, Style, Db) ->
+ case [list_to_binary(couch_httpd:unquote(Part))
+ || Part <- string:tokens(ViewName, "/")] of
+ [] ->
+ throw({bad_request, "Invalid `view` parameter."});
+ [DName, VName] ->
+ DesignId = <<"_design/", DName/binary>>,
+ DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+ % validate that the ddoc has the filter fun
+ #doc{body={Props}} = DDoc,
+ couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
+ fun(Db2, DocInfo) ->
+ DocInfos =
+ case Style of
+ main_only ->
+ [DocInfo];
+ all_docs ->
+ [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
+ end,
+ Docs = [Doc || {ok, Doc} <- [
+ couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
+ || DocInfo2 <- DocInfos]],
+ {ok, Passes} = couch_query_servers:filter_view(
+ DDoc, VName, Docs
+ ),
+ [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
+ || {Pass, #doc{revs={RevPos,[RevId|_]}}}
+ <- lists:zip(Passes, Docs), Pass == true]
+ end
+ end.
+
+builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
+ case Style of
+ main_only ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+ all_docs ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
+ || #rev_info{rev=R} <- Revs]
+ end.
+
+get_changes_timeout(Args, Callback) ->
+ #changes_args{
+ heartbeat = Heartbeat,
+ timeout = Timeout,
+ feed = ResponseType
+ } = Args,
+ DefaultTimeout = list_to_integer(
+ couch_config:get("httpd", "changes_timeout", "60000")
+ ),
+ case Heartbeat of
+ undefined ->
+ case Timeout of
+ undefined ->
+ {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
+ infinity ->
+ {infinity, fun(UserAcc) -> {stop, UserAcc} end};
+ _ ->
+ {lists:min([DefaultTimeout, Timeout]),
+ fun(UserAcc) -> {stop, UserAcc} end}
+ end;
+ true ->
+ {DefaultTimeout,
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
+ _ ->
+ {lists:min([DefaultTimeout, Heartbeat]),
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
+ end.
+
+start_sending_changes(_Callback, UserAcc, ResponseType)
+ when ResponseType =:= "continuous"
+ orelse ResponseType =:= "eventsource" ->
+ UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+ Callback(start, ResponseType, UserAcc).
+
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
+ #changes_args{
+ include_docs = IncludeDocs,
+ doc_options = DocOpts,
+ conflicts = Conflicts,
+ limit = Limit,
+ feed = ResponseType,
+ filter_fun = FilterFun
+ } = Args,
+ #changes_acc{
+ db = Db,
+ seq = StartSeq,
+ prepend = Prepend,
+ filter = FilterFun,
+ callback = Callback,
+ user_acc = UserAcc,
+ resp_type = ResponseType,
+ limit = Limit,
+ include_docs = IncludeDocs,
+ doc_options = DocOpts,
+ conflicts = Conflicts,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun
+ }.
+
+send_changes(Args, Acc0, FirstRound) ->
+ #changes_args{
+ dir = Dir,
+ filter = FilterName,
+ filter_args = FilterArgs
+ } = Args,
+ #changes_acc{
+ db = Db,
+ seq = StartSeq
+ } = Acc0,
+ case FirstRound of
+ true ->
+ case FilterName of
+ "_doc_ids" when length(FilterArgs) =< ?MAX_DOC_IDS ->
+ send_changes_doc_ids(
+ FilterArgs, Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+ "_design" ->
+ send_changes_design_docs(
+ Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+ _ ->
+ couch_db:changes_since(
+ Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ end;
+ false ->
+ couch_db:changes_since(
+ Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ end.
+
+
+send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) ->
+ Lookups = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, DocIds),
+ FullDocInfos = lists:foldl(
+ fun({ok, FDI}, Acc) ->
+ [FDI | Acc];
+ (not_found, Acc) ->
+ Acc
+ end,
+ [], Lookups),
+ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
+ FoldFun = fun(FullDocInfo, _, Acc) ->
+ {ok, [FullDocInfo | Acc]}
+ end,
+ KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
+ {ok, _, FullDocInfos} = couch_btree:fold(
+ Db#db.fulldocinfo_by_id_btree, FoldFun, [], KeyOpts),
+ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
+ FoldFun = case Dir of
+ fwd ->
+ fun lists:foldl/3;
+ rev ->
+ fun lists:foldr/3
+ end,
+ GreaterFun = case Dir of
+ fwd ->
+ fun(A, B) -> A > B end;
+ rev ->
+ fun(A, B) -> A =< B end
+ end,
+ DocInfos = lists:foldl(
+ fun(FDI, Acc) ->
+ DI = couch_doc:to_doc_info(FDI),
+ case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+ true ->
+ [DI | Acc];
+ false ->
+ Acc
+ end
+ end,
+ [], FullDocInfos),
+ SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
+ FinalAcc = try
+ FoldFun(
+ fun(DocInfo, Acc) ->
+ case Fun(DocInfo, Acc) of
+ {ok, NewAcc} ->
+ NewAcc;
+ {stop, NewAcc} ->
+ throw({stop, NewAcc})
+ end
+ end,
+ Acc0, SortedDocInfos)
+ catch
+ throw:{stop, Acc} ->
+ Acc
+ end,
+ case Dir of
+ fwd ->
+ {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
+ rev ->
+ {ok, FinalAcc}
+ end.
+
+
+keep_sending_changes(Args, Acc0, FirstRound) ->
+ #changes_args{
+ feed = ResponseType,
+ limit = Limit,
+ db_open_options = DbOptions
+ } = Args,
+
+ {ok, ChangesAcc} = send_changes(
+ Args#changes_args{dir=fwd},
+ Acc0,
+ FirstRound),
+ #changes_acc{
+ db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
+ seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
+ } = ChangesAcc,
+
+ couch_db:close(Db),
+ if Limit > NewLimit, ResponseType == "longpoll" ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ true ->
+ case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc4} ->
+ DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+ case couch_db:open(Db#db.name, DbOptions1) of
+ {ok, Db2} ->
+ keep_sending_changes(
+ Args#changes_args{limit=NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun},
+ false);
+ _Else ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ end
+ end.
+
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+ Callback({stop, EndSeq}, ResponseType, UserAcc).
+
+changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
+ when ResponseType =:= "continuous"
+ orelse ResponseType =:= "eventsource" ->
+ #changes_acc{
+ filter = FilterFun, callback = Callback,
+ user_acc = UserAcc, limit = Limit, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun
+ } = Acc,
+ #doc_info{high_seq = Seq} = DocInfo,
+ Results0 = FilterFun(Db, DocInfo),
+ Results = [Result || Result <- Results0, Result /= null],
+ %% TODO: I'm thinking this should be < 1 and not =< 1
+ Go = if Limit =< 1 -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ ChangesRow = changes_row(Results, DocInfo, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
+ end;
+changes_enumerator(DocInfo, Acc) ->
+ #changes_acc{
+ filter = FilterFun, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun
+ } = Acc,
+ #doc_info{high_seq = Seq} = DocInfo,
+ Results0 = FilterFun(Db, DocInfo),
+ Results = [Result || Result <- Results0, Result /= null],
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ ChangesRow = changes_row(Results, DocInfo, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>,
+ user_acc = UserAcc2, limit = Limit - 1}}
+ end.
+
+
+changes_row(Results, DocInfo, Acc) ->
+ #doc_info{
+ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
+ } = DocInfo,
+ #changes_acc{
+ db = Db,
+ include_docs = IncDoc,
+ doc_options = DocOpts,
+ conflicts = Conflicts
+ } = Acc,
+ {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
+ deleted_item(Del) ++ case IncDoc of
+ true ->
+ Opts = case Conflicts of
+ true -> [deleted, conflicts];
+ false -> [deleted]
+ end,
+ Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
+ case Doc of
+ null ->
+ [{doc, null}];
+ _ ->
+ [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+ end;
+ false ->
+ []
+ end}.
+
+deleted_item(true) -> [{<<"deleted">>, true}];
+deleted_item(_) -> [].
+
+% waits for a db_updated msg, if there are multiple msgs, collects them.
+wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
+ receive
+ db_updated ->
+ get_rest_db_updated(UserAcc)
+ after Timeout ->
+ {Go, UserAcc2} = TimeoutFun(UserAcc),
+ case Go of
+ ok ->
+ wait_db_updated(Timeout, TimeoutFun, UserAcc2);
+ stop ->
+ {stop, UserAcc2}
+ end
+ end.
+
+get_rest_db_updated(UserAcc) ->
+ receive
+ db_updated ->
+ get_rest_db_updated(UserAcc)
+ after 0 ->
+ {updated, UserAcc}
+ end.
+
+reset_heartbeat() ->
+ case get(last_changes_heartbeat) of
+ undefined ->
+ ok;
+ _ ->
+ put(last_changes_heartbeat, now())
+ end.
+
+maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
+ Before = get(last_changes_heartbeat),
+ case Before of
+ undefined ->
+ {ok, Acc};
+ _ ->
+ Now = now(),
+ case timer:now_diff(Now, Before) div 1000 >= Timeout of
+ true ->
+ Acc2 = TimeoutFun(Acc),
+ put(last_changes_heartbeat, Now),
+ Acc2;
+ false ->
+ {ok, Acc}
+ end
+ end.