You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/11 09:07:10 UTC
[08/41] inital move to rebar compilation
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_compaction_daemon.erl
----------------------------------------------------------------------
diff --git a/src/couch_compaction_daemon.erl b/src/couch_compaction_daemon.erl
new file mode 100644
index 0000000..18a51a4
--- /dev/null
+++ b/src/couch_compaction_daemon.erl
@@ -0,0 +1,504 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_compaction_daemon).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/0, config_change/3]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-define(CONFIG_ETS, couch_compaction_daemon_config).
+
+% State of the daemon's gen_server process.
+-record(state, {
+ % pid of the linked process that init/1 spawns to run compact_loop/1
+ loop_pid
+}).
+
+% Compaction settings parsed from one entry of the "compactions"
+% configuration section (see config_record/2).
+-record(config, {
+ % integer threshold set by `db_fragmentation`; `nil` means that any
+ % fragmentation qualifies (see check_frag/2)
+ db_frag = nil,
+ % same as db_frag, set by `view_fragmentation` and used for view groups
+ view_frag = nil,
+ % #period{} built from the `from` / `to` settings, `nil` when neither is set
+ period = nil,
+ % value of `strict_window`; when false compact_time_left/1 is `infinity`
+ cancel = false,
+ % value of `parallel_view_compaction`
+ parallel_view_compact = false
+}).
+
+% Time window; each field is the `{Hour, Minute}` tuple returned by
+% parse_time/1, or `nil` while it has not been set.
+-record(period, {
+ from = nil,
+ to = nil
+}).
+
+
+% Starts the compaction daemon, registered locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+% gen_server callback. Creates the ets table holding the parsed compaction
+% configs, subscribes to configuration changes, loads the current
+% "compactions" section and spawns the linked process that runs
+% compact_loop/1. Exits are trapped so that the death of that process is
+% delivered to handle_info/2.
+init(_) ->
+    process_flag(trap_exit, true),
+    ?CONFIG_ETS = ets:new(?CONFIG_ETS, [named_table, set, protected]),
+    ok = couch_config:register(fun ?MODULE:config_change/3),
+    load_config(),
+    Daemon = self(),
+    LoopPid = spawn_link(fun() -> compact_loop(Daemon) end),
+    {ok, #state{loop_pid = LoopPid}}.
+
+
+% Configuration change callback registered in init/1. Forwards a change of
+% the "compactions" section to the daemon as an asynchronous message.
+config_change("compactions", DbName, NewValue) ->
+    Update = {config_update, DbName, NewValue},
+    ok = gen_server:cast(?MODULE, Update).
+
+
+% gen_server callback handling the updates sent by config_change/3.
+%
+% A `deleted` value removes the entry of the database from the config table.
+% Any other value is parsed and, when valid, stored; invalid values are
+% ignored (parse_config/2 logs them). If the table was empty before the
+% insertion, the compaction loop is told that there is a config again.
+handle_cast({config_update, DbName, deleted}, State) ->
+    true = ets:delete(?CONFIG_ETS, ?l2b(DbName)),
+    {noreply, State};
+
+handle_cast({config_update, DbName, Config}, #state{loop_pid = Loop} = State) ->
+    case parse_config(DbName, Config) of
+        error ->
+            ok;
+        {ok, NewConfig} ->
+            SizeBefore = ets:info(?CONFIG_ETS, size),
+            true = ets:insert(?CONFIG_ETS, {?l2b(DbName), NewConfig}),
+            if
+                SizeBefore =:= 0 ->
+                    Loop ! {self(), have_config};
+                true ->
+                    ok
+            end
+    end,
+    {noreply, State}.
+
+
+% gen_server callback. The daemon has no synchronous API, so any call stops
+% the server with an `unexpected_call` reason.
+handle_call(Msg, _From, State) ->
+    StopReason = {unexpected_call, Msg},
+    {stop, StopReason, State}.
+
+
+% gen_server callback. Stops the daemon when the compaction loop process
+% recorded in the state exits.
+handle_info({'EXIT', LoopPid, Reason}, #state{loop_pid = LoopPid} = State) ->
+    StopReason = {compaction_loop_died, Reason},
+    {stop, StopReason, State}.
+
+
+% gen_server callback. Drops the ets table holding the compaction configs.
+terminate(_Reason, _State) ->
+    Deleted = ets:delete(?CONFIG_ETS),
+    true = Deleted.
+
+
+% gen_server callback. The state needs no conversion between code versions.
+code_change(_FromVsn, DaemonState, _Extra) ->
+    {ok, DaemonState}.
+
+
+% Body of the compaction loop process; never returns.
+%
+% Each iteration walks over all databases and hands those that have a config
+% whose period is currently active to maybe_compact_db/2. The walk is aborted
+% as soon as the config table is found empty. Afterwards the process either
+% blocks until `Parent` reports a new config (empty table) or sleeps for
+% "check_interval" seconds (default "300") of the "compaction_daemon"
+% configuration section.
+compact_loop(Parent) ->
+    VisitDb = fun(DbName, Acc) ->
+        case ets:info(?CONFIG_ETS, size) of
+            0 ->
+                {stop, Acc};
+            _ ->
+                case get_db_config(DbName) of
+                    {ok, Config} ->
+                        case check_period(Config) of
+                            false ->
+                                ok;
+                            true ->
+                                maybe_compact_db(DbName, Config)
+                        end;
+                    nil ->
+                        ok
+                end,
+                {ok, Acc}
+        end
+    end,
+    {ok, _} = couch_server:all_databases(VisitDb, ok),
+    case ets:info(?CONFIG_ETS, size) of
+        0 ->
+            receive {Parent, have_config} -> ok end;
+        _ ->
+            Interval = couch_config:get(
+                "compaction_daemon", "check_interval", "300"),
+            ok = timer:sleep(list_to_integer(Interval) * 1000)
+    end,
+    compact_loop(Parent).
+
+
+% Opens the database `DbName` with the `_admin` role and compacts it, and
+% its view groups, as far as `Config` allows.
+%
+% When can_db_compact/2 refuses the database compaction, only the views are
+% considered. Nothing is done when the database cannot be opened. The return
+% value carries no information.
+maybe_compact_db(DbName, Config) ->
+    case (catch couch_db:open_int(DbName, [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}])) of
+        {ok, Db} ->
+            DDocNames = db_ddoc_names(Db),
+            case can_db_compact(Config, Db) of
+                true ->
+                    compact_db_and_views(DbName, Db, DDocNames, Config);
+                false ->
+                    couch_db:close(Db),
+                    maybe_compact_views(DbName, DDocNames, Config)
+            end;
+        _ ->
+            ok
+    end.
+
+% Starts the compaction of `Db` and waits until it finishes, fails or runs
+% out of time (see compact_time_left/1), in which case it is canceled.
+%
+% With `parallel_view_compact` enabled the views are compacted concurrently
+% by a linked helper process, otherwise they are compacted in this process
+% once the database compaction has finished normally.
+compact_db_and_views(DbName, Db, DDocNames, Config) ->
+    {ok, DbCompactPid} = couch_db:start_compact(Db),
+    TimeLeft = compact_time_left(Config),
+    Parallel = Config#config.parallel_view_compact,
+    {ViewsCompactPid, ViewsMonRef} = case Parallel of
+        true ->
+            Pid = spawn_link(fun() ->
+                maybe_compact_views(DbName, DDocNames, Config)
+            end),
+            {Pid, erlang:monitor(process, Pid)};
+        false ->
+            {nil, nil}
+    end,
+    DbMonRef = erlang:monitor(process, DbCompactPid),
+    receive
+        {'DOWN', DbMonRef, process, _, normal} ->
+            couch_db:close(Db),
+            case Parallel of
+                true ->
+                    ok;
+                false ->
+                    maybe_compact_views(DbName, DDocNames, Config)
+            end;
+        {'DOWN', DbMonRef, process, _, Reason} ->
+            couch_db:close(Db),
+            ?LOG_ERROR("Compaction daemon - an error ocurred while"
+                " compacting the database `~s`: ~p", [DbName, Reason])
+    after TimeLeft ->
+        ?LOG_INFO("Compaction daemon - canceling compaction for database"
+            " `~s` because it's exceeding the allowed period.",
+            [DbName]),
+        erlang:demonitor(DbMonRef, [flush]),
+        ok = couch_db:cancel_compact(Db),
+        couch_db:close(Db)
+    end,
+    wait_for_views_compaction(ViewsCompactPid, ViewsMonRef, TimeLeft).
+
+% Waits for the parallel views compaction process, if there is one, and
+% kills it when it is still running one second after `TimeLeft` elapsed.
+wait_for_views_compaction(_ViewsCompactPid, nil, _TimeLeft) ->
+    ok;
+wait_for_views_compaction(ViewsCompactPid, ViewsMonRef, TimeLeft) ->
+    % TimeLeft is `infinity` when compactions are never canceled. Adding the
+    % grace period to that atom would raise badarith and take the whole
+    % daemon down, so an unlimited wait has to stay unlimited.
+    Timeout = case TimeLeft of
+        infinity ->
+            infinity;
+        _ ->
+            TimeLeft + 1000
+    end,
+    receive
+        {'DOWN', ViewsMonRef, process, _, _Reason} ->
+            ok
+    after Timeout ->
+        % Under normal circumstances, the view compaction process
+        % should have finished already.
+        erlang:demonitor(ViewsMonRef, [flush]),
+        unlink(ViewsCompactPid),
+        exit(ViewsCompactPid, kill)
+    end.
+
+
+% Compacts the view groups named in the list one after the other. Stops
+% early, returning `ok`, as soon as the configured period is no longer
+% active or a view compaction timed out.
+maybe_compact_views(_DbName, [], _Config) ->
+    ok;
+maybe_compact_views(DbName, [DDocName | Rest], Config) ->
+    % `false` means that the period ended and nothing was attempted.
+    case check_period(Config) andalso
+        maybe_compact_view(DbName, DDocName, Config) of
+        ok ->
+            maybe_compact_views(DbName, Rest, Config);
+        timeout ->
+            ok;
+        false ->
+            ok
+    end.
+
+
+% Returns the ids, without the "_design/" prefix, of the design documents of
+% `Db` that are not deleted. Names are accumulated by prepending, so they
+% come out in the reverse of the enumeration order.
+db_ddoc_names(Db) ->
+    Collect =
+        fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Names) ->
+                {ok, Names};
+            (#full_doc_info{id = <<"_design/", Name/binary>>}, _, Names) ->
+                {ok, [Name | Names]};
+            (_, _, Names) ->
+                % anything that is not a design document ends the enumeration
+                {stop, Names}
+        end,
+    Range = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
+    {ok, _, DDocNames} = couch_db:enum_docs(Db, Collect, [], Range),
+    DDocNames.
+
+
+% Compacts the view group `GroupId` of database `DbName` when
+% can_view_compact/4 allows it, and waits for the compaction to end.
+%
+% Returns `timeout` when the compaction had to be canceled because it
+% outlasted the allowed period and `ok` in every other case, including
+% failures, which are only logged.
+maybe_compact_view(DbName, GroupId, Config) ->
+    DDocId = <<"_design/", GroupId/binary>>,
+    case (catch couch_mrview:get_info(DbName, DDocId)) of
+        {ok, GroupInfo} ->
+            case can_view_compact(Config, DbName, GroupId, GroupInfo) of
+                false ->
+                    ok;
+                true ->
+                    {ok, MonRef} = couch_mrview:compact(DbName, DDocId, [monitor]),
+                    TimeLeft = compact_time_left(Config),
+                    await_view_compaction(MonRef, TimeLeft, DbName, GroupId, DDocId)
+            end;
+        Error ->
+            ?LOG_ERROR("Error opening view group `~s` from database `~s`: ~p",
+                [GroupId, DbName, Error]),
+            ok
+    end.
+
+% Blocks until the monitored view compaction goes down, or cancels it once
+% `TimeLeft` milliseconds have passed.
+await_view_compaction(MonRef, TimeLeft, DbName, GroupId, DDocId) ->
+    receive
+        {'DOWN', MonRef, process, _, normal} ->
+            ok;
+        {'DOWN', MonRef, process, _, Reason} ->
+            ?LOG_ERROR("Compaction daemon - an error ocurred while compacting"
+                " the view group `~s` from database `~s`: ~p",
+                [GroupId, DbName, Reason]),
+            ok
+    after TimeLeft ->
+        ?LOG_INFO("Compaction daemon - canceling the compaction for the "
+            "view group `~s` of the database `~s` because it's exceeding"
+            " the allowed period.", [GroupId, DbName]),
+        erlang:demonitor(MonRef, [flush]),
+        ok = couch_mrview:cancel_compaction(DbName, DDocId),
+        timeout
+    end.
+
+
+% Returns for how long, in milliseconds, a compaction may keep running
+% before it has to be canceled, or `infinity` when it is never canceled.
+%
+% Compactions are only canceled when `cancel` is true and a period is
+% configured. The result is then the time between the current local time
+% and the next occurrence of the period's `to` time, at minute granularity
+% (the seconds of the current time are ignored). When `to` equals the
+% current `{Hour, Minute}`, its next occurrence is 24 hours away.
+compact_time_left(#config{cancel = false}) ->
+    infinity;
+compact_time_left(#config{period = nil}) ->
+    infinity;
+compact_time_left(#config{period = #period{to = {ToH, ToM}}}) ->
+    {H, M, _} = time(),
+    % Compare minutes since midnight. Subtracting hours and minutes
+    % independently is wrong whenever the minute of `to` is smaller than the
+    % current one: from 10:50 to 11:10 there are 20 minutes, not 1h 40min.
+    MinutesLeft = case (ToH * 60 + ToM) - (H * 60 + M) of
+        Diff when Diff > 0 ->
+            Diff;
+        Diff ->
+            % `to` is not later today, so it refers to tomorrow
+            Diff + 24 * 60
+    end,
+    MinutesLeft * 60 * 1000.
+
+
+% Looks up the compaction config of `DbName`, falling back to the entry
+% stored under <<"_default">>. Returns `{ok, Config}`, or `nil` when neither
+% entry exists.
+get_db_config(DbName) ->
+    Entries = case ets:lookup(?CONFIG_ETS, DbName) of
+        [] ->
+            ets:lookup(?CONFIG_ETS, <<"_default">>);
+        Found ->
+            Found
+    end,
+    case Entries of
+        [{_Key, Config}] ->
+            {ok, Config};
+        [] ->
+            nil
+    end.
+
+
+% Tells whether the database `Db` should be compacted right now: the period
+% must be active, the fragmentation must reach the `db_frag` threshold and
+% the free space reported for the "database_dir" must not be smaller than
+% the estimated space needed. Lack of disk space is logged as a warning.
+can_db_compact(#config{db_frag = Threshold} = Config, Db) ->
+    case check_period(Config) of
+        true ->
+            {ok, DbInfo} = couch_db:get_db_info(Db),
+            {Frag, SpaceRequired} = frag(DbInfo),
+            ?LOG_DEBUG("Fragmentation for database `~s` is ~p%, estimated space for"
+                " compaction is ~p bytes.", [Db#db.name, Frag, SpaceRequired]),
+            case check_frag(Threshold, Frag) of
+                true ->
+                    Free = free_space(couch_config:get("couchdb", "database_dir")),
+                    if
+                        Free >= SpaceRequired ->
+                            true;
+                        true ->
+                            ?LOG_WARN("Compaction daemon - skipping database `~s` "
+                                "compaction: the estimated necessary disk space is about ~p"
+                                " bytes but the currently available disk space is ~p bytes.",
+                                [Db#db.name, SpaceRequired, Free]),
+                            false
+                    end;
+                false ->
+                    false
+            end;
+        false ->
+            false
+    end.
+
+% Tells whether a view group should be compacted right now: the period must
+% be active, the group's updater must not be running, the fragmentation
+% must reach the `view_frag` threshold and the free space reported for the
+% index directory must not be smaller than the estimated space needed.
+% Lack of disk space is logged as a warning.
+can_view_compact(Config, DbName, GroupId, GroupInfo) ->
+    case check_period(Config) of
+        true ->
+            case couch_util:get_value(updater_running, GroupInfo) of
+                false ->
+                    {Frag, SpaceRequired} = frag(GroupInfo),
+                    ?LOG_DEBUG("Fragmentation for view group `~s` (database `~s`) is "
+                        "~p%, estimated space for compaction is ~p bytes.",
+                        [GroupId, DbName, Frag, SpaceRequired]),
+                    case check_frag(Config#config.view_frag, Frag) of
+                        true ->
+                            Free = free_space(couch_index_util:root_dir()),
+                            if
+                                Free >= SpaceRequired ->
+                                    true;
+                                true ->
+                                    ?LOG_WARN("Compaction daemon - skipping view group `~s` "
+                                        "compaction (database `~s`): the estimated necessary "
+                                        "disk space is about ~p bytes but the currently available"
+                                        " disk space is ~p bytes.",
+                                        [GroupId, DbName, SpaceRequired, Free]),
+                                    false
+                            end;
+                        false ->
+                            false
+                    end;
+                true ->
+                    false
+            end;
+        false ->
+            false
+    end.
+
+
+% Tells whether the current local time, at minute granularity, lies within
+% the configured period. Without a period the answer is always `true`. A
+% period whose `from` is not smaller than its `to` wraps around midnight.
+check_period(#config{period = nil}) ->
+    true;
+check_period(#config{period = #period{from = From, to = To}}) ->
+    {Hour, Minute, _} = erlang:time(),
+    Now = {Hour, Minute},
+    if
+        From < To ->
+            (Now >= From) andalso (Now < To);
+        true ->
+            (Now >= From) orelse (Now < To)
+    end.
+
+
+% Tells whether the fragmentation `Frag` reaches `Threshold`. A `nil`
+% threshold is reached by any fragmentation.
+check_frag(Threshold, Frag) ->
+    Threshold =:= nil orelse Frag >= Threshold.
+
+
+% Computes `{Fragmentation, SpaceRequired}` from the `disk_size` and
+% `data_size` values of an info property list.
+%
+% Files smaller than "min_file_size" (default "131072") of the
+% "compaction_daemon" configuration section are reported as not fragmented.
+frag(Props) ->
+    FileSize = couch_util:get_value(disk_size, Props),
+    MinFileSize = list_to_integer(
+        couch_config:get("compaction_daemon", "min_file_size", "131072")),
+    if
+        FileSize < MinFileSize ->
+            {0, FileSize};
+        true ->
+            frag_for_data_size(FileSize, couch_util:get_value(data_size, Props))
+    end.
+
+% Fragmentation is the share of the file, in percent, that is not data.
+% A `null` data size counts as fully fragmented and 0 as not fragmented;
+% in both cases the file size is reported as the space required.
+frag_for_data_size(FileSize, null) ->
+    {100, FileSize};
+frag_for_data_size(FileSize, 0) ->
+    {0, FileSize};
+frag_for_data_size(FileSize, DataSize) ->
+    Frag = round(((FileSize - DataSize) / FileSize * 100)),
+    {Frag, space_required(DataSize)}.
+
+% Pessimistic rough estimate of the disk space needed to compact a database
+% or view index: twice the size of its data, rounded to an integer.
+space_required(DataSize) ->
+    Estimate = DataSize * 2.0,
+    round(Estimate).
+
+
+% Parses every entry of the "compactions" configuration section and stores
+% the valid ones in the config table, keyed by the database name as a
+% binary. Invalid entries are skipped (parse_config/2 logs them).
+load_config() ->
+    StoreEntry = fun({DbName, ConfigString}) ->
+        case parse_config(DbName, ConfigString) of
+            error ->
+                ok;
+            {ok, Config} ->
+                true = ets:insert(?CONFIG_ETS, {?l2b(DbName), Config})
+        end
+    end,
+    lists:foreach(StoreEntry, couch_config:get("compactions")).
+
+% Parses the compaction config string of database `DbName`. Returns
+% `{ok, Config}`, or `error` after logging why the string was rejected.
+parse_config(DbName, ConfigString) ->
+    Parsed = (catch do_parse_config(ConfigString)),
+    case Parsed of
+        {ok, _Conf} ->
+            Parsed;
+        incomplete_period ->
+            ?LOG_ERROR("Incomplete period ('to' or 'from' missing) in the compaction"
+                " configuration for database `~s`", [DbName]),
+            error;
+        _ ->
+            % any other value, including a caught exception
+            ?LOG_ERROR("Invalid compaction configuration for database "
+                "`~s`: `~s`", [DbName, ConfigString]),
+            error
+    end.
+
+% Turns a config string into `{ok, #config{}}`. Returns `incomplete_period`
+% when only one of the period's `from` / `to` was given; raises on anything
+% that cannot be parsed.
+do_parse_config(ConfigString) ->
+    {ok, ConfProps} = couch_util:parse_term(ConfigString),
+    {ok, Conf} = config_record(ConfProps, #config{}),
+    case Conf#config.period of
+        nil ->
+            {ok, Conf};
+        #period{from = nil} ->
+            incomplete_period;
+        #period{to = nil} ->
+            incomplete_period;
+        #period{} ->
+            {ok, Conf}
+    end.
+
+% Folds a list of `{Setting, Value}` pairs into a #config{} record and
+% returns `{ok, Config}`. Raises on unknown settings and malformed values.
+%
+% Supported settings: `db_fragmentation` and `view_fragmentation` (strings
+% such as "70%"), `from` and `to` (strings such as "23:00"), `strict_window`
+% and `parallel_view_compaction` (booleans).
+config_record([], Config) ->
+    {ok, Config};
+
+config_record([{db_fragmentation, V} | Rest], Config) ->
+    [Percentage] = string:tokens(V, "%"),
+    config_record(Rest, Config#config{db_frag = list_to_integer(Percentage)});
+
+config_record([{view_fragmentation, V} | Rest], Config) ->
+    [Percentage] = string:tokens(V, "%"),
+    config_record(Rest, Config#config{view_frag = list_to_integer(Percentage)});
+
+config_record([{from, V} | Rest], #config{period = Period0} = Config) ->
+    Time = parse_time(V),
+    Period = (period_or_new(Period0))#period{from = Time},
+    config_record(Rest, Config#config{period = Period});
+
+config_record([{to, V} | Rest], #config{period = Period0} = Config) ->
+    Time = parse_time(V),
+    Period = (period_or_new(Period0))#period{to = Time},
+    config_record(Rest, Config#config{period = Period});
+
+config_record([{strict_window, Flag} | Rest], Config) when is_boolean(Flag) ->
+    config_record(Rest, Config#config{cancel = Flag});
+
+config_record([{parallel_view_compaction, Flag} | Rest], Config) when is_boolean(Flag) ->
+    config_record(Rest, Config#config{parallel_view_compact = Flag}).
+
+% Returns the given period, or an empty one when none was started yet.
+period_or_new(nil) ->
+    #period{};
+period_or_new(#period{} = Period) ->
+    Period.
+
+
+% Converts a "HH:MM" string into an `{Hour, Minute}` tuple of integers.
+% Raises when the string does not consist of two integers separated by ":".
+parse_time(String) ->
+    [HourStr, MinuteStr] = string:tokens(String, ":"),
+    Hour = list_to_integer(HourStr),
+    Minute = list_to_integer(MinuteStr),
+    {Hour, Minute}.
+
+
+% Returns the free space, as computed by free_space_rec/2, of the mount
+% point that contains `Path`. The mount points reported by disksup are
+% ordered from the deepest to the shallowest path so that the most specific
+% one is tried first.
+free_space(Path) ->
+    DeepestFirst = fun({PathA, _, _}, {PathB, _, _}) ->
+        length(filename:split(PathA)) > length(filename:split(PathB))
+    end,
+    DiskData = lists:sort(DeepestFirst, disksup:get_disk_data()),
+    free_space_rec(abs_path(Path), DiskData).
+
+% Finds the first `{MountPoint, Total, Usage}` entry whose mount point is a
+% prefix of `Path` and returns the unused part of `Total` (`Usage` is a
+% percentage) multiplied by 1024. Returns `undefined` when no entry matches.
+free_space_rec(_Path, []) ->
+    undefined;
+free_space_rec(Path, [{MountPoint0, Total, Usage} | Rest]) ->
+    case lists:prefix(abs_path(MountPoint0), Path) of
+        true ->
+            Used = Total * (Usage / 100),
+            trunc(Total - Used) * 1024;
+        false ->
+            free_space_rec(Path, Rest)
+    end.
+
+% Returns the absolute form of `Path0`, always terminated by a "/".
+abs_path(Path0) ->
+    Abs = filename:absname(Path0),
+    Last = lists:last(Abs),
+    if
+        Last =:= $/ ->
+            Abs;
+        true ->
+            Abs ++ "/"
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_compress.erl
----------------------------------------------------------------------
diff --git a/src/couch_compress.erl b/src/couch_compress.erl
new file mode 100644
index 0000000..ac386fd
--- /dev/null
+++ b/src/couch_compress.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_compress).
+
+-export([compress/2, decompress/1, is_compressed/2]).
+-export([get_compression_method/0]).
+
+-include("couch_db.hrl").
+
+% binaries compressed with snappy have their first byte set to this value
+-define(SNAPPY_PREFIX, 1).
+% Term prefixes documented at:
+% http://www.erlang.org/doc/apps/erts/erl_ext_dist.html
+-define(TERM_PREFIX, 131).
+-define(COMPRESSED_TERM_PREFIX, 131, 80).
+
+
+% Returns the compression method configured as "file_compression" in the
+% "couchdb" section, or the default one when the setting is absent.
+%
+% A value without "_" yields an atom, a value such as "deflate_6" yields
+% `{Method, Level}`. Only atoms that already exist are accepted.
+get_compression_method() ->
+    case couch_config:get("couchdb", "file_compression") of
+        undefined ->
+            ?DEFAULT_COMPRESSION;
+        Setting ->
+            case string:tokens(Setting, "_") of
+                [Name, Level] ->
+                    {list_to_existing_atom(Name), list_to_integer(Level)};
+                [Name] ->
+                    list_to_existing_atom(Name)
+            end
+    end.
+
+
+% Serializes and compresses `Term` with the given method (`none`,
+% `{deflate, Level}` or `snappy`).
+%
+% A binary that starts with the snappy prefix or the external term format
+% prefix is taken to be the output of an earlier call: it is returned as is
+% when it is snappy data and snappy is requested, and decompressed and
+% compressed again otherwise. Snappy output is only used when it is smaller
+% than the plain serialization and the snappy NIF is loaded.
+compress(<<?SNAPPY_PREFIX, _/binary>> = Bin, snappy) ->
+    Bin;
+compress(<<Prefix, _/binary>> = Bin, Method)
+        when Prefix =:= ?SNAPPY_PREFIX; Prefix =:= ?TERM_PREFIX ->
+    compress(decompress(Bin), Method);
+compress(Term, none) ->
+    ?term_to_bin(Term);
+compress(Term, {deflate, Level}) ->
+    term_to_binary(Term, [{minor_version, 1}, {compressed, Level}]);
+compress(Term, snappy) ->
+    Plain = ?term_to_bin(Term),
+    try
+        {ok, Packed} = snappy:compress(Plain),
+        IsSmaller = byte_size(Packed) < byte_size(Plain),
+        case IsSmaller of
+            true ->
+                <<?SNAPPY_PREFIX, Packed/binary>>;
+            false ->
+                Plain
+        end
+    catch exit:snappy_nif_not_loaded ->
+        Plain
+    end.
+
+
+% Converts a binary produced by compress/2 back into the original term.
+% Snappy data is recognized by its prefix byte; anything else must be in
+% the external term format.
+decompress(<<?SNAPPY_PREFIX, Packed/binary>>) ->
+    {ok, TermBin} = snappy:decompress(Packed),
+    binary_to_term(TermBin);
+decompress(<<?TERM_PREFIX, _/binary>> = TermBin) ->
+    binary_to_term(TermBin).
+
+
+% Tells whether the first argument is already stored the way `Method` would
+% store it, judging by its prefix bytes. Terms that are not binaries are
+% never considered compressed.
+is_compressed(<<?SNAPPY_PREFIX, _/binary>>, Method) ->
+    Method =:= snappy;
+is_compressed(<<?COMPRESSED_TERM_PREFIX, _/binary>>, Method) ->
+    case Method of
+        {deflate, _Level} ->
+            true;
+        _ ->
+            false
+    end;
+is_compressed(<<?TERM_PREFIX, _/binary>>, Method) ->
+    Method =:= none;
+is_compressed(Term, _Method) when not is_binary(Term) ->
+    false.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_config.erl
----------------------------------------------------------------------
diff --git a/src/couch_config.erl b/src/couch_config.erl
new file mode 100644
index 0000000..22d7cdc
--- /dev/null
+++ b/src/couch_config.erl
@@ -0,0 +1,251 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Reads CouchDB's ini file and gets queried for configuration parameters.
+% This module is initialized with a list of ini files that it consecutively
+% reads Key/Value pairs from and saves them in an ets table. If more than one
+% ini file is specified, the last one is used to write changes that are made
+% with store/2 back to that ini file.
+
+-module(couch_config).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+
+-export([start_link/1, stop/0]).
+-export([all/0, get/1, get/2, get/3, set/3, set/4, delete/2, delete/3]).
+-export([register/1, register/2]).
+-export([parse_ini_file/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+% State of the configuration server.
+-record(config, {
+ % {Pid, Fun} pairs added by the `register` call; Fun is called with
+ % (Section, Key, Value, Persist) after every set and delete
+ notify_funs=[],
+ % ini file that persisted changes are written to: the last file of the
+ % list given to init/1, `undefined` when that list is empty
+ write_filename=undefined
+}).
+
+
+% Starts the configuration server, registered locally under the module
+% name, with the list of ini files to read.
+start_link(IniFiles) ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, IniFiles, []).
+
+% Asynchronously asks the configuration server to stop.
+stop() ->
+    Server = ?MODULE,
+    gen_server:cast(Server, stop).
+
+
+% Returns every `{{Section, Key}, Value}` entry known to the server, sorted.
+all() ->
+    Entries = gen_server:call(?MODULE, all, infinity),
+    lists:sort(Entries).
+
+
+% get(Section) returns the `{Key, Value}` pairs of a section.
+% get(Section, Key) returns the value of a key, `undefined` when missing.
+% get(Section, Key, Default) returns the value of a key, `Default` when
+% missing.
+%
+% Section and key names given as binaries are converted to strings first.
+% Values are read straight from the ets table, without a server round trip.
+get(Section) when is_binary(Section) ->
+    ?MODULE:get(?b2l(Section));
+get(Section) ->
+    [{Key, Value} ||
+        [Key, Value] <- ets:match(?MODULE, {{Section, '$1'}, '$2'})].
+
+get(Section, Key) ->
+    ?MODULE:get(Section, Key, undefined).
+
+get(Section, Key, Default) when is_binary(Section), is_binary(Key) ->
+    ?MODULE:get(?b2l(Section), ?b2l(Key), Default);
+get(Section, Key, Default) ->
+    case ets:lookup(?MODULE, {Section, Key}) of
+        [{_, Value}] ->
+            Value;
+        [] ->
+            Default
+    end.
+
+% Sets the value of a key through the configuration server. `Persist`
+% (default `true`) is passed on to the server, which uses it to decide
+% whether the change is also written to the ini file. Section and key names
+% given as binaries are converted to strings first.
+set(Section, Key, Value) ->
+    ?MODULE:set(Section, Key, Value, true).
+
+set(Section, Key, Value, Persist) when is_binary(Section), is_binary(Key) ->
+    ?MODULE:set(?b2l(Section), ?b2l(Key), Value, Persist);
+set(Section, Key, Value, Persist) ->
+    Request = {set, Section, Key, Value, Persist},
+    gen_server:call(?MODULE, Request).
+
+
+% Deletes a key through the configuration server. `Persist` (default
+% `true`) is passed on to the server, which uses it to decide whether the
+% deletion is also written to the ini file. Section and key names given as
+% binaries are converted to strings first.
+delete(Section, Key) when is_binary(Section), is_binary(Key) ->
+    delete(?b2l(Section), ?b2l(Key));
+delete(Section, Key) ->
+    delete(Section, Key, true).
+
+delete(Section, Key, Persist) when is_binary(Section), is_binary(Key) ->
+    delete(?b2l(Section), ?b2l(Key), Persist);
+delete(Section, Key, Persist) ->
+    Request = {delete, Section, Key, Persist},
+    gen_server:call(?MODULE, Request).
+
+
+% Registers `Fun` to be called on configuration changes. The registration
+% belongs to `Pid` (default: the calling process) and is dropped by the
+% server when that process goes down.
+register(Fun) ->
+    ?MODULE:register(Fun, self()).
+
+register(Fun, Pid) ->
+    Request = {register, Fun, Pid},
+    gen_server:call(?MODULE, Request).
+
+
+% gen_server callback. Creates the ets table holding the configuration and
+% fills it from the ini files, in the given order. The last file becomes
+% the one that persisted changes are written to. Any exception raised while
+% loading stops the server with the exception's reason.
+init(IniFiles) ->
+    ets:new(?MODULE, [named_table, set, protected]),
+    LoadIniFile = fun(IniFile) ->
+        {ok, ParsedIniValues} = parse_ini_file(IniFile),
+        ets:insert(?MODULE, ParsedIniValues)
+    end,
+    try
+        lists:foreach(LoadIniFile, IniFiles),
+        WriteFile = case IniFiles of
+            [] ->
+                undefined;
+            _ ->
+                lists:last(IniFiles)
+        end,
+        {ok, #config{write_filename = WriteFile}}
+    catch _Tag:Error ->
+        {stop, Error}
+    end.
+
+
+% gen_server callback. Nothing has to be cleaned up.
+terminate(_Why, _Config) ->
+    ok.
+
+
+% gen_server callback implementing the synchronous API of the module.
+%
+% all - replies with the sorted content of the ets table.
+% {set, Sec, Key, Val, Persist} - stores a value, see below.
+% {delete, Sec, Key, Persist} - removes a value, see below.
+% {register, Fun, Pid} - adds a change notification fun owned by Pid.
+handle_call(all, _From, Config) ->
+ Resp = lists:sort((ets:tab2list(?MODULE))),
+ {reply, Resp, Config};
+handle_call({set, Sec, Key, Val, Persist}, From, Config) ->
+ % Write to the ini file first, but only when persistence was requested
+ % and a file to write to is known.
+ Result = case {Persist, Config#config.write_filename} of
+ {true, undefined} ->
+ ok;
+ {true, FileName} ->
+ couch_config_writer:save_to_file({{Sec, Key}, Val}, FileName);
+ _ ->
+ ok
+ end,
+ case Result of
+ ok ->
+ true = ets:insert(?MODULE, {{Sec, Key}, Val}),
+ % The notification funs run in a separate linked process, which also
+ % sends the reply: the caller gets `ok` only after all of them ran.
+ % A failing fun is ignored.
+ spawn_link(fun() ->
+ [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- Config#config.notify_funs],
+ gen_server:reply(From, ok)
+ end),
+ {noreply, Config};
+ _Error ->
+ % The file could not be written: the table is left untouched and the
+ % caller gets the result of save_to_file/2.
+ {reply, Result, Config}
+ end;
+handle_call({delete, Sec, Key, Persist}, From, Config) ->
+ true = ets:delete(?MODULE, {Sec,Key}),
+ % NOTE(review): unlike in the `set` clause, the result of save_to_file/2 is
+ % ignored here and the table is changed before the file is written, so the
+ % caller gets `ok` even if the file could not be updated - confirm that
+ % this is intended.
+ case {Persist, Config#config.write_filename} of
+ {true, undefined} ->
+ ok;
+ {true, FileName} ->
+ % an empty value is how a deletion is recorded in the file
+ couch_config_writer:save_to_file({{Sec, Key}, ""}, FileName);
+ _ ->
+ ok
+ end,
+ % Same notification scheme as for `set`, with `deleted` as the value.
+ spawn_link(fun() ->
+ [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- Config#config.notify_funs],
+ gen_server:reply(From, ok)
+ end),
+ {noreply, Config};
+handle_call({register, Fun, Pid}, _From, #config{notify_funs=PidFuns}=Config) ->
+ % The monitor lets handle_info/2 drop the fun once Pid is down.
+ erlang:monitor(process, Pid),
+ % wrap funs of arity 1, 2 and 3 so that every stored fun has arity 4
+ Fun2 =
+ case Fun of
+ _ when is_function(Fun, 1) ->
+ fun(Section, _Key, _Value, _Persist) -> Fun(Section) end;
+ _ when is_function(Fun, 2) ->
+ fun(Section, Key, _Value, _Persist) -> Fun(Section, Key) end;
+ _ when is_function(Fun, 3) ->
+ fun(Section, Key, Value, _Persist) -> Fun(Section, Key, Value) end;
+ _ when is_function(Fun, 4) ->
+ Fun
+ end,
+ {reply, ok, Config#config{notify_funs=[{Pid, Fun2} | PidFuns]}}.
+
+
+% gen_server callback. `stop` terminates the server normally; any other
+% message is ignored.
+handle_cast(Msg, State) ->
+    case Msg of
+        stop ->
+            {stop, normal, State};
+        _ ->
+            {noreply, State}
+    end.
+
+% gen_server callback. Forgets the notification funs that were registered
+% for a process that went down.
+handle_info({'DOWN', _, _, DownPid, _}, #config{notify_funs=PidFuns}=Config) ->
+    StillAlive = [Entry || {Pid, _Fun} = Entry <- PidFuns, Pid /= DownPid],
+    {noreply, Config#config{notify_funs=StillAlive}}.
+
+% gen_server callback. The state needs no conversion between code versions.
+code_change(_FromVsn, Config, _Extra) ->
+    {ok, Config}.
+
+
+% Reads an ini file and returns `{ok, Values}`, where Values is a list of
+% `{{Section, Key}, Value}` tuples (all strings). Entries are accumulated by
+% prepending, so the list is in reverse file order.
+%
+% When the file cannot be read, the error is logged and `{error, Reason}` is
+% thrown. A key with an empty value is not returned; it is deleted from the
+% module's ets table instead.
+parse_ini_file(IniFile) ->
+ IniFilename = couch_util:abs_pathname(IniFile),
+ IniBin =
+ case file:read_file(IniFilename) of
+ {ok, IniBin0} ->
+ IniBin0;
+ {error, Reason} = Error ->
+ ?LOG_ERROR("Could not read server configuration file ~s: ~s",
+ [IniFilename, file:format_error(Reason)]),
+ throw(Error)
+ end,
+
+ % Lines end with CRLF, LF, CR or the character with octal code 032.
+ Lines = re:split(IniBin, "\r\n|\n|\r|\032", [{return, list}]),
+ % The accumulator is {CurrentSectionName, ValuesSoFar}.
+ {_, ParsedIniValues} =
+ lists:foldl(fun(Line, {AccSectionName, AccValues}) ->
+ case string:strip(Line) of
+ "[" ++ Rest ->
+ % Section header: only accepted when "]" is the last character.
+ case re:split(Rest, "\\]", [{return, list}]) of
+ [NewSectionName, ""] ->
+ {NewSectionName, AccValues};
+ _Else -> % end bracket not at end, ignore this line
+ {AccSectionName, AccValues}
+ end;
+ ";" ++ _Comment ->
+ {AccSectionName, AccValues};
+ Line2 ->
+ % Split on "=". In an Erlang string literal "\s" is the escape for a
+ % space character, so only spaces around the "=" are consumed here.
+ case re:split(Line2, "\s*=\s*", [{return, list}]) of
+ [Value] ->
+ % No "=" in the line. It continues the value of the most recently
+ % parsed key when the unstripped line starts with exactly one
+ % space followed by a non-whitespace character.
+ MultiLineValuePart = case re:run(Line, "^ \\S", []) of
+ {match, _} ->
+ true;
+ _ ->
+ false
+ end,
+ case {MultiLineValuePart, AccValues} of
+ {true, [{{_, ValueName}, PrevValue} | AccValuesRest]} ->
+ % remove comment
+ case re:split(Value, " ;|\t;", [{return, list}]) of
+ [[]] ->
+ % empty line
+ {AccSectionName, AccValues};
+ [LineValue | _Rest] ->
+ % Append to the previous value, separated by one space.
+ E = {{AccSectionName, ValueName},
+ PrevValue ++ " " ++ LineValue},
+ {AccSectionName, [E | AccValuesRest]}
+ end;
+ _ ->
+ {AccSectionName, AccValues}
+ end;
+ [""|_LineValues] -> % line begins with "=", ignore
+ {AccSectionName, AccValues};
+ [ValueName|LineValues] -> % yeehaw, got a line!
+ % The value may itself contain "=": put the pieces back together.
+ RemainingLine = couch_util:implode(LineValues, "="),
+ % removes comments
+ case re:split(RemainingLine, " ;|\t;", [{return, list}]) of
+ [[]] ->
+ % empty line means delete this key
+ ets:delete(?MODULE, {AccSectionName, ValueName}),
+ {AccSectionName, AccValues};
+ [LineValue | _Rest] ->
+ {AccSectionName,
+ [{{AccSectionName, ValueName}, LineValue} | AccValues]}
+ end
+ end
+ end
+ end, {"", []}, Lines),
+ {ok, ParsedIniValues}.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_config_writer.erl
----------------------------------------------------------------------
diff --git a/src/couch_config_writer.erl b/src/couch_config_writer.erl
new file mode 100644
index 0000000..21f1c3f
--- /dev/null
+++ b/src/couch_config_writer.erl
@@ -0,0 +1,88 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% @doc Saves a Key/Value pair to a ini file. The Key consists of a Section
+%% and Option combination. If that combination is found in the ini file
+%% the new value replaces the old value. If only the Section is found the
+%% Option and value combination is appended to the Section. If the Section
+%% does not yet exist in the ini file, it is added and the Option/Value
+%% pair is appended.
+%% @see couch_config
+
+-module(couch_config_writer).
+
+-export([save_to_file/2]).
+
+-include("couch_db.hrl").
+
+%% @spec save_to_file(
+%%           Config::{{Section::string(), Option::string()}, Value::string()},
+%%           File::filename()) -> ok | {error, Reason}
+%% @doc Saves a Section/Key/Value triple to the ini file File::filename().
+%% The file is read, changed in memory and written back as a whole. When
+%% writing fails the error is logged and returned. An unreadable file makes
+%% the call fail with a badmatch.
+save_to_file({{Section, Key}, Value}, File) ->
+    {ok, OldFileContents} = file:read_file(File),
+    Lines = re:split(OldFileContents, "\r\n|\n|\r|\032", [{return, list}]),
+
+    SectionLine = "[" ++ Section ++ "]",
+    % The key has to be matched literally. Without escaping, a key such as
+    % "example.com", "a+b" or "db(1)" would be interpreted as a regular
+    % expression: it could match other options, fail to match itself (which
+    % duplicates the option) or make the pattern invalid.
+    EscapedKey = re:replace(Key, "[\\]\\[\\\\.^$*+?(){}|]", "\\\\&",
+        [global, {return, list}]),
+    % The pattern matches either the option itself, which is captured, or
+    % the header of any section, which captures nothing.
+    {ok, Pattern} = re:compile(["^(", EscapedKey, "\\s*=)|\\[[a-zA-Z0-9\_-]*\\]"]),
+
+    NewLines = process_file_lines(Lines, [], SectionLine, Pattern, Key, Value),
+    NewFileContents = reverse_and_add_newline(strip_empty_lines(NewLines), []),
+    case file:write_file(File, NewFileContents) of
+        ok ->
+            ok;
+        {error, Reason} = Error ->
+            ?LOG_ERROR("Could not write config file ~s: ~s",
+                [File, file:format_error(Reason)]),
+            Error
+    end.
+
+
+%% Walks over the lines of the file until the header line of the wanted
+%% section is found, then hands over to process_section_lines/5. When the
+%% section does not exist, it is appended together with the option.
+%% `SeenLines' and the result hold the lines in reverse order.
+process_file_lines([], SeenLines, Section, _Pattern, Key, Value) ->
+    % Section wasn't found. Append it with the option here.
+    Option = Key ++ " = " ++ Value,
+    [Option, Section, "" | strip_empty_lines(SeenLines)];
+
+process_file_lines([Line | Rest], SeenLines, Section, Pattern, Key, Value)
+        when Line =:= Section ->
+    process_section_lines(Rest, [Line | SeenLines], Pattern, Key, Value);
+
+process_file_lines([Line | Rest], SeenLines, Section, Pattern, Key, Value) ->
+    process_file_lines(Rest, [Line | SeenLines], Section, Pattern, Key, Value).
+
+
+%% Walks over the lines that follow the header of the wanted section. The
+%% option is replaced where it is found, or appended to the section when
+%% another section header or the end of the file is reached first.
+%% `SeenLines' and the result hold the lines in reverse order.
+process_section_lines([], SeenLines, _Pattern, Key, Value) ->
+    % Found end of file within the section. Append the option here.
+    [Key ++ " = " ++ Value | strip_empty_lines(SeenLines)];
+
+process_section_lines([Line | Rest], SeenLines, Pattern, Key, Value) ->
+    case re:run(Line, Pattern, [{capture, all_but_first}]) of
+        {match, []} -> % Found another section. Append the option here.
+            Option = Key ++ " = " ++ Value,
+            lists:reverse(Rest) ++
+                [Line, "", Option | strip_empty_lines(SeenLines)];
+        {match, _} -> % Found the option itself. Replace it.
+            Option = Key ++ " = " ++ Value,
+            lists:reverse(Rest) ++ [Option | SeenLines];
+        nomatch -> % Found nothing interesting. Move on.
+            process_section_lines(Rest, [Line | SeenLines], Pattern, Key, Value)
+    end.
+
+
+%% Turns a list of lines in reverse order into an iolist in which the lines
+%% are back in order and each one is followed by a newline. `Content' is
+%% the iolist that ends up after the lines.
+reverse_and_add_newline(Lines, Content) ->
+    lists:foldl(
+        fun(Line, Acc) -> [Line, "\n", Acc] end,
+        Content,
+        Lines).
+
+
+%% Removes the empty strings at the head of the list.
+strip_empty_lines(Lines) ->
+    case Lines of
+        ["" | Tail] ->
+            strip_empty_lines(Tail);
+        _ ->
+            Lines
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
new file mode 100644
index 0000000..11ea0fd
--- /dev/null
+++ b/src/couch_db.erl
@@ -0,0 +1,1358 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db).
+-behaviour(gen_server).
+
+-export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
+-export([start_compact/1, cancel_compact/1]).
+-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
+-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
+-export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
+-export([open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([set_revs_limit/2,get_revs_limit/1]).
+-export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
+-export([enum_docs/4,enum_docs_since/5]).
+-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
+-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
+-export([set_security/2,get_security/1]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
+-export([check_is_admin/1, check_is_member/1]).
+-export([reopen/1, is_system_db/1, compression/1]).
+
+-include("couch_db.hrl").
+
+
+% Opens the database file and starts the couch_db gen_server on top of it.
+% The file process is unlinked from the caller once the server start has
+% been attempted. Any non-{ok, Fd} result of open_db_file/2 is returned
+% unchanged.
+start_link(DbName, Filepath, Options) ->
+    case open_db_file(Filepath, Options) of
+        {ok, Fd} ->
+            InitArg = {DbName, Filepath, Fd, Options},
+            Result = gen_server:start_link(couch_db, InitArg, []),
+            unlink(Fd),
+            Result;
+        OpenError ->
+            OpenError
+    end.
+
+% Opens the database file at Filepath.
+%
+% Returns {ok, Fd} on success. If the file does not exist, looks for a
+% leftover "<Filepath>.compact" file (this can happen after a crash during
+% the compaction file switch), renames it into place and uses it instead.
+% Returns {not_found, no_db_file} when neither file exists. Any other error
+% from couch_file:open/2, for either file, is returned to the caller as-is.
+open_db_file(Filepath, Options) ->
+    case couch_file:open(Filepath, Options) of
+        {ok, Fd} ->
+            {ok, Fd};
+        {error, enoent} ->
+            % couldn't find file. is there a compact version? This can happen if
+            % crashed during the file switch.
+            CompactPath = Filepath ++ ".compact",
+            case couch_file:open(CompactPath, [nologifmissing]) of
+                {ok, Fd} ->
+                    ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
+                    ok = file:rename(CompactPath, Filepath),
+                    ok = couch_file:sync(Fd),
+                    {ok, Fd};
+                {error, enoent} ->
+                    {not_found, no_db_file};
+                CompactError ->
+                    % e.g. {error, eacces}: report it like errors on the
+                    % primary file instead of crashing with case_clause.
+                    CompactError
+            end;
+        Error ->
+            Error
+    end.
+
+
+% Creates a database by delegating to couch_server:create/2.
+create(DbName, Options) ->
+    couch_server:create(DbName, Options).
+
+% Opens a database for internal use (for example the replicator or the view
+% indexer). Unlike open/2 no membership check is performed, so it never
+% throws a reader error.
+open_int(DbName, Options) ->
+    couch_server:open(DbName, Options).
+
+% Opens a database on behalf of an http request and verifies that the
+% user context carried by the opened #db is allowed to read it.
+% If check_is_member/1 throws, the database is closed again and the same
+% term is re-thrown. Non-{ok, Db} results of couch_server:open/2 are
+% returned unchanged.
+open(DbName, Options) ->
+    case couch_server:open(DbName, Options) of
+        {ok, Db} ->
+            try check_is_member(Db) of
+                _ ->
+                    {ok, Db}
+            catch
+                throw:Reason ->
+                    close(Db),
+                    throw(Reason)
+            end;
+        Other ->
+            Other
+    end.
+
+% Fetches the current #db record from the main process while keeping the
+% caller's user context. When the file ref counter changed in the meantime,
+% a reference is added on the new counter and the old one is dropped
+% (failures of the drop are ignored).
+reopen(#db{main_pid = MainPid, fd_ref_counter = PrevCounter, user_ctx = Ctx}) ->
+    {ok, #db{fd_ref_counter = CurCounter} = Fresh} =
+        gen_server:call(MainPid, get_db, infinity),
+    if
+        CurCounter =:= PrevCounter ->
+            ok;
+        true ->
+            couch_ref_counter:add(CurCounter),
+            catch couch_ref_counter:drop(PrevCounter)
+    end,
+    {ok, Fresh#db{user_ctx = Ctx}}.
+
+% True when the database options contain the sys_db flag.
+is_system_db(#db{options = Opts}) ->
+    lists:member(sys_db, Opts).
+
+% Synchronously asks the updater process for a full commit (no call
+% timeout) and returns {ok, StartTime}, where StartTime is the
+% instance_start_time stored in the given #db record.
+ensure_full_commit(#db{update_pid = Updater, instance_start_time = StartedAt}) ->
+    ok = gen_server:call(Updater, full_commit, infinity),
+    {ok, StartedAt}.
+
+% Releases the caller's reference on the database file ref counter.
+close(#db{fd_ref_counter = Counter}) ->
+    couch_ref_counter:drop(Counter).
+
+% Sends an {open_ref_count, OpenedPid} call to the database main process
+% and returns its reply.
+open_ref_counted(MainPid, OpenedPid) ->
+    Request = {open_ref_count, OpenedPid},
+    gen_server:call(MainPid, Request).
+
+% Asks the database main process whether it is idle. Accepts either a #db
+% record or the main process identifier itself.
+is_idle(#db{main_pid = Pid}) ->
+    is_idle(Pid);
+is_idle(Pid) ->
+    gen_server:call(Pid, is_idle).
+
+% Sets up a process monitor on the database main process and returns the
+% monitor reference.
+monitor(#db{main_pid = Pid}) ->
+    erlang:monitor(process, Pid).
+
+% Sends a start_compact call to the updater process and returns its reply.
+start_compact(#db{update_pid = Updater}) ->
+    gen_server:call(Updater, start_compact).
+
+% Sends a cancel_compact call to the updater process and returns its reply.
+cancel_compact(#db{update_pid = Updater}) ->
+    gen_server:call(Updater, cancel_compact).
+
+% Writes a deleted #doc for every given revision of document Id.
+% The update is expected to yield exactly one result, which is returned as
+% {ok, Result}; any other number of results fails the match.
+delete_doc(Db, Id, Revisions) ->
+    Tombstones = lists:map(
+        fun(Rev) -> #doc{id = Id, revs = [Rev], deleted = true} end,
+        Revisions
+    ),
+    {ok, [Result]} = update_docs(Db, Tombstones, []),
+    {ok, Result}.
+
+% Same as open_doc/3 with an empty option list.
+open_doc(Db, IdOrDocInfo) ->
+    NoOptions = [],
+    open_doc(Db, IdOrDocInfo, NoOptions).
+
+% Opens a document, counting the access as a database read.
+% A deleted document is only returned when the deleted option is present;
+% otherwise the result is {not_found, deleted}. Every other outcome is
+% passed through apply_open_options/2.
+open_doc(Db, Id, Options) ->
+    increment_stat(Db, {couchdb, database_reads}),
+    case open_doc_int(Db, Id, Options) of
+        {ok, #doc{deleted = true}} = Found ->
+            WantDeleted = lists:member(deleted, Options),
+            if
+                WantDeleted ->
+                    apply_open_options(Found, Options);
+                true ->
+                    {not_found, deleted}
+            end;
+        Other ->
+            apply_open_options(Other, Options)
+    end.
+
+% Applies the open options to a successful {ok, Doc} lookup result; any
+% other result is returned untouched.
+apply_open_options(Result, Options) ->
+    case Result of
+        {ok, Doc} ->
+            apply_open_options2(Doc, Options);
+        _ ->
+            Result
+    end.
+
+% Walks the option list and transforms the document accordingly:
+%   {atts_since, Revs} - attachments whose revpos is not greater than the
+%                        position found by find_ancestor_rev_pos/2 have
+%                        their data replaced by the atom stub;
+%   ejson_body         - the doc is passed through couch_doc:with_ejson_body/1;
+%   anything else      - ignored.
+% Returns {ok, Doc} once all options are consumed.
+apply_open_options2(Doc, []) ->
+    {ok, Doc};
+apply_open_options2(#doc{atts = Atts, revs = Revs} = Doc,
+        [{atts_since, PossibleAncestors} | Remaining]) ->
+    AncestorPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
+    NewAtts = [
+        case AttPos > AncestorPos of
+            true -> Att;
+            false -> Att#att{data = stub}
+        end
+        || #att{revpos = AttPos} = Att <- Atts
+    ],
+    apply_open_options2(Doc#doc{atts = NewAtts}, Remaining);
+apply_open_options2(Doc, [ejson_body | Remaining]) ->
+    apply_open_options2(couch_doc:with_ejson_body(Doc), Remaining);
+apply_open_options2(Doc, [_Ignored | Remaining]) ->
+    apply_open_options2(Doc, Remaining).
+
+
+% Walks the revision path {Pos, [RevId | Older]} from its head downwards
+% and returns the position of the first {Pos, RevId} pair that is a member
+% of AttsSinceRevs. Returns 0 when the path is exhausted or AttsSinceRevs
+% is empty.
+find_ancestor_rev_pos({_Pos, []}, _AttsSinceRevs) ->
+    0;
+find_ancestor_rev_pos(_DocRevs, []) ->
+    0;
+find_ancestor_rev_pos({Pos, [Head | Older]}, AttsSinceRevs) ->
+    IsKnown = lists:member({Pos, Head}, AttsSinceRevs),
+    if
+        IsKnown ->
+            Pos;
+        true ->
+            find_ancestor_rev_pos({Pos - 1, Older}, AttsSinceRevs)
+    end.
+
+% Opens the requested revisions of a single document, counting the access
+% as a database read, and runs every per-revision result through
+% apply_open_options/2. Returns {ok, Results}.
+open_doc_revs(Db, Id, Revs, Options) ->
+    increment_stat(Db, {couchdb, database_reads}),
+    [{ok, Found}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
+    Applied = lists:map(
+        fun(Result) -> apply_open_options(Result, Options) end,
+        Found
+    ),
+    {ok, Applied}.
+
+% Returns {ok, Results} where Results is a list of tuples
+%   {Id, MissingRevs, PossibleAncestors}
+% Documents for which no revision is missing are left out of the list.
+get_missing_revs(Db, IdRevsList) ->
+    Ids = [DocId || {DocId, _Revs} <- IdRevsList],
+    Lookups = get_full_doc_infos(Db, Ids),
+    {ok, find_missing(IdRevsList, Lookups)}.
+
+% Walks the requested {Id, Revs} pairs in lockstep with their lookup results
+% and returns {Id, MissingRevs, PossibleAncestors} for every document that
+% has at least one missing revision. Documents that were not found report
+% all requested revisions as missing, with no possible ancestors.
+find_missing([], []) ->
+ [];
+find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+ case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
+ [] ->
+ % nothing missing for this document, leave it out of the result
+ find_missing(RestIdRevs, RestLookupInfo);
+ MissingRevs ->
+ #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+ LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+ % Find the revs that are possible parents of this rev
+ PossibleAncestors =
+ lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+ % this leaf is a "possible ancestor" of the missing
+ % revs if this LeafPos is less than any of the missing revs
+ case lists:any(fun({MissingPos, _}) ->
+ LeafPos < MissingPos end, MissingRevs) of
+ true ->
+ [{LeafPos, LeafRevId} | Acc];
+ false ->
+ Acc
+ end
+ end, [], LeafRevs),
+ [{Id, MissingRevs, PossibleAncestors} |
+ find_missing(RestIdRevs, RestLookupInfo)]
+ end;
+find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
+ [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
+
+% Looks up a document and, when found, converts the full doc info with
+% couch_doc:to_doc_info/1. Any other lookup result is returned as-is.
+get_doc_info(Db, Id) ->
+    case get_full_doc_info(Db, Id) of
+        {ok, FullInfo} ->
+            {ok, couch_doc:to_doc_info(FullInfo)};
+        NotFound ->
+            NotFound
+    end.
+
+% Single-document variant of get_full_doc_infos/2.
+% returns {ok, DocInfo} or not_found
+get_full_doc_info(Db, Id) ->
+    [Lookup] = get_full_doc_infos(Db, [Id]),
+    Lookup.
+
+% Looks up the given ids in the by-id btree of the database and returns
+% the lookup results as produced by couch_btree:lookup/2.
+get_full_doc_infos(Db, Ids) ->
+    Btree = by_id_btree(Db),
+    couch_btree:lookup(Btree, Ids).
+
+% Sends an increment_update_seq call to the updater process and returns
+% its reply.
+increment_update_seq(#db{update_pid = Updater}) ->
+    gen_server:call(Updater, increment_update_seq).
+
+% Sends a {purge_docs, IdsRevs} call to the updater process and returns
+% its reply.
+purge_docs(#db{update_pid = Updater}, IdsRevs) ->
+    Request = {purge_docs, IdsRevs},
+    gen_server:call(Updater, Request).
+
+% Accessor for the committed_update_seq field of the #db record.
+get_committed_update_seq(#db{committed_update_seq = CommittedSeq}) ->
+    CommittedSeq.
+
+% Accessor for the update_seq field of the #db record.
+get_update_seq(#db{update_seq = UpdateSeq}) ->
+    UpdateSeq.
+
+% Accessor for the purge_seq stored in the database header.
+get_purge_seq(#db{header = #db_header{purge_seq = Seq}}) ->
+    Seq.
+
+% Returns {ok, []} when the header holds no purged-docs pointer (nil);
+% otherwise reads the term stored at that pointer from the database file.
+get_last_purged(#db{header = #db_header{purged_docs = nil}}) ->
+    {ok, []};
+get_last_purged(#db{fd = Fd, header = #db_header{purged_docs = Pointer}}) ->
+    couch_file:pread_term(Fd, Pointer).
+
+% Returns {ok, InfoList}, a property list describing the database:
+% name, document and deleted-document counts, update/purge sequences,
+% whether a compactor is running, file and data sizes, instance start time,
+% on-disk format version and the committed update sequence.
+get_db_info(Db) ->
+ #db{fd=Fd,
+ header=#db_header{disk_version=DiskVersion},
+ compactor_pid=Compactor,
+ update_seq=SeqNum,
+ name=Name,
+ instance_start_time=StartTime,
+ committed_update_seq=CommittedUpdateSeq,
+ fulldocinfo_by_id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalBtree
+ } = Db,
+ {ok, Size} = couch_file:bytes(Fd),
+ % the full reduction of the by-id btree carries the doc count (element 1),
+ % the deleted doc count (element 2) and, when present, a size (element 3)
+ {ok, DbReduction} = couch_btree:full_reduce(by_id_btree(Db)),
+ InfoList = [
+ {db_name, Name},
+ {doc_count, element(1, DbReduction)},
+ {doc_del_count, element(2, DbReduction)},
+ {update_seq, SeqNum},
+ {purge_seq, couch_db:get_purge_seq(Db)},
+ {compact_running, Compactor/=nil},
+ {disk_size, Size},
+ % data_size is null whenever db_data_size/2 cannot compute it
+ {data_size, db_data_size(DbReduction, [SeqBtree, IdBtree, LocalBtree])},
+ {instance_start_time, StartTime},
+ {disk_format_version, DiskVersion},
+ {committed_update_seq, CommittedUpdateSeq}
+ ],
+ {ok, InfoList}.
+
+% Computes the data size reported by get_db_info/1 from the by-id btree
+% reduction. A two-element reduction (pre 1.2 format, upgraded on
+% compaction) or a nil size yields null; otherwise the btree sizes are
+% added on top of the size found in the reduction.
+db_data_size({_, _}, _Trees) ->
+    null;
+db_data_size({_, _, Size}, Trees) when Size =/= nil ->
+    sum_tree_sizes(Size, Trees);
+db_data_size({_, _, nil}, _Trees) ->
+    null.
+
+% Adds the size of every btree in the list to the running total.
+% As soon as one btree reports a nil size the whole result becomes null.
+sum_tree_sizes(Total, []) ->
+    Total;
+sum_tree_sizes(Total, [Tree | More]) ->
+    case couch_btree:size(Tree) of
+        Size when Size =/= nil ->
+            sum_tree_sizes(Total + Size, More);
+        nil ->
+            null
+    end.
+
+% Returns the list of #full_doc_info records of the non-deleted design
+% documents, collected by folding over the by-id btree from <<"_design/">>
+% up to (excluding) <<"_design0">>. Entries are prepended to the
+% accumulator, so the list is in reverse fold order.
+% NOTE(review): the fold fun is wrapped by skip_deleted/1, which is defined
+% outside this block; its exact contract is not visible here.
+get_design_docs(Db) ->
+ FoldFun = skip_deleted(fun
+ (#full_doc_info{deleted = true}, _Reds, Acc) ->
+ {ok, Acc};
+ (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
+ {ok, [FullDocInfo | Acc]};
+ (_, _Reds, Acc) ->
+ % first non-design entry: stop folding
+ {stop, Acc}
+ end),
+ KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
+ {ok, _, Docs} = couch_btree:fold(by_id_btree(Db), FoldFun, [], KeyOpts),
+ Docs.
+
+% Returns ok when the user context of the #db is an admin, i.e. when it
+% holds the <<"_admin">> role or one of the admin roles of the security
+% object, or when its name is listed among the admin names.
+% Throws {unauthorized, Message} otherwise.
+check_is_admin(#db{user_ctx = #user_ctx{name = Name, roles = Roles}} = Db) ->
+    {Admins} = get_admins(Db),
+    AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
+    AdminNames = couch_util:get_value(<<"names">>, Admins, []),
+    % "L -- X" differs from L exactly when the two lists share an element.
+    % The name lookup is only evaluated when no role matched.
+    IsAdmin = (AdminRoles -- Roles =/= AdminRoles)
+        orelse (AdminNames -- [Name] =/= AdminNames),
+    case IsAdmin of
+        true ->
+            ok;
+        false ->
+            throw({unauthorized, <<"You are not a db or server admin.">>})
+    end.
+
+% Returns ok when the user context of the #db may read the database:
+%   - it passes check_is_admin/1, or
+%   - the security object lists no member roles and no member names
+%     (public access), or
+%   - it holds <<"_admin">> or one of the member roles, or
+%   - its name is listed among the member names.
+% Otherwise logs a debug message and throws {unauthorized, Message}.
+check_is_member(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
+ % any non-ok outcome of the admin check (including its throw) falls
+ % through to the member check
+ case (catch check_is_admin(Db)) of
+ ok -> ok;
+ _ ->
+ {Members} = get_members(Db),
+ ReaderRoles = couch_util:get_value(<<"roles">>, Members,[]),
+ WithAdminRoles = [<<"_admin">> | ReaderRoles],
+ ReaderNames = couch_util:get_value(<<"names">>, Members,[]),
+ case ReaderRoles ++ ReaderNames of
+ [] -> ok; % no readers == public access
+ _Else ->
+ % "L -- X" equals L only when the two lists share no element
+ case WithAdminRoles -- Roles of
+ WithAdminRoles -> % same list, not a reader role
+ case ReaderNames -- [Name] of
+ ReaderNames -> % same names, not a reader
+ ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
+ throw({unauthorized, <<"You are not authorized to access this db.">>});
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end
+ end
+ end.
+
+% Returns the <<"admins">> entry of the security properties, or an empty
+% object {[]} when there is none.
+get_admins(#db{security = Props}) ->
+    Default = {[]},
+    couch_util:get_value(<<"admins">>, Props, Default).
+
+% Returns the <<"members">> entry of the security properties.
+% For backwards compatibility the <<"readers">> entry is used when there
+% is no <<"members">> entry; an empty object {[]} when neither exists.
+get_members(#db{security = Props}) ->
+    Readers = couch_util:get_value(<<"readers">>, Props, {[]}),
+    couch_util:get_value(<<"members">>, Props, Readers).
+
+% Returns the security properties of the database wrapped in a one-element
+% tuple.
+get_security(#db{security = Props}) ->
+    {Props}.
+
+% Replaces the security object of the database.
+% The caller must be an admin (check_is_admin/1 throws otherwise) and the
+% new properties must pass validate_security_object/1. The change is sent
+% to the updater process and followed by a full commit. Returns ok.
+% Throws bad_request when the second argument is not a {PropList} tuple.
+set_security(#db{update_pid = Updater} = Db, {Props}) when is_list(Props) ->
+    check_is_admin(Db),
+    ok = validate_security_object(Props),
+    ok = gen_server:call(Updater, {set_security, Props}, infinity),
+    {ok, _StartTime} = ensure_full_commit(Db),
+    ok;
+set_security(_Db, _SecObj) ->
+    throw(bad_request).
+
+% Validates the <<"admins">> and <<"members">> sections of a security
+% property list with validate_names_and_roles/1. Returns ok.
+validate_security_object(SecProps) ->
+    Empty = {[]},
+    Admins = couch_util:get_value(<<"admins">>, SecProps, Empty),
+    % we fallback to readers here for backwards compatibility
+    Readers = couch_util:get_value(<<"readers">>, SecProps, Empty),
+    Members = couch_util:get_value(<<"members">>, SecProps, Readers),
+    ok = validate_names_and_roles(Admins),
+    ok = validate_names_and_roles(Members),
+    ok.
+
+% validate user input
+% Checks that the <<"names">> and <<"roles">> entries of a security section
+% are lists of binaries (a missing entry counts as an empty list).
+% Throws a descriptive string for the first offending entry; returns ok.
+validate_names_and_roles({Props}) when is_list(Props) ->
+    Check = fun(Key, Message) ->
+        case couch_util:get_value(Key, Props, []) of
+            Items when is_list(Items) ->
+                lists:foreach(
+                    fun(Item) when is_binary(Item) -> ok;
+                       (_NotBinary) -> throw(Message)
+                    end,
+                    Items
+                );
+            _NotAList ->
+                throw(Message)
+        end
+    end,
+    Check(<<"names">>, "names must be a JSON list of strings"),
+    Check(<<"roles">>, "roles must be a JSON list of strings"),
+    ok.
+
+% Accessor for the revs_limit field of the #db record.
+get_revs_limit(#db{revs_limit = RevsLimit}) ->
+    RevsLimit.
+
+% Sets the revision limit of the database.
+% The caller must be an admin (check_is_admin/1 throws otherwise). Limit
+% must be a positive integer; anything else throws invalid_revs_limit.
+% The is_integer/1 test matters: in Erlang's term order every atom, binary
+% or list compares greater than any number, so "Limit > 0" alone would let
+% such values (and floats) through to the updater process.
+set_revs_limit(#db{update_pid=Pid}=Db, Limit) when is_integer(Limit), Limit > 0 ->
+    check_is_admin(Db),
+    gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
+set_revs_limit(_Db, _Limit) ->
+    throw(invalid_revs_limit).
+
+% Accessor for the name field of the #db record.
+name(#db{name = DbName}) ->
+    DbName.
+
+% Accessor for the compression field of the #db record.
+compression(#db{compression = Method}) ->
+    Method.
+
+% Same as update_doc/4 with the interactive_edit update type.
+update_doc(Db, Doc, Options) ->
+    UpdateType = interactive_edit,
+    update_doc(Db, Doc, Options, UpdateType).
+
+% Writes a single document through update_docs/4.
+% Returns {ok, NewRev} on success. An error reported for the document is
+% thrown. An empty result list (a successful replicated write) yields
+% {ok, {Pos, RevId}} taken from the head of the document's own revs.
+update_doc(Db, Doc, Options, UpdateType) ->
+    case update_docs(Db, [Doc], Options, UpdateType) of
+        {ok, [{ok, Rev}]} ->
+            {ok, Rev};
+        {ok, [{{_Id, _Rev}, Reason}]} ->
+            throw(Reason);
+        {ok, [Reason]} ->
+            throw(Reason);
+        {ok, []} ->
+            % replication success
+            {Pos, [HeadRev | _]} = Doc#doc.revs,
+            {ok, {Pos, HeadRev}}
+    end.
+
+% Same as update_docs/3 with an empty option list.
+update_docs(Db, Docs) ->
+    NoOptions = [],
+    update_docs(Db, Docs, NoOptions).
+
+% group_alike_docs sorts the {Doc, Ref} pairs by document id and groups them
+% into sublist buckets, one bucket per id:
+% [DocA, DocA, DocB, DocC] -> [[DocA, DocA], [DocB], [DocC]]
+%
+% Pairs that share an id keep the order in which they were submitted.
+% lists:sort/2 expects an ordering fun with "less than or equal" semantics;
+% a strict "<" on the id alone does not guarantee that equal ids keep their
+% relative order. Sorting on {Id, Position} makes every key distinct, so the
+% result does not depend on how the sort treats ties.
+group_alike_docs(Docs) ->
+    Indexed = lists:zip(Docs, lists:seq(1, length(Docs))),
+    SortFun = fun({{#doc{id=A}, _}, PosA}, {{#doc{id=B}, _}, PosB}) ->
+        {A, PosA} =< {B, PosB}
+    end,
+    Sorted = [DocRef || {DocRef, _Pos} <- lists:sort(SortFun, Indexed)],
+    group_alike_docs(Sorted, []).
+
+% Groups an id-sorted list of {Doc, Ref} pairs into buckets of equal id.
+% Buckets and their contents are accumulated in reverse and put back in
+% input order once the input is exhausted.
+group_alike_docs([], Buckets) ->
+    lists:reverse([lists:reverse(Bucket) || Bucket <- Buckets]);
+group_alike_docs([First | Rest], []) ->
+    group_alike_docs(Rest, [[First]]);
+group_alike_docs([{Doc, Ref} | Rest], [Current | Older]) ->
+    [{#doc{id = CurrentId}, _} | _] = Current,
+    SameId = Doc#doc.id == CurrentId,
+    NewBuckets = case SameId of
+        true ->
+            % add to existing bucket
+            [[{Doc, Ref} | Current] | Older];
+        false ->
+            % add to new bucket
+            [[{Doc, Ref}], Current | Older]
+    end,
+    group_alike_docs(Rest, NewBuckets).
+
+% Validates a document update. Returns ok when the update is accepted,
+% otherwise the rejection term as a plain return value (it is not thrown).
+% Clause order matters:
+%   1. design documents only require the caller to be an admin; the result
+%      of check_is_admin/1 (or the term it throws) is returned;
+%   2. with no validation funs everything else is accepted;
+%   3. _local documents are accepted without running validation funs;
+%   4. otherwise every fun in validate_doc_funs is called with the new doc,
+%      the doc from disk, the JSON user context and the security object.
+validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
+ catch check_is_admin(Db);
+validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
+ ok;
+validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
+ ok;
+validate_doc_update(Db, Doc, GetDiskDocFun) ->
+ % the disk doc is only loaded when validation funs actually run
+ DiskDoc = GetDiskDocFun(),
+ JsonCtx = couch_util:json_user_ctx(Db),
+ SecObj = get_security(Db),
+ % the first non-ok result aborts the comprehension via throw and is
+ % turned back into a return value by the catch clause below
+ try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
+ ok -> ok;
+ Error -> throw(Error)
+ end || Fun <- Db#db.validate_doc_funs],
+ ok
+ catch
+ throw:Error ->
+ Error
+ end.
+
+
+% Prepares and validates one interactive update of a document that already
+% exists on disk. Returns {Result, Doc2} where Result is either the value
+% of validate_doc_update/3 or the atom conflict, and Doc2 is the document
+% with attachment stubs merged from the disk revision where applicable.
+% LeafRevsDict maps {Pos, RevId} of every leaf to {Deleted, DiskSp, DiskRevs}.
+prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
+ OldFullDocInfo, LeafRevsDict, AllowConflict) ->
+ case Revs of
+ [PrevRev|_] ->
+ % the update names a parent revision: it must be a current leaf
+ case dict:find({RevStart, PrevRev}, LeafRevsDict) of
+ {ok, {Deleted, DiskSp, DiskRevs}} ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
+ false ->
+ % defer loading the disk doc until validation needs it
+ LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
+ {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
+ end;
+ error when AllowConflict ->
+ couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
+ % there are stubs
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
+ error ->
+ {conflict, Doc}
+ end;
+ [] ->
+ % new doc, and we have existing revs.
+ % reuse existing deleted doc
+ if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
+ {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
+ true ->
+ {conflict, Doc}
+ end
+ end.
+
+
+
+% Prepares and validates the buckets of an interactive update, walking the
+% buckets in lockstep with the lookup results of their document ids.
+% Returns {PreppedBuckets, Errors} where Errors is a list of {Ref, Error}.
+% Buckets are prepended to the accumulator as they are processed and the
+% accumulator is returned as-is by the terminating clause.
+prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
+ AccFatalErrors) ->
+ {AccPrepped, AccFatalErrors};
+% The document does not exist yet: only updates without parent revisions
+% ({0, []}) can be accepted.
+prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun({#doc{revs=Revs}=Doc,Ref}, {AccBucket, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
+ case Revs of
+ {0, []} ->
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[{Doc, Ref} | AccBucket], AccErrors2};
+ Error ->
+ {AccBucket, [{Ref, Error} | AccErrors2]}
+ end;
+ _ ->
+ % old revs specified but none exist, a conflict
+ {AccBucket, [{Ref, conflict} | AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
+ [lists:reverse(PreppedBucket) | AccPrepped], AccErrors3);
+% The document exists: each update is checked against the current leaf
+% revisions by prep_and_validate_update/5.
+prep_and_validate_updates(Db, [DocBucket|RestBuckets],
+ [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ AllowConflict, AccPrepped, AccErrors) ->
+ Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+ LeafRevsDict = dict:from_list([
+ begin
+ Deleted = element(1, LeafVal),
+ Sp = element(2, LeafVal),
+ {{Start, RevId}, {Deleted, Sp, Revs}}
+ end ||
+ {LeafVal, {Start, [RevId | _]} = Revs} <- Leafs
+ ]),
+ {PreppedBucket, AccErrors3} = lists:foldl(
+ fun({Doc, Ref}, {Docs2Acc, AccErrors2}) ->
+ case prep_and_validate_update(Db, Doc, OldFullDocInfo,
+ LeafRevsDict, AllowConflict) of
+ {ok, Doc2} ->
+ {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+ {Error, #doc{}} ->
+ % Record the error
+ {Docs2Acc, [{Ref, Error} |AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, DocBucket),
+ % NOTE(review): unlike the not_found clause above, this bucket is not
+ % reversed after the fold, so its docs end up in reverse input order.
+ % Confirm whether that asymmetry is intended.
+ prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
+ [PreppedBucket | AccPrepped], AccErrors3).
+
+
+% Same as update_docs/4 with the interactive_edit update type.
+update_docs(Db, Docs, Options) ->
+    UpdateType = interactive_edit,
+    update_docs(Db, Docs, Options, UpdateType).
+
+
+% Prepares and validates the buckets of a replicated update, walking the
+% buckets in lockstep with the lookup results of their document ids.
+% Returns {PreppedBuckets, Errors}; Errors is a list of
+% {{Id, {Pos, Rev}}, Error} built from the failing documents.
+prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
+ Errors2 = [{{Id, {Pos, Rev}}, Error} ||
+ {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
+ {lists:reverse(AccPrepped), lists:reverse(Errors2)};
+prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
+ case OldInfo of
+ not_found ->
+ % nothing on disk for this id: validate every doc against nil
+ {ValidatedBucket, AccErrors3} = lists:foldl(
+ fun({Doc, Ref}, {AccPrepped2, AccErrors2}) ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
+ false -> ok
+ end,
+ case validate_doc_update(Db, Doc, fun() -> nil end) of
+ ok ->
+ {[{Doc, Ref} | AccPrepped2], AccErrors2};
+ Error ->
+ {AccPrepped2, [{Doc, Error} | AccErrors2]}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
+ {ok, #full_doc_info{rev_tree=OldTree}} ->
+ % merge all incoming docs into the existing tree first, so that
+ % only docs that end up as leafs of the merged tree are validated
+ NewRevTree = lists:foldl(
+ fun({NewDoc, _Ref}, AccTree) ->
+ {NewTree, _} = couch_key_tree:merge(AccTree,
+ couch_doc:to_path(NewDoc), Db#db.revs_limit),
+ NewTree
+ end,
+ OldTree, Bucket),
+ Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
+ LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
+ {ValidatedBucket, AccErrors3} =
+ lists:foldl(
+ fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
+ case dict:find({Pos, RevId}, LeafRevsFullDict) of
+ {ok, {Start, Path}} ->
+ % our unflushed doc is a leaf node. Go back on the path
+ % to find the previous rev that's on disk.
+
+ LoadPrevRevFun = fun() ->
+ make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
+ end,
+
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = LoadPrevRevFun(),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ GetDiskDocFun = fun() -> DiskDoc end;
+ false ->
+ Doc2 = Doc,
+ GetDiskDocFun = LoadPrevRevFun
+ end,
+
+ case validate_doc_update(Db, Doc2, GetDiskDocFun) of
+ ok ->
+ {[{Doc2, Ref} | AccValidated], AccErrors2};
+ Error ->
+ {AccValidated, [{Doc, Error} | AccErrors2]}
+ end;
+ _ ->
+ % this doc isn't a leaf or already exists in the tree.
+ % ignore but consider it a success.
+ {AccValidated, AccErrors2}
+ end
+ end,
+ {[], AccErrors}, Bucket),
+ prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
+ [ValidatedBucket | AccPrepped], AccErrors3)
+ end.
+
+
+
+% Computes the revision id for a new revision of the document.
+% When every attachment carries a non-empty md5, the id is the md5 of the
+% external term format of [Deleted, OldStart, OldRev, Body, Atts2], where
+% Atts2 holds {Name, Type, Md5} of each attachment and OldRev is the head
+% of the previous revisions (0 when there are none).
+% When at least one attachment has an empty md5, the id is instead derived
+% from couch_util:rand32/0.
+new_revid(#doc{body=Body,revs={OldStart,OldRevs},
+ atts=Atts,deleted=Deleted}) ->
+ % attachments with an empty md5 are filtered out; a length mismatch
+ % therefore reveals that at least one such attachment exists
+ case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
+ Atts2 when length(Atts) =/= length(Atts2) ->
+ % We must have old style non-md5 attachments
+ ?l2b(integer_to_list(couch_util:rand32()));
+ Atts2 ->
+ OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
+ couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
+ end.
+
+% Assigns a new revision to every document of every bucket.
+% Each document gets the id computed by new_revid/1 prepended to its
+% revision path, with the start position incremented by one.
+% Returns {Buckets, IdRevs}: the buckets in input order and a list of
+% {Ref, {ok, {NewPos, NewRevId}}} entries prepended to the given accumulator.
+new_revs([], Done, IdRevs) ->
+    {lists:reverse(Done), IdRevs};
+new_revs([Bucket | Pending], Done, IdRevs0) ->
+    Assign = fun({#doc{revs = {Pos, Ancestors}} = Doc, Ref}, Acc) ->
+        RevId = new_revid(Doc),
+        NextPos = Pos + 1,
+        {{Doc#doc{revs = {NextPos, [RevId | Ancestors]}}, Ref},
+            [{Ref, {ok, {NextPos, RevId}}} | Acc]}
+    end,
+    {Updated, IdRevs1} = lists:mapfoldl(Assign, IdRevs0, Bucket),
+    new_revs(Pending, [Updated | Done], IdRevs1).
+
+% Verifies that no two attachments of the document share a name.
+% Returns the document unchanged; check_dup_atts2/1 throws on a duplicate.
+check_dup_atts(#doc{atts = Atts} = Doc) ->
+    ByName = fun(#att{name = Left}, #att{name = Right}) -> Left < Right end,
+    check_dup_atts2(lists:sort(ByName, Atts)),
+    Doc.
+
+% Scans a name-sorted attachment list for two neighbours with the same
+% name. Throws {bad_request, <<"Duplicate attachments">>} when found,
+% returns ok otherwise.
+check_dup_atts2([#att{name = Name}, #att{name = Next} | _]) when Name =:= Next ->
+    throw({bad_request, <<"Duplicate attachments">>});
+check_dup_atts2([_ | Tail]) ->
+    check_dup_atts2(Tail);
+check_dup_atts2(_) ->
+    ok.
+
+
+% Writes a list of documents to the database.
+%
+% replicated_changes: the documents already carry their revisions. They are
+% grouped by id, validated when needed, their attachments are flushed to
+% the updater file and they are written with merge_conflicts enabled.
+% Returns {ok, DocErrors} with one entry per document that failed
+% validation.
+%
+% interactive_edit: new revision ids are generated. Returns
+% {ok, Results} with one result per input document, in input order, or
+% {aborted, Errors} when all_or_nothing was requested and at least one
+% document failed before the commit.
+%
+% In both modes validation is only performed when the database has
+% validation funs, or when any document is a design document or carries
+% attachments.
+update_docs(Db, Docs, Options, replicated_changes) ->
+ increment_stat(Db, {couchdb, database_writes}),
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs),
+ DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true;
+ ({#doc{atts=Atts}, _Ref}) ->
+ Atts /= []
+ end, Docs2) of
+ true ->
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ {DocBuckets2, DocErrors} =
+ prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
+ DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
+ false ->
+ DocErrors = [],
+ DocBuckets3 = DocBuckets
+ end,
+ DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd), Ref}
+ || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
+ % replicated writes are expected to produce no per-document results
+ {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
+ {ok, DocErrors};
+
+update_docs(Db, Docs, Options, interactive_edit) ->
+ increment_stat(Db, {couchdb, database_writes}),
+ AllOrNothing = lists:member(all_or_nothing, Options),
+ % go ahead and generate the new revision ids for the documents.
+ % separate out the NonRep documents from the rest of the documents
+
+ % associate reference with each doc in order to track duplicates
+ Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
+ % documents whose id starts with the local doc prefix go to NonRepDocs,
+ % everything else to Docs3
+ {Docs3, NonRepDocs} = lists:foldl(
+ fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+ case Id of
+ <<?LOCAL_DOC_PREFIX, _/binary>> ->
+ {DocsAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Doc | DocsAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, Docs2),
+
+ DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
+
+ case (Db#db.validate_doc_funs /= []) orelse
+ lists:any(
+ fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) ->
+ true;
+ ({#doc{atts=Atts}, _Ref}) ->
+ Atts /= []
+ end, Docs3) of
+ true ->
+ % lookup the doc by id and get the most recent
+ Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
+ ExistingDocInfos = get_full_doc_infos(Db, Ids),
+
+ % all_or_nothing doubles as the AllowConflict flag here
+ {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
+ DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
+
+ % strip out any empty buckets
+ DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
+ false ->
+ PreCommitFailures = [],
+ DocBuckets2 = DocBuckets
+ end,
+
+ if (AllOrNothing) and (PreCommitFailures /= []) ->
+ % nothing is written; report the failures keyed by {Id, {Pos, RevId}}
+ {aborted,
+ lists:foldl(fun({#doc{id=Id,revs=Revs}, Ref},Acc) ->
+ case lists:keyfind(Ref,1,PreCommitFailures) of
+ {Ref, Error} ->
+ case Revs of
+ {Pos, [RevId|_]} ->
+ [{{Id,{Pos, RevId}}, Error} | Acc];
+ {0, []} ->
+ [{{Id,{0, <<>>}}, Error} | Acc]
+ end;
+ false ->
+ Acc
+ end
+ end,[],Docs3)};
+
+ true ->
+ Options2 = if AllOrNothing -> [merge_conflicts];
+ true -> [] end ++ Options,
+ DocBuckets3 = [[
+ {doc_flush_atts(set_new_att_revpos(
+ check_dup_atts(Doc)), Db#db.updater_fd), Ref}
+ || {Doc, Ref} <- B] || B <- DocBuckets2],
+ {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
+
+ {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
+
+ % later entries win in dict:from_list/1, so commit results and
+ % pre-commit failures override the optimistic {ok, Rev} entries
+ ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
+ {ok, lists:map(
+ fun({#doc{}, Ref}) ->
+ {ok, Result} = dict:find(Ref, ResultsDict),
+ Result
+ end, Docs2)}
+ end.
+
+% Returns the first available document on disk. Input list is a full rev path
+% for the doc.
+% Pos is the revision position of the head of the path and is decremented
+% for every path entry that is skipped. Returns nil when the path is
+% exhausted without finding an on-disk revision.
+make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
+ nil;
+% entries that still hold a #doc record are skipped
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
+ make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
+% entries marked with ?REV_MISSING are skipped
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
+ make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
+make_first_doc_on_disk(Db, Id, Pos, [{_Rev, RevValue} |_]=DocPath) ->
+ % the first two elements of the stored value are the deleted flag and
+ % the value passed to make_doc/5 as the summary pointer
+ IsDel = element(1, RevValue),
+ Sp = element(2, RevValue),
+ Revs = [Rev || {Rev, _} <- DocPath],
+ make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
+
+% Decides whether full_commit has to be added to the write options.
+%
+% If the caller passed full_commit or delay_commit explicitly (once or
+% several times), the options are returned untouched. Otherwise the
+% "couchdb"/"delayed_commits" setting decides: "true" leaves the options
+% alone, "false" prepends full_commit, and any other value is logged as an
+% error and treated like "false".
+set_commit_option(Options) ->
+    Explicit = [O || O <- Options, O =:= full_commit orelse O =:= delay_commit],
+    Delayed = couch_config:get("couchdb", "delayed_commits", "false"),
+    case {Explicit, Delayed} of
+        {[_ | _], _} ->
+            % user requested explicit commit setting, do not change it.
+            % Matching any non-empty list also covers a repeated option,
+            % which previously fell through to the configuration default.
+            Options;
+        {[], "true"} ->
+            Options; % delayed commits are enabled, do nothing
+        {[], "false"} ->
+            [full_commit | Options];
+        {[], Else} ->
+            ?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p",
+                [Else]),
+            [full_commit | Options]
+    end.
+
+% Receives the replies of the updater process for one update request.
+% {result, ...} messages are accumulated (most recent first) until the
+% updater sends done, which yields {ok, Results}, or retry, which yields
+% the atom retry. If the monitored updater goes down, the caller exits
+% with the same reason.
+collect_results(Updater, MonitorRef, Acc) ->
+    receive
+        {result, Updater, Result} ->
+            collect_results(Updater, MonitorRef, [Result | Acc]);
+        {done, Updater} ->
+            {ok, Acc};
+        {retry, Updater} ->
+            retry;
+        {'DOWN', MonitorRef, _Type, _Object, Reason} ->
+            exit(Reason)
+    end.
+
+% Sends the prepared document buckets to the updater process and collects
+% its replies. Returns {ok, Results}.
+% If the updater answers retry, the database is reopened, attachments are
+% flushed to the current updater file and the request is sent once more;
+% a second retry throws {update_error, compaction_retry}.
+% The updater is monitored for the duration of the call.
+write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1,
+ NonRepDocs, Options0) ->
+ DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
+ Options = set_commit_option(Options0),
+ MergeConflicts = lists:member(merge_conflicts, Options),
+ FullCommit = lists:member(full_commit, Options),
+ MRef = erlang:monitor(process, UpdatePid),
+ try
+ UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry ->
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry by reopening the db and writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
+ % start again from the buckets as they were before summaries
+ % were prepared, flushing attachments to the new file
+ DocBuckets2 = [
+ [{doc_flush_atts(Doc, Db2#db.updater_fd), Ref} || {Doc, Ref} <- Bucket] ||
+ Bucket <- DocBuckets1
+ ],
+ % We only retry once
+ DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
+ close(Db2),
+ UpdatePid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry -> throw({update_error, compaction_retry})
+ end
+ end
+ after
+ % always drop the monitor and any 'DOWN' message already queued
+ erlang:demonitor(MRef, [flush])
+ end.
+
+
+% Replaces the body of every document in every bucket with
+% {summary, SummaryChunk, AttsFd}, where SummaryChunk is built by
+% couch_db_updater:make_doc_summary/2 from the body and the on-disk
+% attachment tuples, and AttsFd is the file of the first attachment
+% (nil when the document has no attachments).
+prepare_doc_summaries(Db, BucketList) ->
+ [lists:map(
+ fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
+ % only attachments whose data is a {Fd, Pointer} pair match the
+ % generator pattern; any others are left out of DiskAtts
+ DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
+ #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
+ att_len = AL, disk_len = DL, encoding = E} <- Atts],
+ AttsFd = case Atts of
+ [#att{data = {Fd, _}} | _] ->
+ Fd;
+ [] ->
+ nil
+ end,
+ SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
+ {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
+ end,
+ Bucket) || Bucket <- BucketList].
+
+
+% Runs the before_doc_update hook of the database, if any, on every
+% document of every bucket. The hook receives the document with an ejson
+% body and the #db record, and its return value replaces the document.
+% Without a hook (nil) the buckets are returned unchanged.
+before_docs_update(#db{before_doc_update = nil}, BucketList) ->
+    BucketList;
+before_docs_update(#db{before_doc_update = Hook} = Db, BucketList) ->
+    Apply = fun({Doc, Ref}) ->
+        {Hook(couch_doc:with_ejson_body(Doc), Db), Ref}
+    end,
+    lists:map(fun(Bucket) -> lists:map(Apply, Bucket) end, BucketList).
+
+
+% Stamps every attachment that is not yet on disk with the revision
+% position the document is about to get (current position + 1).
+% Attachments whose data is already a {Fd, Pointer} pair are left alone.
+set_new_att_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts} = Doc) ->
+    Stamp = fun
+        (#att{data = {_Fd, _Sp}} = OnDisk) ->
+            % already commited to disk, do not set new rev
+            OnDisk;
+        (Pending) ->
+            Pending#att{revpos = RevPos + 1}
+    end,
+    Doc#doc{atts = [Stamp(Att) || Att <- Atts]}.
+
+
+% Flushes every attachment of the document to the file Fd with
+% flush_att/2 and returns the document with the flushed attachments.
+doc_flush_atts(Doc, Fd) ->
+    Flushed = lists:map(fun(Att) -> flush_att(Fd, Att) end, Doc#doc.atts),
+    Doc#doc{atts = Flushed}.
+
+% Compares a computed md5 with the expected one.
+% An empty expected md5 means there is nothing to check. Returns ok when
+% the check passes and throws md5_mismatch otherwise.
+check_md5(_Computed, <<>>) ->
+    ok;
+check_md5(Computed, Expected) when Computed =:= Expected ->
+    ok;
+check_md5(_Computed, _Expected) ->
+    throw(md5_mismatch).
+
+% Writes the data of an attachment to the file Fd and returns the
+% attachment with its data field pointing at the written stream.
+% The clauses handle, in order:
+%   - data already stored in Fd: nothing to do;
+%   - data stored in another file: the stream is copied to Fd;
+%   - data given as a binary: written in one go;
+%   - data given as a fun with unknown length: the fun drives the writes;
+%   - data given as a fun with known length: att_len bytes are read.
+flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ Att;
+
+flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
+ disk_len=InDiskLen} = Att) ->
+ {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ % the copy must still match the md5 recorded on the attachment
+ check_md5(IdentityMd5, InMd5),
+ Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
+
+flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Data)
+ end);
+
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
+ MaxChunkSize = list_to_integer(
+ couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
+ with_stream(Fd, Att, fun(OutputStream) ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
+ % once for each chunk of the attachment,
+ Fun(MaxChunkSize,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, Footers}, _) ->
+ % a Content-MD5 footer, when present, is handed back to
+ % with_stream/3 as {md5, Decoded}
+ F = mochiweb_headers:from_binary(Footers),
+ case mochiweb_headers:get_value("Content-MD5", F) of
+ undefined ->
+ ok;
+ Md5 ->
+ {md5, base64:decode(Md5)}
+ end;
+ ({_Length, Chunk}, _) ->
+ couch_stream:write(OutputStream, Chunk)
+ end, ok)
+ end);
+
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ write_streamed_attachment(OutputStream, Fun, AttLen)
+ end).
+
+
+% Tells whether attachments of the given MIME type should be compressed.
+%
+% The "attachments"/"compressible_types" setting is read as a
+% comma-separated list of type expressions in which "*" is a wildcard.
+% The MIME type (binary or string) matches when any expression matches it
+% case-insensitively, ignoring surrounding whitespace and an optional
+% ";parameters" suffix. Returns true or false.
+compressible_att_type(MimeType) when is_binary(MimeType) ->
+    compressible_att_type(?b2l(MimeType));
+compressible_att_type(MimeType) ->
+    TypeExpList = re:split(
+        couch_config:get("attachments", "compressible_types", ""),
+        "\\s*,\\s*",
+        [{return, list}]
+    ),
+    lists:any(
+        fun(TypeExp) ->
+            % "global" turns every "*" of the expression into ".*";
+            % without it only the first wildcard would be translated.
+            Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*", [global]),
+                "(?:\\s*;.*?)?\\s*", $$],
+            re:run(MimeType, Regexp, [caseless]) =/= nomatch
+        end,
+        [T || T <- TypeExpList, T /= []]
+    ).
+
+% Opens an output stream on Fd, lets Fun write the attachment data to it,
+% closes the stream, verifies the md5 and returns the attachment updated
+% with the stream location, lengths, md5 and encoding.
+% Attachments with identity encoding and a compressible MIME type are
+% written through a gzip stream.
+%
+% From RFC 2616 3.6.1 - Chunked Transfer Coding
+%
+% In other words, the origin server is willing to accept
+% the possibility that the trailer fields might be silently
+% discarded along the path to the client.
+%
+% I take this to mean that if "Trailers: Content-MD5\r\n"
+% is present in the request, but there is no Content-MD5
+% trailer, we're free to ignore this inconsistency and
+% pretend that no Content-MD5 exists.
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+ BufferSize = list_to_integer(
+ couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
+ {ok, OutputStream} = case (Enc =:= identity) andalso
+ compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ couch_config:get("attachments", "compression_level", "0")
+ ),
+ couch_stream:open(Fd, [{buffer_size, BufferSize},
+ {encoding, gzip}, {compression_level, CompLevel}]);
+ _ ->
+ couch_stream:open(Fd, [{buffer_size, BufferSize}])
+ end,
+ % the md5 to verify against: a footer md5 returned by Fun is only used
+ % when the attachment announced md5_in_footer
+ ReqMd5 = case Fun(OutputStream) of
+ {md5, FooterMd5} ->
+ case InMd5 of
+ md5_in_footer -> FooterMd5;
+ _ -> InMd5
+ end;
+ _ ->
+ InMd5
+ end,
+ {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ couch_stream:close(OutputStream),
+ check_md5(IdentityMd5, ReqMd5),
+ {AttLen, DiskLen, NewEnc} = case Enc of
+ identity ->
+ % equal md5s mean the stream stored the data as it came in;
+ % otherwise the stream encoded it and the result is gzip
+ case {Md5, IdentityMd5} of
+ {Same, Same} ->
+ {Len, IdentityLen, identity};
+ _ ->
+ {Len, IdentityLen, gzip}
+ end;
+ gzip ->
+ case {Att#att.att_len, Att#att.disk_len} of
+ {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
+ % Compressed attachment uploaded through the standalone API.
+ {Len, Len, gzip};
+ {AL, DL} ->
+ % This case is used for efficient push-replication, where a
+ % compressed attachment is located in the body of multipart
+ % content-type request.
+ {AL, DL, gzip}
+ end
+ end,
+ Att#att{
+ data={Fd,StreamInfo},
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ encoding=NewEnc
+ }.
+
+
+% Pulls chunks from the reader fun F and writes them to Stream until
+% exactly LenLeft bytes have been consumed. Returns ok.
+write_streamed_attachment(_Stream, _F, 0) ->
+    ok;
+write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
+    Chunk = read_next_chunk(F, LenLeft),
+    ok = couch_stream:write(Stream, Chunk),
+    Remaining = LenLeft - size(Chunk),
+    write_streamed_attachment(Stream, F, Remaining).
+
+% Fetches the next chunk from the reader fun F.
+% A zero-arity reader decides the chunk size itself; a one-arity reader is
+% asked for at most 16#2000 bytes, or LenLeft if that is smaller.
+read_next_chunk(F, _LenLeft) when is_function(F, 0) ->
+    F();
+read_next_chunk(F, LenLeft) when is_function(F, 1) ->
+    MaxChunkSize = 16#2000,
+    F(erlang:min(LenLeft, MaxChunkSize)).
+
+% Collapses the partial by-seq reductions Reds into the final reduction
+% value using couch_db_updater:btree_by_seq_reduce/2.
+enum_docs_since_reduce_to_count(Reds) ->
+    ReduceFun = fun couch_db_updater:btree_by_seq_reduce/2,
+    couch_btree:final_reduce(ReduceFun, Reds).
+
+% Collapses the partial by-id reductions Reds with
+% couch_db_updater:btree_by_id_reduce/2 and returns the first element of
+% the resulting tuple.
+enum_docs_reduce_to_count(Reds) ->
+    ReduceFun = fun couch_db_updater:btree_by_id_reduce/2,
+    element(1, couch_btree:final_reduce(ReduceFun, Reds)).
+
+% changes_since/4: same as changes_since/5 with no extra fold options.
+changes_since(Db, StartSeq, Fun, Acc) ->
+    changes_since(Db, StartSeq, Fun, [], Acc).
+
+% changes_since/5: folds Fun(DocInfo, Acc) over the by-seq btree starting
+% at StartSeq + 1, with Options appended to the fold options.
+% Returns {ok, FinalAcc}.
+changes_since(Db, StartSeq, Fun, Options, Acc) ->
+    % couch_btree:fold/4 calls back with three arguments; the middle one is
+    % not passed on to the caller's Fun.
+    FoldFun = fun(DocInfo, _Offset, AccIn) -> Fun(DocInfo, AccIn) end,
+    BTree = by_seq_btree(Db),
+    FoldOptions = [{start_key, StartSeq + 1} | Options],
+    {ok, _LastReduction, AccOut} =
+        couch_btree:fold(BTree, FoldFun, Acc, FoldOptions),
+    {ok, AccOut}.
+
+% Returns the final by-seq reduction over all entries whose sequence is
+% greater than SinceSeq.
+count_changes_since(Db, SinceSeq) ->
+    BTree = by_seq_btree(Db),
+    ReduceFun =
+        fun(_SeqStart, PartialReds, 0) ->
+            {ok, couch_btree:final_reduce(BTree, PartialReds)}
+        end,
+    FoldOptions = [{start_key, SinceSeq + 1}],
+    {ok, Changes} = couch_btree:fold_reduce(BTree, ReduceFun, 0, FoldOptions),
+    Changes.
+
+% Folds InFun over the by-seq btree starting at SinceSeq + 1.
+% Returns {ok, Reduction, FinalAcc}, where Reduction is the last fold
+% reduction passed through enum_docs_since_reduce_to_count/1.
+enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
+    BTree = by_seq_btree(Db),
+    FoldOptions = [{start_key, SinceSeq + 1} | Options],
+    {ok, LastReduction, AccOut} =
+        couch_btree:fold(BTree, InFun, Acc, FoldOptions),
+    Count = enum_docs_since_reduce_to_count(LastReduction),
+    {ok, Count, AccOut}.
+
+% Folds InFun over the by-id btree, wrapped by skip_deleted/1 so that
+% subtrees without undeleted documents are skipped.
+% Returns {ok, Count, FinalAcc}, where Count comes from
+% enum_docs_reduce_to_count/1 applied to the last fold reduction.
+enum_docs(Db, InFun, InAcc, Options) ->
+    WrappedFun = skip_deleted(InFun),
+    BTree = by_id_btree(Db),
+    {ok, LastReduce, OutAcc} =
+        couch_btree:fold(BTree, WrappedFun, InAcc, Options),
+    Count = enum_docs_reduce_to_count(LastReduce),
+    {ok, Count, OutAcc}.
+
+% server functions
+
+% gen_server init: starts the linked couch_db_updater process, fetches the
+% #db{} record from it to use as this server's state, and registers this
+% process with the file ref counter. Databases opened with the sys_db
+% option are not counted in the open_databases statistic.
+init({DbName, Filepath, Fd, Options}) ->
+    UpdaterArgs = {self(), DbName, Filepath, Fd, Options},
+    {ok, UpdaterPid} =
+        gen_server:start_link(couch_db_updater, UpdaterArgs, []),
+    {ok, #db{fd_ref_counter = RefCntr} = Db} =
+        gen_server:call(UpdaterPid, get_db),
+    couch_ref_counter:add(RefCntr),
+    lists:member(sys_db, Options) orelse
+        couch_stats_collector:track_process_count({couchdb, open_databases}),
+    process_flag(trap_exit, true),
+    {ok, Db}.
+
+% gen_server terminate: synchronously shuts down the updater process
+% recorded in the #db{} state.
+terminate(_Reason, Db) ->
+    UpdaterPid = Db#db.update_pid,
+    couch_util:shutdown_sync(UpdaterPid),
+    ok.
+
+% gen_server calls handled by the database process:
+%   {open_ref_count, Pid} - add Pid as a referrer and reply {ok, Db}
+%   is_idle               - reply true when nothing is using the database
+%   {db_updated, NewDb}   - adopt NewDb as the new state
+%   get_db                - reply {ok, Db}
+handle_call({open_ref_count, OpenerPid}, _From,
+        #db{fd_ref_counter = RefCntr} = Db) ->
+    ok = couch_ref_counter:add(RefCntr, OpenerPid),
+    {reply, {ok, Db}, Db};
+handle_call(is_idle, _From,
+        #db{fd_ref_counter = RefCntr,
+            compactor_pid = Compact,
+            waiting_delayed_commit = Delay} = Db) ->
+    % Idle means no referrers. Unless in the middle of a compaction file
+    % switch, there are always at least 2 referrers, couch_db_updater and us.
+    IsIdle = (Delay == nil)
+        andalso (Compact == nil)
+        andalso (couch_ref_counter:count(RefCntr) == 2),
+    {reply, IsIdle, Db};
+handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter = OldRefCntr}) ->
+    #db{fd_ref_counter = NewRefCntr} = NewDb,
+    % When the ref counter changed, move our reference from the old counter
+    % to the new one.
+    case NewRefCntr of
+        OldRefCntr ->
+            ok;
+        _ ->
+            couch_ref_counter:add(NewRefCntr),
+            couch_ref_counter:drop(OldRefCntr)
+    end,
+    {reply, ok, NewDb};
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db}.
+
+
+% No casts are expected by this server: log the message and exit the
+% process with {error, Msg}.
+handle_cast(Msg, Db) ->
+    ?LOG_ERROR("Bad cast message received for db ~s: ~p",
+        [Db#db.name, Msg]),
+    exit({error, Msg}).
+
+% gen_server code_change: the state needs no conversion between versions.
+code_change(_FromVsn, Db, _ExtraArgs) ->
+    {ok, Db}.
+
+% Exit signals arrive as messages because init/1 enables trap_exit.
+% A normal exit of a linked process is ignored; any other exit reason stops
+% this server with the same reason. Any other message is logged and makes
+% the process exit with {error, Msg}.
+handle_info({'EXIT', _Pid, Reason}, Db) ->
+    case Reason of
+        normal -> {noreply, Db};
+        _ -> {stop, Reason, Db}
+    end;
+handle_info(Msg, Db) ->
+    ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
+    exit({error, Msg}).
+
+
+%%% Internal function %%%
+% Looks up the requested revisions for each {Id, Revs} pair in IdRevs.
+%
+% Returns a list with one {ok, Results} entry per pair, in the same order.
+% Each element of Results is either {ok, Doc} or
+% {{not_found, missing}, Rev}.
+%
+% Revs may be the atom 'all' (every leaf of the revision tree) or a list of
+% revisions. When Options contains 'latest', the leafs reachable from the
+% given revisions are returned instead of the revisions themselves.
+open_doc_revs_int(Db, IdRevs, Options) ->
+ Ids = [Id || {Id, _Revs} <- IdRevs],
+ LookupResults = get_full_doc_infos(Db, Ids),
+ % LookupResults is zipped with IdRevs, so it must contain exactly one
+ % result per requested id.
+ lists:zipwith(
+ fun({Id, Revs}, Lookup) ->
+ case Lookup of
+ {ok, #full_doc_info{rev_tree=RevTree}} ->
+ {FoundRevs, MissingRevs} =
+ case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ ->
+ case lists:member(latest, Options) of
+ true ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ false ->
+ couch_key_tree:get(RevTree, Revs)
+ end
+ end,
+ FoundResults =
+ lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
+ case Value of
+ ?REV_MISSING ->
+ % we have the rev in our list but know nothing about it
+ {{not_found, missing}, {Pos, Rev}};
+ RevValue ->
+ % The first two tuple elements are read as the deleted
+ % flag and the body pointer passed to make_doc/5.
+ IsDeleted = element(1, RevValue),
+ SummaryPtr = element(2, RevValue),
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ end
+ end, FoundRevs),
+ Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+ {ok, Results};
+ not_found when Revs == all ->
+ % Unknown document and all revisions requested: nothing to report.
+ {ok, []};
+ not_found ->
+ % Unknown document: every explicitly requested revision is missing.
+ {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
+ end
+ end,
+ IdRevs, LookupResults).
+
+% Opens a single document. The second argument selects the lookup path:
+%   - a binary id starting with ?LOCAL_DOC_PREFIX: read from the local
+%     docs btree;
+%   - a #doc_info{}: build the doc from its first rev_info;
+%   - a #full_doc_info{}: build the doc from the revision chosen by
+%     couch_doc:to_doc_info/1, including its revision path;
+%   - any other id: look up the #full_doc_info{} and recurse.
+% Returns the result of apply_open_options/2 on {ok, Doc}, or
+% {not_found, missing} when the document does not exist.
+open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
+ case couch_btree:lookup(local_btree(Db), [Id]) of
+ [{ok, {_, {Rev, BodyData}}}] ->
+ % The stored integer Rev is rendered as a binary revision id at
+ % position 0.
+ Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+ apply_open_options({ok, Doc}, Options);
+ [not_found] ->
+ {not_found, missing}
+ end;
+open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
+ #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
+ Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
+ % No revision tree is available here, so an empty one is passed to
+ % doc_meta_info/3.
+ apply_open_options(
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+ #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
+ DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
+ apply_open_options(
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
+open_doc_int(Db, Id, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, FullDocInfo} ->
+ open_doc_int(Db, FullDocInfo, Options);
+ not_found ->
+ {not_found, missing}
+ end.
+
+% Builds the meta list for a document according to Options. Each of the
+% following options contributes at most one entry, in this order:
+%   revs_info         - {revs_info, Pos, [{Rev, missing|deleted|available}]}
+%   conflicts         - {conflicts, Revs} for the non-deleted revisions
+%                       after the first one in the doc_info
+%   deleted_conflicts - {deleted_conflicts, Revs} for the deleted ones
+%   local_seq         - {local_seq, Seq} taken from the doc_info's high_seq
+% Empty conflict lists produce no entry.
+doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
+ case lists:member(revs_info, Options) of
+ false -> [];
+ true ->
+ % Requires exactly one full key path for Rev in RevTree; anything
+ % else fails this match.
+ {[{Pos, RevPath}],[]} =
+ couch_key_tree:get_full_key_paths(RevTree, [Rev]),
+
+ [{revs_info, Pos, lists:map(
+ fun({Rev1, ?REV_MISSING}) ->
+ {Rev1, missing};
+ ({Rev1, RevValue}) ->
+ case element(1, RevValue) of
+ true ->
+ {Rev1, deleted};
+ false ->
+ {Rev1, available}
+ end
+ end, RevPath)}]
+ end ++
+ case lists:member(conflicts, Options) of
+ false -> [];
+ true ->
+ case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
+ [] -> [];
+ ConflictRevs -> [{conflicts, ConflictRevs}]
+ end
+ end ++
+ case lists:member(deleted_conflicts, Options) of
+ false -> [];
+ true ->
+ case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
+ [] -> [];
+ DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
+ end
+ end ++
+ case lists:member(local_seq, Options) of
+ false -> [];
+ true -> [{local_seq, Seq}]
+ end.
+
+% Reads the term stored at position Pos through the database's fd.
+read_doc(#db{fd = ReaderFd}, Pos) ->
+    couch_file:pread_term(ReaderFd, Pos).
+
+
+% Builds a #doc{} for Id from the body pointer Bp and the revision path.
+%
+% When Bp is nil the document gets an empty body and no attachments.
+% Otherwise the body and attachment list are read via read_doc/2 and each
+% stored attachment tuple is converted to an #att{} record whose data
+% points at {Fd, StreamPointer}, where Fd is the db's updater_fd.
+% The finished document is passed through after_doc_read/2.
+make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
+ {BodyData, Atts} =
+ case Bp of
+ nil ->
+ {[], []};
+ _ ->
+ {ok, {BodyData0, Atts00}} = read_doc(Db, Bp),
+ Atts0 = case Atts00 of
+ _ when is_binary(Atts00) ->
+ couch_compress:decompress(Atts00);
+ _ when is_list(Atts00) ->
+ % pre 1.2 format
+ Atts00
+ end,
+ {BodyData0,
+ lists:map(
+ % Three attachment tuple shapes are accepted; the shorter ones
+ % are presumably older on-disk formats (TODO confirm).
+ fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=DiskLen,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp},
+ encoding=
+ case Enc of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc
+ end
+ };
+ ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
+ % No separate disk length stored: att_len is used for both.
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=AttLen,
+ md5=Md5,
+ revpos=RevPos,
+ data={Fd,Sp}};
+ ({Name,{Type,Sp,AttLen}}) ->
+ % No md5 or revpos stored: empty md5 and revpos 0 are used.
+ #att{name=Name,
+ type=Type,
+ att_len=AttLen,
+ disk_len=AttLen,
+ md5= <<>>,
+ revpos=0,
+ data={Fd,Sp}}
+ end, Atts0)}
+ end,
+ Doc = #doc{
+ id = Id,
+ revs = RevisionPath,
+ body = BodyData,
+ atts = Atts,
+ deleted = Deleted
+ },
+ after_doc_read(Db, Doc).
+
+
+% Applies the database's after_doc_read hook to Doc.
+% With no hook configured (nil) the document is returned untouched;
+% otherwise the hook is called with the EJSON-bodied document and the db.
+after_doc_read(#db{after_doc_read = nil}, Doc) ->
+    Doc;
+after_doc_read(#db{after_doc_read = Hook} = Db, Doc) ->
+    EJsonDoc = couch_doc:with_ejson_body(Doc),
+    Hook(EJsonDoc, Db).
+
+
+% Increments the statistic Stat unless the database was opened with the
+% sys_db option, in which case nothing is recorded and ok is returned.
+increment_stat(#db{options = DbOptions}, Stat) ->
+    case lists:member(sys_db, DbOptions) of
+        false -> couch_stats_collector:increment(Stat);
+        true -> ok
+    end.
+
+% Returns the local docs btree with its fd replaced by the db's fd.
+local_btree(#db{fd = Fd, local_docs_btree = LocalTree}) ->
+    LocalTree#btree{fd = Fd}.
+
+% Returns the by-sequence btree with its fd replaced by the db's fd.
+by_seq_btree(#db{fd = Fd, docinfo_by_seq_btree = SeqTree}) ->
+    SeqTree#btree{fd = Fd}.
+
+% Returns the by-id btree with its fd replaced by the db's fd.
+by_id_btree(#db{fd = Fd, fulldocinfo_by_id_btree = IdTree}) ->
+    IdTree#btree{fd = Fd}.
+
+% Wraps a three-argument fold fun into the four-argument form that takes
+% 'visit' / 'traverse' as its first argument.
+%   visit    - delegate to FoldFun(KV, Reds, Acc)
+%   traverse - return {skip, Acc} when the node's reduction is a 3-tuple
+%              whose first element equals 0, otherwise {ok, Acc}
+skip_deleted(FoldFun) ->
+    fun(visit, KV, Reds, Acc) ->
+            FoldFun(KV, Reds, Acc);
+       (traverse, _LastKey, Reduction, Acc) ->
+            case Reduction of
+                {Undeleted, _Deleted, _Size} when Undeleted == 0 ->
+                    {skip, Acc};
+                _ ->
+                    {ok, Acc}
+            end
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_db_update_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_update_notifier.erl b/src/couch_db_update_notifier.erl
new file mode 100644
index 0000000..bfa770a
--- /dev/null
+++ b/src/couch_db_update_notifier.erl
@@ -0,0 +1,82 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%
+% This causes an OS process to be spawned, and it is notified every time a
+% database is updated.
+%
+% The notifications are in the form of the database name sent as a line of
+% text to the OS process's stdout.
+%
+
+-module(couch_db_update_notifier).
+
+-behaviour(gen_event).
+
+-export([start_link/1, notify/1]).
+-export([init/1, terminate/2, handle_event/2, handle_call/2, handle_info/2, code_change/3,stop/1]).
+
+-include("couch_db.hrl").
+
+% Starts a linked event handler on the couch_db_update event manager.
+% Exec is handed to init/1 as the handler argument; a fresh reference makes
+% the handler id unique.
+start_link(Exec) ->
+    HandlerId = {couch_db_update_notifier, make_ref()},
+    couch_event_sup:start_link(couch_db_update, HandlerId, Exec).
+
+% Sends DbEvent to every handler registered on the couch_db_update event
+% manager.
+notify(DbEvent) ->
+    gen_event:notify(couch_db_update, DbEvent).
+
+% Stops the notifier process NotifierPid via couch_event_sup:stop/1.
+stop(NotifierPid) ->
+    couch_event_sup:stop(NotifierPid).
+
+% gen_event init. A list argument is treated as the command of an
+% executable and started through couch_os_process:start_link/2, whose
+% result is returned as is. Any other argument becomes the handler state
+% unchanged.
+init(Exec) when is_list(Exec) ->
+    couch_os_process:start_link(Exec, []);
+init(HandlerState) ->
+    {ok, HandlerState}.
+
+% gen_event terminate. When the handler state is a pid, that OS process is
+% stopped; any other state needs no cleanup.
+terminate(_Reason, OsProc) when is_pid(OsProc) ->
+    couch_os_process:stop(OsProc),
+    ok;
+terminate(_Reason, _HandlerState) ->
+    ok.
+
+% gen_event handle_event. The handler state decides how an event is
+% delivered:
+%   a fun of arity 1 - called with the event, result ignored
+%   {Fun, Acc}       - Fun(Event, Acc) is called and its result becomes the
+%                      new accumulator
+%   anything else    - the state is used as an OS process handle and the
+%                      encoded {EventType, EventDesc} event is sent to it
+handle_event(Event, Fun) when is_function(Fun, 1) ->
+    Fun(Event),
+    {ok, Fun};
+handle_event(Event, {Fun, FunAcc}) ->
+    {ok, {Fun, Fun(Event, FunAcc)}};
+handle_event({EventType, EventDesc}, Pid) ->
+    ok = couch_os_process:send(Pid, encode_event(EventType, EventDesc)),
+    {ok, Pid}.
+
+% gen_event handle_call: no calls are supported, every request is answered
+% with ok and the handler state is left unchanged.
+%
+% A gen_event handler must return {ok, Reply, NewState} here; the
+% gen_server-style {reply, Reply, State} tuple is not a valid return value
+% for this callback.
+handle_call(_Request, State) ->
+    {ok, ok, State}.
+
+% gen_event handle_info for 'EXIT' messages. When the exited process is the
+% one held as handler state, the failure is logged and the handler removes
+% itself.
+handle_info({'EXIT', Pid, Reason}, Pid) ->
+    ?LOG_ERROR("Update notification process ~p died: ~p", [Pid, Reason]),
+    remove_handler;
+handle_info({'EXIT', _OtherPid, _Reason}, State) ->
+    %% the db_update event manager traps exits and forwards this message to
+    %% all its handlers. Just ignore as it wasn't our os_process that exited.
+    {ok, State}.
+
+% gen_event code_change: the handler state needs no conversion.
+code_change(_FromVsn, HandlerState, _ExtraArgs) ->
+    {ok, HandlerState}.
+
+% Encodes an event as an EJSON object for the OS process.
+% The event type is normalised to a binary (atom -> string -> binary).
+% A {DbName, DocId} description yields type, db and id fields; any other
+% description is used as the database name and yields type and db only.
+encode_event(EventType, EventDesc) when is_atom(EventType) ->
+    encode_event(atom_to_list(EventType), EventDesc);
+encode_event(EventType, EventDesc) when is_list(EventType) ->
+    encode_event(?l2b(EventType), EventDesc);
+encode_event(EventType, EventDesc) ->
+    Fields =
+        case EventDesc of
+            {DbName, DocId} -> [{db, DbName}, {id, DocId}];
+            OnlyDbName -> [{db, OnlyDbName}]
+        end,
+    {[{type, EventType} | Fields]}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/75f30dbe/src/couch_db_update_notifier_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_db_update_notifier_sup.erl b/src/couch_db_update_notifier_sup.erl
new file mode 100644
index 0000000..e7cc16c
--- /dev/null
+++ b/src/couch_db_update_notifier_sup.erl
@@ -0,0 +1,61 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%
+% This causes an OS process to be spawned, and it is notified every time a
+% database is updated.
+%
+% The notifications are in the form of the database name sent as a line of
+% text to the OS process's stdout.
+%
+
+-module(couch_db_update_notifier_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, init/1, config_change/3]).
+
+% Starts the supervisor, registered locally under this module's name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+% Supervisor init: registers for config change notifications and builds one
+% permanent child per entry of the "update_notification" config section,
+% each started via couch_db_update_notifier:start_link/1 with the
+% configured value.
+init([]) ->
+    ok = couch_config:register(fun ?MODULE:config_change/3),
+
+    ToChildSpec =
+        fun({Name, UpdateNotifierExe}) ->
+            StartMFA = {couch_db_update_notifier, start_link,
+                [UpdateNotifierExe]},
+            {Name, StartMFA, permanent, 1000, supervisor,
+                [couch_db_update_notifier]}
+        end,
+    ChildSpecs =
+        lists:map(ToChildSpec, couch_config:get("update_notification")),
+
+    {ok, {{one_for_one, 10, 3600}, ChildSpecs}}.
+
+%% @doc When an entry of the "update_notification" config section changes,
+%% terminate and delete the child registered under that id, then start a new
+%% child with the updated executable. The results of terminating and
+%% deleting are not checked; the result of starting the child is returned.
+config_change("update_notification", Id, Exe) ->
+    Sup = couch_db_update_notifier_sup,
+    StartMFA = {couch_db_update_notifier, start_link, [Exe]},
+    ChildSpec = {Id, StartMFA, permanent, 1000, supervisor,
+        [couch_db_update_notifier]},
+    supervisor:terminate_child(Sup, Id),
+    supervisor:delete_child(Sup, Id),
+    supervisor:start_child(Sup, ChildSpec).
+