You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/05 15:50:34 UTC
[12/49] Remove src/couch
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_compaction_daemon.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl
deleted file mode 100644
index 3251d5f..0000000
--- a/src/couch/src/couch_compaction_daemon.erl
+++ /dev/null
@@ -1,514 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_compaction_daemon).
--behaviour(gen_server).
--behaviour(config_listener).
-
-% public API
--export([start_link/0]).
-
-% gen_server callbacks
--export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
--export([code_change/3, terminate/2]).
-
-% config_listener api
--export([handle_config_change/5]).
-
--include_lib("couch/include/couch_db.hrl").
-
--define(CONFIG_ETS, couch_compaction_daemon_config).
-
--record(state, {
- loop_pid
-}).
-
-% Parsed compaction settings for one entry of the "compactions" config
-% section, as built by config_record/2:
-%   db_frag               - integer taken from db_fragmentation, nil if unset
-%   view_frag             - integer taken from view_fragmentation, nil if unset
-%   period                - #period{} built from the from/to options, or nil
-%   cancel                - boolean taken from strict_window
-%   parallel_view_compact - boolean taken from parallel_view_compaction
--record(config, {
- db_frag = nil,
- view_frag = nil,
- period = nil,
- cancel = false,
- parallel_view_compact = false
-}).
-
--record(period, {
- from = nil,
- to = nil
-}).
-
-
-% Starts the compaction daemon as a gen_server registered locally under the
-% module name.
-start_link() ->
-    ServerName = {local, ?MODULE},
-    gen_server:start_link(ServerName, ?MODULE, [], []).
-
-
-% gen_server callback. Traps exits, creates the named ETS table holding the
-% parsed compaction settings, subscribes to config changes, loads the current
-% "compactions" section and links a process running compact_loop/1.
-init(_Args) ->
-    process_flag(trap_exit, true),
-    ?CONFIG_ETS = ets:new(?CONFIG_ETS, [named_table, set, protected]),
-    ok = config:listen_for_changes(?MODULE, nil),
-    load_config(),
-    Daemon = self(),
-    LoopPid = spawn_link(fun() -> compact_loop(Daemon) end),
-    {ok, #state{loop_pid = LoopPid}}.
-
-
-% gen_server callback handling config updates forwarded by
-% handle_config_change/5.
-%
-% A deleted entry is removed from the ETS table. Any other value is parsed
-% and stored when valid; if the table was empty before the insert, the
-% compaction loop is notified with {self(), have_config}.
-handle_cast({config_update, DbName, deleted}, State) ->
-    true = ets:delete(?CONFIG_ETS, ?l2b(DbName)),
-    {noreply, State};
-handle_cast({config_update, DbName, Config}, #state{loop_pid = LoopPid} = State) ->
-    case parse_config(DbName, Config) of
-        error ->
-            ok;
-        {ok, Parsed} ->
-            % Sample the size before inserting so that only the
-            % empty -> non-empty transition notifies the loop.
-            TableSize = ets:info(?CONFIG_ETS, size),
-            true = ets:insert(?CONFIG_ETS, {?l2b(DbName), Parsed}),
-            case TableSize =:= 0 of
-                true -> LoopPid ! {self(), have_config};
-                false -> ok
-            end
-    end,
-    {noreply, State}.
-
-
-% gen_server callback. The daemon has no synchronous API, so any call stops
-% the server with reason {unexpected_call, Request}.
-handle_call(Request, _From, State) ->
-    StopReason = {unexpected_call, Request},
-    {stop, StopReason, State}.
-
-
-% gen_server callback for raw messages:
-%   * gen_event_EXIT for this module's config listener - schedules a
-%     restart_config_listener message to self in 5000 ms;
-%   * restart_config_listener - subscribes to config changes again;
-%   * 'EXIT' from the compaction loop process - stops the server with
-%     reason {compaction_loop_died, Reason}.
-% There is no clause for any other message.
-handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Why}, State) ->
-    RetryAfterMs = 5000,
-    erlang:send_after(RetryAfterMs, self(), restart_config_listener),
-    {noreply, State};
-handle_info(restart_config_listener, State) ->
-    ok = config:listen_for_changes(?MODULE, nil),
-    {noreply, State};
-handle_info({'EXIT', LoopPid, Why}, #state{loop_pid = LoopPid} = State) ->
-    {stop, {compaction_loop_died, Why}, State}.
-
-
-% gen_server callback. Deletes the named ETS table created in init/1.
-terminate(_Why, _State) ->
-    true = ets:delete(?CONFIG_ETS).
-
-
-% gen_server callback. The state needs no conversion between versions.
-code_change(_FromVsn, State, _Extra) ->
-    {ok, State}.
-
-
-% config_listener callback. Changes in the "compactions" section are
-% forwarded to the daemon as a {config_update, DbName, Value} cast; changes
-% in any other section are ignored.
-handle_config_change("compactions", DbName, Value, _, _) ->
-    CastResult = gen_server:cast(?MODULE, {config_update, DbName, Value}),
-    {ok, CastResult};
-handle_config_change(_, _, _, _, _) ->
-    {ok, nil}.
-
-
-% Body of the compaction loop process; never returns.
-%
-% Each pass folds over couch_server:all_databases/2 and calls
-% maybe_compact_db/2 for every database that has a configuration and is
-% inside its allowed period. The fold stops early once the configuration
-% table is empty. After a pass the loop either blocks until Parent sends
-% {Parent, have_config} (empty table) or sleeps for the configured
-% "compaction_daemon"/"check_interval" number of seconds (default "300").
-compact_loop(Parent) ->
-    NoConfig = fun() -> ets:info(?CONFIG_ETS, size) =:= 0 end,
-    VisitDb = fun(DbName, Acc) ->
-        case NoConfig() of
-            true ->
-                {stop, Acc};
-            false ->
-                case get_db_config(DbName) of
-                    {ok, Config} ->
-                        case check_period(Config) of
-                            true -> maybe_compact_db(DbName, Config);
-                            false -> ok
-                        end;
-                    nil ->
-                        ok
-                end,
-                {ok, Acc}
-        end
-    end,
-    {ok, _} = couch_server:all_databases(VisitDb, ok),
-    case NoConfig() of
-        true ->
-            receive {Parent, have_config} -> ok end;
-        false ->
-            IntervalSecs = list_to_integer(
-                config:get("compaction_daemon", "check_interval", "300")),
-            ok = timer:sleep(IntervalSecs * 1000)
-    end,
-    compact_loop(Parent).
-
-
-% Compacts database DbName, and its view groups, when the configuration
-% allows it.
-%
-% The database is opened with an admin user context; if opening fails the
-% function returns ok without doing anything. When can_db_compact/2 approves,
-% compaction is started and awaited for at most compact_time_left/1
-% milliseconds (possibly infinity); on timeout the compaction is cancelled.
-% View groups are compacted either in a linked helper process running in
-% parallel (parallel_view_compact = true) or sequentially after the database
-% compaction finished. When the database itself is not compacted, only the
-% views are considered.
-maybe_compact_db(DbName, Config) ->
-    case (catch couch_db:open_int(DbName, [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}])) of
-        {ok, Db} ->
-            DDocNames = db_ddoc_names(Db),
-            case can_db_compact(Config, Db) of
-                true ->
-                    {ok, _} = couch_db:start_compact(Db),
-                    TimeLeft = compact_time_left(Config),
-                    case Config#config.parallel_view_compact of
-                        true ->
-                            ViewsCompactPid = spawn_link(fun() ->
-                                maybe_compact_views(DbName, DDocNames, Config)
-                            end),
-                            ViewsMonRef = erlang:monitor(process, ViewsCompactPid);
-                        false ->
-                            ViewsCompactPid = nil,
-                            ViewsMonRef = nil
-                    end,
-                    case couch_db:wait_for_compaction(Db, TimeLeft) of
-                        ok ->
-                            couch_db:close(Db),
-                            case Config#config.parallel_view_compact of
-                                true -> ok;
-                                false -> maybe_compact_views(DbName, DDocNames, Config)
-                            end;
-                        {error, timeout} ->
-                            ?LOG_INFO("Compaction daemon - canceling compaction "
-                                "for databaes `~s` because exceeded the allowed time.",
-                                [DbName]),
-                            ok = couch_db:cancel_compact(Db),
-                            couch_db:close(Db);
-                        {error, Reason} ->
-                            couch_db:close(Db),
-                            ?LOG_ERROR("Compaction daemon - an error ocurred while"
-                                " compacting the database `~s`: ~p", [DbName, Reason])
-                    end,
-                    case ViewsMonRef of
-                        nil ->
-                            ok;
-                        _ ->
-                            % TimeLeft is the atom infinity when no strict
-                            % window applies; adding 1000 to it would raise
-                            % badarith, so keep it as is in that case.
-                            ViewsTimeout = case TimeLeft of
-                                infinity -> infinity;
-                                _ -> TimeLeft + 1000
-                            end,
-                            receive
-                                {'DOWN', ViewsMonRef, process, _, _Reason} ->
-                                    ok
-                            after ViewsTimeout ->
-                                % Under normal circumstances, the view compaction
-                                % process should have finished already.
-                                erlang:demonitor(ViewsMonRef, [flush]),
-                                unlink(ViewsCompactPid),
-                                exit(ViewsCompactPid, kill)
-                            end
-                    end;
-                false ->
-                    couch_db:close(Db),
-                    maybe_compact_views(DbName, DDocNames, Config)
-            end;
-        _ ->
-            ok
-    end.
-
-
-% Walks the design document names in order and calls maybe_compact_view/3
-% for each. Stops as soon as the current time is outside the configured
-% period or a view compaction reports timeout. Always returns ok.
-maybe_compact_views(_DbName, [], _Config) ->
-    ok;
-maybe_compact_views(DbName, [DDocName | Remaining], Config) ->
-    case check_period(Config) of
-        false ->
-            ok;
-        true ->
-            case maybe_compact_view(DbName, DDocName, Config) of
-                timeout -> ok;
-                ok -> maybe_compact_views(DbName, Remaining, Config)
-            end
-    end.
-
-
-% Returns the names (ids without the "_design/" prefix) of the design
-% documents found by enumerating the key range "_design/" .. "_design0" of
-% Db. Deleted design documents are skipped, and enumeration stops at the
-% first document whose id lacks the "_design/" prefix. Names are accumulated
-% by prepending.
-db_ddoc_names(Db) ->
-    Collect = fun
-        (#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Names) ->
-            {ok, Names};
-        (#full_doc_info{id = <<"_design/", Name/binary>>}, _, Names) ->
-            {ok, [Name | Names]};
-        (_, _, Names) ->
-            {stop, Names}
-    end,
-    KeyRange = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
-    {ok, _, DDocNames} = couch_db:enum_docs(Db, Collect, [], KeyRange),
-    DDocNames.
-
-
-% Compacts one view group when can_view_compact/4 approves.
-%
-% Returns ok when nothing had to be done, when the group info could not be
-% fetched (logged), or when the compaction process went down (abnormal
-% reasons are logged). Returns timeout when the compaction did not finish
-% within compact_time_left/1 milliseconds; in that case the compaction is
-% cancelled.
-maybe_compact_view(DbName, GroupId, Config) ->
-    DDocId = <<"_design/", GroupId/binary>>,
-    case (catch couch_mrview:get_info(DbName, DDocId)) of
-        {ok, GroupInfo} ->
-            case can_view_compact(Config, DbName, GroupId, GroupInfo) of
-                false ->
-                    ok;
-                true ->
-                    {ok, MonRef} = couch_mrview:compact(DbName, DDocId, [monitor]),
-                    Timeout = compact_time_left(Config),
-                    receive
-                        {'DOWN', MonRef, process, _, normal} ->
-                            ok;
-                        {'DOWN', MonRef, process, _, Reason} ->
-                            ?LOG_ERROR("Compaction daemon - an error ocurred while compacting"
-                                " the view group `~s` from database `~s`: ~p",
-                                [GroupId, DbName, Reason]),
-                            ok
-                    after Timeout ->
-                        ?LOG_INFO("Compaction daemon - canceling the compaction for the "
-                            "view group `~s` of the database `~s` because it's exceeding"
-                            " the allowed period.", [GroupId, DbName]),
-                        erlang:demonitor(MonRef, [flush]),
-                        ok = couch_mrview:cancel_compaction(DbName, DDocId),
-                        timeout
-                    end
-            end;
-        Error ->
-            ?LOG_ERROR("Error opening view group `~s` from database `~s`: ~p",
-                [GroupId, DbName, Error]),
-            ok
-    end.
-
-
-% Returns how long a compaction may still run, in milliseconds, or the atom
-% infinity when compactions are never cancelled (cancel = false) or no
-% period is configured.
-%
-% With a period, the result is the time from the current local hour:minute
-% (seconds are ignored) until the period's `to` time. When `to` is not later
-% than the current time it is taken to be on the next day, so the result is
-% always in the range 1 minute .. 24 hours.
-compact_time_left(#config{cancel = false}) ->
-    infinity;
-compact_time_left(#config{period = nil}) ->
-    infinity;
-compact_time_left(#config{period = #period{to = {ToH, ToM}}}) ->
-    {H, M, _} = time(),
-    % Work in minutes since midnight: subtracting hours and minutes
-    % separately (with abs/1 on the minutes) gives wrong results whenever
-    % the target minute is smaller than the current one.
-    MinutesLeft = case (ToH * 60 + ToM) - (H * 60 + M) of
-        Diff when Diff > 0 -> Diff;
-        Diff -> Diff + 24 * 60
-    end,
-    MinutesLeft * 60 * 1000.
-
-
-% Looks up the compaction configuration for DbName, falling back to the
-% <<"_default">> entry. Returns {ok, Config}, or nil when neither exists.
-get_db_config(DbName) ->
-    case ets:lookup(?CONFIG_ETS, DbName) of
-        [{DbName, Config}] ->
-            {ok, Config};
-        [] ->
-            case ets:lookup(?CONFIG_ETS, <<"_default">>) of
-                [{<<"_default">>, DefaultConfig}] -> {ok, DefaultConfig};
-                [] -> nil
-            end
-    end.
-
-
-% Decides whether database Db should be compacted now. Returns true only
-% when the current time is inside the configured period, the fragmentation
-% reaches the db_frag threshold, and the free space reported for the
-% database directory is not smaller than the estimated space required.
-% A warning is logged when the space check is what prevents compaction.
-can_db_compact(#config{db_frag = Threshold} = Config, Db) ->
-    case check_period(Config) of
-        true ->
-            {ok, DbInfo} = couch_db:get_db_info(Db),
-            {Frag, SpaceRequired} = frag(DbInfo),
-            ?LOG_DEBUG("Fragmentation for database `~s` is ~p%, estimated space for"
-                " compaction is ~p bytes.", [Db#db.name, Frag, SpaceRequired]),
-            case check_frag(Threshold, Frag) of
-                true ->
-                    Free = free_space(config:get("couchdb", "database_dir")),
-                    case Free >= SpaceRequired of
-                        false ->
-                            ?LOG_WARN("Compaction daemon - skipping database `~s` "
-                                "compaction: the estimated necessary disk space is about ~p"
-                                " bytes but the currently available disk space is ~p bytes.",
-                                [Db#db.name, SpaceRequired, Free]),
-                            false;
-                        true ->
-                            true
-                    end;
-                false ->
-                    false
-            end;
-        false ->
-            false
-    end.
-
-% Decides whether a view group should be compacted now. Returns true only
-% when the current time is inside the configured period, the group's updater
-% is not running, the fragmentation reaches the view_frag threshold, and the
-% free space reported for the index root directory is not smaller than the
-% estimated space required. A warning is logged when the space check is what
-% prevents compaction.
-can_view_compact(Config, DbName, GroupId, GroupInfo) ->
-    case check_period(Config) of
-        true ->
-            case couch_util:get_value(updater_running, GroupInfo) of
-                false ->
-                    {Frag, SpaceRequired} = frag(GroupInfo),
-                    ?LOG_DEBUG("Fragmentation for view group `~s` (database `~s`) is "
-                        "~p%, estimated space for compaction is ~p bytes.",
-                        [GroupId, DbName, Frag, SpaceRequired]),
-                    case check_frag(Config#config.view_frag, Frag) of
-                        true ->
-                            Free = free_space(couch_index_util:root_dir()),
-                            case Free >= SpaceRequired of
-                                false ->
-                                    ?LOG_WARN("Compaction daemon - skipping view group `~s` "
-                                        "compaction (database `~s`): the estimated necessary "
-                                        "disk space is about ~p bytes but the currently available"
-                                        " disk space is ~p bytes.",
-                                        [GroupId, DbName, SpaceRequired, Free]),
-                                    false;
-                                true ->
-                                    true
-                            end;
-                        false ->
-                            false
-                    end;
-                true ->
-                    false
-            end;
-        false ->
-            false
-    end.
-
-
-% Returns true when compaction is allowed at the current local time.
-% Without a period it is always allowed. With a period the current
-% {Hour, Minute} must lie in [From, To); when From is not earlier than To
-% the window is taken to wrap around midnight.
-check_period(#config{period = nil}) ->
-    true;
-check_period(#config{period = #period{from = From, to = To}}) ->
-    {HH, MM, _} = erlang:time(),
-    Now = {HH, MM},
-    AfterStart = Now >= From,
-    BeforeEnd = Now < To,
-    case From < To of
-        true -> AfterStart andalso BeforeEnd;
-        false -> AfterStart orelse BeforeEnd
-    end.
-
-
-% Returns true when Frag reaches Threshold. A nil threshold always passes.
-check_frag(nil, _Frag) ->
-    true;
-check_frag(Threshold, Frag) ->
-    Threshold =< Frag.
-
-
-% Computes {FragmentationPercent, SpaceRequiredBytes} from an info property
-% list containing disk_size and data_size.
-%
-% Files smaller than "compaction_daemon"/"min_file_size" (default "131072")
-% report 0% fragmentation. A data_size of null reports 100%, and a data_size
-% of 0 reports 0%; in those three cases the second element is the file size.
-% Otherwise the percentage of the file not accounted for by data_size is
-% returned together with space_required(DataSize).
-frag(Props) ->
-    FileSize = couch_util:get_value(disk_size, Props),
-    MinFileSize = list_to_integer(
-        config:get("compaction_daemon", "min_file_size", "131072")),
-    case FileSize >= MinFileSize of
-        false ->
-            {0, FileSize};
-        true ->
-            case couch_util:get_value(data_size, Props) of
-                null ->
-                    {100, FileSize};
-                0 ->
-                    {0, FileSize};
-                DataSize ->
-                    Percent = round(((FileSize - DataSize) / FileSize * 100)),
-                    {Percent, space_required(DataSize)}
-            end
-    end.
-
-% Rough and pessimistic estimate of the disk space needed to compact a
-% database or view index: twice the size of its data, rounded to an integer.
-space_required(DataSize) ->
-    Estimate = DataSize * 2.0,
-    round(Estimate).
-
-
-% Parses every entry of the "compactions" config section and stores the
-% valid ones in the ETS table, keyed by the binary database name. Entries
-% that fail to parse are skipped (parse_config/2 logs them).
-load_config() ->
-    StoreEntry = fun({DbName, ConfigString}) ->
-        case parse_config(DbName, ConfigString) of
-            error ->
-                ok;
-            {ok, Config} ->
-                true = ets:insert(?CONFIG_ETS, {?l2b(DbName), Config})
-        end
-    end,
-    lists:foreach(StoreEntry, config:get("compactions")).
-
-% Parses a compaction configuration string. Returns {ok, Config}, or error
-% after logging why the string was rejected. Any exception raised while
-% parsing is treated as an invalid configuration.
-parse_config(DbName, ConfigString) ->
-    Outcome = (catch do_parse_config(ConfigString)),
-    case Outcome of
-        {ok, Conf} ->
-            {ok, Conf};
-        incomplete_period ->
-            ?LOG_ERROR("Incomplete period ('to' or 'from' missing) in the compaction"
-                " configuration for database `~s`", [DbName]),
-            error;
-        _ ->
-            ?LOG_ERROR("Invalid compaction configuration for database "
-                "`~s`: `~s`", [DbName, ConfigString]),
-            error
-    end.
-
-% Turns a configuration string into a #config{} record. Returns {ok, Conf},
-% or incomplete_period when only one of `from` / `to` was given. Crashes on
-% malformed input; parse_config/2 catches that.
-do_parse_config(ConfigString) ->
-    {ok, Props} = couch_util:parse_term(ConfigString),
-    {ok, Conf} = config_record(Props, #config{}),
-    case Conf#config.period of
-        nil -> {ok, Conf};
-        #period{from = nil} -> incomplete_period;
-        #period{to = nil} -> incomplete_period;
-        #period{} -> {ok, Conf}
-    end.
-
-% Folds a list of {Option, Value} pairs into a #config{} record and returns
-% {ok, Config}. Recognised options:
-%   db_fragmentation / view_fragmentation - string such as "70%"
-%   from / to                             - string such as "23:00"
-%   strict_window                         - boolean, stored in `cancel`
-%   parallel_view_compaction              - boolean
-% Any other option or value shape crashes with function_clause or badmatch.
-config_record([], Config) ->
-    {ok, Config};
-config_record([{db_fragmentation, V} | Rest], Config) ->
-    [Percent] = string:tokens(V, "%"),
-    config_record(Rest, Config#config{db_frag = list_to_integer(Percent)});
-config_record([{view_fragmentation, V} | Rest], Config) ->
-    [Percent] = string:tokens(V, "%"),
-    config_record(Rest, Config#config{view_frag = list_to_integer(Percent)});
-config_record([{from, V} | Rest], #config{period = Period0} = Config) ->
-    Time = parse_time(V),
-    % start from an empty period when none has been seen yet
-    Base = case Period0 of
-        nil -> #period{};
-        #period{} -> Period0
-    end,
-    config_record(Rest, Config#config{period = Base#period{from = Time}});
-config_record([{to, V} | Rest], #config{period = Period0} = Config) ->
-    Time = parse_time(V),
-    Base = case Period0 of
-        nil -> #period{};
-        #period{} -> Period0
-    end,
-    config_record(Rest, Config#config{period = Base#period{to = Time}});
-config_record([{strict_window, Flag} | Rest], Config) when is_boolean(Flag) ->
-    config_record(Rest, Config#config{cancel = Flag});
-config_record([{parallel_view_compaction, Flag} | Rest], Config)
-        when is_boolean(Flag) ->
-    config_record(Rest, Config#config{parallel_view_compact = Flag}).
-
-
-% Parses an "HH:MM" string into {Hour, Minute} integers. Crashes when the
-% string does not consist of exactly two ":"-separated integer parts.
-parse_time(String) ->
-    [HourStr, MinuteStr] = string:tokens(String, ":"),
-    Hour = list_to_integer(HourStr),
-    Minute = list_to_integer(MinuteStr),
-    {Hour, Minute}.
-
-
-% Returns the free space, in bytes, of the mount point containing Path, or
-% undefined when no entry of disksup:get_disk_data/0 matches. Entries are
-% tried deepest mount point first (most path components).
-free_space(Path) ->
-    Depth = fun(MountPath) -> length(filename:split(MountPath)) end,
-    DeepestFirst = fun({PathA, _, _}, {PathB, _, _}) ->
-        Depth(PathA) > Depth(PathB)
-    end,
-    DiskData = lists:sort(DeepestFirst, disksup:get_disk_data()),
-    free_space_rec(abs_path(Path), DiskData).
-
-% Returns the free bytes of the first disk entry whose mount point, made
-% absolute with a trailing "/", is a prefix of Path. Each entry is
-% {MountPoint, Total, Usage}; the result is Total minus Usage percent of
-% Total, truncated and multiplied by 1024. Returns undefined when no entry
-% matches.
-free_space_rec(_Path, []) ->
-    undefined;
-free_space_rec(Path, [{MountPoint0, Total, Usage} | Rest]) ->
-    MountPoint = abs_path(MountPoint0),
-    case lists:prefix(MountPoint, Path) of
-        true ->
-            trunc(Total - (Total * (Usage / 100))) * 1024;
-        false ->
-            free_space_rec(Path, Rest)
-    end.
-
-% Returns the absolute form of Path0, guaranteed to end with "/".
-abs_path(Path0) ->
-    Absolute = filename:absname(Path0),
-    case lists:last(Absolute) =:= $/ of
-        true -> Absolute;
-        false -> Absolute ++ "/"
-    end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_compress.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_compress.erl b/src/couch/src/couch_compress.erl
deleted file mode 100644
index 6b47a7a..0000000
--- a/src/couch/src/couch_compress.erl
+++ /dev/null
@@ -1,84 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_compress).
-
--export([compress/2, decompress/1, is_compressed/2]).
--export([get_compression_method/0]).
-
--include_lib("couch/include/couch_db.hrl").
-
-% binaries compressed with snappy have their first byte set to this value
--define(SNAPPY_PREFIX, 1).
-% Term prefixes documented at:
-% http://www.erlang.org/doc/apps/erts/erl_ext_dist.html
--define(TERM_PREFIX, 131).
--define(COMPRESSED_TERM_PREFIX, 131, 80).
-
-
-% Reads "couchdb"/"file_compression" from the config. Returns
-% ?DEFAULT_COMPRESSION when unset, an atom for a value without "_" (for
-% example "snappy"), or {Atom, Level} for a value of the form "name_N".
-% Method names are converted with list_to_existing_atom/1.
-get_compression_method() ->
-    case config:get("couchdb", "file_compression") of
-        undefined ->
-            ?DEFAULT_COMPRESSION;
-        Setting ->
-            case string:tokens(Setting, "_") of
-                [Name] ->
-                    list_to_existing_atom(Name);
-                [Name, Level] ->
-                    {list_to_existing_atom(Name), list_to_integer(Level)}
-            end
-    end.
-
-
-% Serialises Term with the given method: none, {deflate, Level} or snappy.
-%
-% A binary that already starts with the snappy prefix is returned unchanged
-% when snappy is requested. Any other binary starting with the snappy or
-% term prefix is first decompressed and then compressed with the requested
-% method. With snappy, the uncompressed term binary is returned when
-% compression does not make it smaller or the snappy NIF is not loaded.
-compress(<<?SNAPPY_PREFIX, _/binary>> = Bin, snappy) ->
-    Bin;
-compress(<<Prefix, _/binary>> = Bin, Method)
-        when Prefix =:= ?SNAPPY_PREFIX; Prefix =:= ?TERM_PREFIX ->
-    compress(decompress(Bin), Method);
-compress(Term, none) ->
-    ?term_to_bin(Term);
-compress(Term, {deflate, Level}) ->
-    term_to_binary(Term, [{minor_version, 1}, {compressed, Level}]);
-compress(Term, snappy) ->
-    Plain = ?term_to_bin(Term),
-    try
-        {ok, Packed} = snappy:compress(Plain),
-        case byte_size(Packed) < byte_size(Plain) of
-            false -> Plain;
-            true -> <<?SNAPPY_PREFIX, Packed/binary>>
-        end
-    catch exit:snappy_nif_not_loaded ->
-        Plain
-    end.
-
-
-% Decodes a binary produced by compress/2 back into a term. Binaries with
-% the snappy prefix are snappy-decompressed first; binaries with the term
-% prefix are passed to binary_to_term/1 directly.
-decompress(<<?SNAPPY_PREFIX, Payload/binary>>) ->
-    {ok, Plain} = snappy:decompress(Payload),
-    binary_to_term(Plain);
-decompress(<<?TERM_PREFIX, _/binary>> = Plain) ->
-    binary_to_term(Plain).
-
-
-% Tells whether the first argument is already stored with Method:
-%   snappy-prefixed binary          - true only for snappy
-%   compressed-term-prefixed binary - true only for {deflate, _}
-%   plain term-prefixed binary      - true only for none
-%   anything that is not a binary   - false
-is_compressed(<<?SNAPPY_PREFIX, _/binary>>, Method) ->
-    Method =:= snappy;
-is_compressed(<<?COMPRESSED_TERM_PREFIX, _/binary>>, Method) ->
-    case Method of
-        {deflate, _Level} -> true;
-        _ -> false
-    end;
-is_compressed(<<?TERM_PREFIX, _/binary>>, Method) ->
-    Method =:= none;
-is_compressed(Term, _Method) when not is_binary(Term) ->
-    false.
-
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_config.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_config.erl b/src/couch/src/couch_config.erl
deleted file mode 100644
index 5d13dff..0000000
--- a/src/couch/src/couch_config.erl
+++ /dev/null
@@ -1,251 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-% Reads CouchDB's ini file and gets queried for configuration parameters.
-% This module is initialized with a list of ini files that it consecutively
-% reads Key/Value pairs from and saves them in an ets table. If more than one
-% ini file is specified, the last one is used to write changes that are made
-% with store/2 back to that ini file.
-
--module(couch_config).
--behaviour(gen_server).
-
--include_lib("couch/include/couch_db.hrl").
-
-
--export([start_link/1, stop/0]).
--export([all/0, get/1, get/2, get/3, set/3, set/4, delete/2, delete/3]).
--export([register/1, register/2]).
--export([parse_ini_file/1]).
-
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
--record(config, {
- notify_funs=[],
- write_filename=undefined
-}).
-
-
-% Starts the config server, registered locally under the module name, with
-% the list of ini files to load.
-start_link(IniFiles) ->
-    ServerName = {local, ?MODULE},
-    gen_server:start_link(ServerName, ?MODULE, IniFiles, []).
-
-% Asks the config server to stop by sending it a `stop` cast.
-stop() ->
-    gen_server:cast(?MODULE, stop).
-
-
-% Returns every {{Section, Key}, Value} entry held by the server, sorted.
-all() ->
-    Entries = gen_server:call(?MODULE, all, infinity),
-    lists:sort(Entries).
-
-
-% Returns all {Key, Value} pairs of Section, read directly from the ETS
-% table. A binary section name is converted to a list first.
-get(Section) when is_binary(Section) ->
-    ?MODULE:get(?b2l(Section));
-get(Section) ->
-    [{Key, Value} || [Key, Value] <- ets:match(?MODULE, {{Section, '$1'}, '$2'})].
-
-% Returns the value stored for Section/Key, or undefined when absent.
-get(Section, Key) ->
-    Default = undefined,
-    ?MODULE:get(Section, Key, Default).
-
-% Returns the value stored for Section/Key, or Default when absent. When
-% both Section and Key are binaries they are converted to lists first.
-get(Section, Key, Default) when is_binary(Section), is_binary(Key) ->
-    ?MODULE:get(?b2l(Section), ?b2l(Key), Default);
-get(Section, Key, Default) ->
-    case ets:lookup(?MODULE, {Section, Key}) of
-        [{_, Value}] -> Value;
-        [] -> Default
-    end.
-
-% Stores Value under Section/Key through the config server. set/3 asks for
-% persistence (Persist = true). When both Section and Key are binaries they
-% are converted to lists first.
-set(Section, Key, Value) ->
-    Persist = true,
-    ?MODULE:set(Section, Key, Value, Persist).
-
-set(Section, Key, Value, Persist) when is_binary(Section), is_binary(Key) ->
-    ?MODULE:set(?b2l(Section), ?b2l(Key), Value, Persist);
-set(Section, Key, Value, Persist) ->
-    Request = {set, Section, Key, Value, Persist},
-    gen_server:call(?MODULE, Request).
-
-
-% Removes Section/Key through the config server. delete/2 asks for
-% persistence (Persist = true). When both Section and Key are binaries they
-% are converted to lists first.
-delete(Section, Key) when is_binary(Section), is_binary(Key) ->
-    delete(?b2l(Section), ?b2l(Key));
-delete(Section, Key) ->
-    Persist = true,
-    delete(Section, Key, Persist).
-
-delete(Section, Key, Persist) when is_binary(Section), is_binary(Key) ->
-    delete(?b2l(Section), ?b2l(Key), Persist);
-delete(Section, Key, Persist) ->
-    Request = {delete, Section, Key, Persist},
-    gen_server:call(?MODULE, Request).
-
-
-% Registers Fun to be called on config changes. The registration is tied to
-% Pid (the calling process for register/1): the server monitors it and drops
-% the fun when that process goes down.
-register(Fun) ->
-    Owner = self(),
-    ?MODULE:register(Fun, Owner).
-
-register(Fun, Pid) ->
-    Request = {register, Fun, Pid},
-    gen_server:call(?MODULE, Request).
-
-
-% gen_server callback. Creates the named ETS table and loads every ini file
-% into it in order. The last file of the list becomes the file that changes
-% are written to (undefined for an empty list). Any exception raised while
-% loading makes init return {stop, Error}.
-init(IniFiles) ->
-    ets:new(?MODULE, [named_table, set, protected]),
-    LoadFile = fun(IniFile) ->
-        {ok, ParsedIniValues} = parse_ini_file(IniFile),
-        ets:insert(?MODULE, ParsedIniValues)
-    end,
-    try
-        lists:foreach(LoadFile, IniFiles),
-        WriteFile = case IniFiles of
-            [_|_] -> lists:last(IniFiles);
-            _ -> undefined
-        end,
-        {ok, #config{write_filename = WriteFile}}
-    catch _Class:Error ->
-        {stop, Error}
-    end.
-
-
-% gen_server callback. Nothing to clean up.
-terminate(_Why, _State) ->
-    ok.
-
-
-% gen_server callback.
-%
-%   all      - replies with the sorted content of the ETS table.
-%   set      - optionally writes the value to the ini file, stores it in
-%              ETS, then notifies all registered funs from a linked helper
-%              process, which also sends the ok reply. A failed file write
-%              is returned to the caller and nothing is stored.
-%   delete   - removes the key from ETS, optionally writes an empty value
-%              to the ini file (the result of that write is not checked),
-%              then notifies registered funs with the value `deleted`.
-%   register - monitors Pid and stores Fun, wrapped so that it can always
-%              be called with four arguments.
-handle_call(all, _From, Config) ->
-    Entries = ets:tab2list(?MODULE),
-    {reply, lists:sort(Entries), Config};
-handle_call({set, Sec, Key, Val, Persist}, From, Config) ->
-    SaveResult = case {Persist, Config#config.write_filename} of
-        {true, FileName} when FileName =/= undefined ->
-            couch_config_writer:save_to_file({{Sec, Key}, Val}, FileName);
-        _ ->
-            ok
-    end,
-    case SaveResult of
-        ok ->
-            true = ets:insert(?MODULE, {{Sec, Key}, Val}),
-            NotifyFuns = Config#config.notify_funs,
-            spawn_link(fun() ->
-                [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- NotifyFuns],
-                gen_server:reply(From, ok)
-            end),
-            {noreply, Config};
-        _Error ->
-            {reply, SaveResult, Config}
-    end;
-handle_call({delete, Sec, Key, Persist}, From, Config) ->
-    true = ets:delete(?MODULE, {Sec, Key}),
-    case {Persist, Config#config.write_filename} of
-        {true, FileName} when FileName =/= undefined ->
-            couch_config_writer:save_to_file({{Sec, Key}, ""}, FileName);
-        _ ->
-            ok
-    end,
-    NotifyFuns = Config#config.notify_funs,
-    spawn_link(fun() ->
-        [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- NotifyFuns],
-        gen_server:reply(From, ok)
-    end),
-    {noreply, Config};
-handle_call({register, Fun, Pid}, _From, #config{notify_funs = Registered} = Config) ->
-    erlang:monitor(process, Pid),
-    % wrap 1, 2 and 3 arity funs so that every stored fun takes 4 arguments
-    Wrapped = case Fun of
-        _ when is_function(Fun, 1) ->
-            fun(Section, _Key, _Value, _Persist) -> Fun(Section) end;
-        _ when is_function(Fun, 2) ->
-            fun(Section, Key, _Value, _Persist) -> Fun(Section, Key) end;
-        _ when is_function(Fun, 3) ->
-            fun(Section, Key, Value, _Persist) -> Fun(Section, Key, Value) end;
-        _ when is_function(Fun, 4) ->
-            Fun
-    end,
-    {reply, ok, Config#config{notify_funs = [{Pid, Wrapped} | Registered]}}.
-
-
-% gen_server callback. `stop` terminates the server normally; every other
-% cast is ignored.
-handle_cast(stop, State) ->
-    {stop, normal, State};
-handle_cast(_Other, State) ->
-    {noreply, State}.
-
-% gen_server callback. When a monitored process goes down, drops every
-% notification fun that was registered for it.
-handle_info({'DOWN', _, _, DownPid, _}, #config{notify_funs = Registered} = Config) ->
-    StillAlive = [{Pid, Fun} || {Pid, Fun} <- Registered, Pid /= DownPid],
-    {noreply, Config#config{notify_funs = StillAlive}}.
-
-% gen_server callback. The state needs no conversion between versions.
-code_change(_FromVsn, State, _Extra) ->
-    {ok, State}.
-
-
-% Reads IniFile and returns {ok, Values}, where Values is a list of
-% {{Section, Key}, Value} tuples (all strings). Entries are prepended while
-% scanning, so the list is in reverse order of appearance in the file.
-% Logs and throws {error, Reason} when the file cannot be read.
-%
-% Line handling, applied to each line after stripping surrounding blanks:
-%   "[name]"      - switches the current section; ignored when "]" is not
-%                   the last character
-%   ";..."        - comment, skipped
-%   "key = value" - adds an entry; the value is cut at the first " ;" or
-%                   tab followed by ";"
-%   "key ="       - empty value: the key is deleted from the ETS table and
-%                   no entry is added
-%   "= ..."       - ignored
-%   no "="        - when the unstripped line starts with one space followed
-%                   by a non-blank character and an entry precedes it, the
-%                   text is appended to that entry's value after a space;
-%                   otherwise the line is ignored
-parse_ini_file(IniFile) ->
- IniFilename = couch_util:abs_pathname(IniFile),
- IniBin =
- case file:read_file(IniFilename) of
- {ok, IniBin0} ->
- IniBin0;
- {error, Reason} = Error ->
- ?LOG_ERROR("Could not read server configuration file ~s: ~s",
- [IniFilename, file:format_error(Reason)]),
- throw(Error)
- end,
-
-% Split on CRLF, LF, CR, or character 8#32 (decimal 26).
- Lines = re:split(IniBin, "\r\n|\n|\r|\032", [{return, list}]),
-% The fold accumulator is {CurrentSectionName, ValuesSoFar}.
- {_, ParsedIniValues} =
- lists:foldl(fun(Line, {AccSectionName, AccValues}) ->
- case string:strip(Line) of
- "[" ++ Rest ->
- case re:split(Rest, "\\]", [{return, list}]) of
- [NewSectionName, ""] ->
- {NewSectionName, AccValues};
- _Else -> % end bracket not at end, ignore this line
- {AccSectionName, AccValues}
- end;
- ";" ++ _Comment ->
- {AccSectionName, AccValues};
- Line2 ->
-% NOTE(review): inside an Erlang string literal "\s" is the space
-% character, so this pattern only strips spaces around "=", not tabs.
-% "\\s" may have been intended - confirm before changing.
- case re:split(Line2, "\s*=\s*", [{return, list}]) of
- [Value] ->
-% No "=" on this line: check the unstripped Line for the continuation
-% form (one leading space followed by a non-blank character).
- MultiLineValuePart = case re:run(Line, "^ \\S", []) of
- {match, _} ->
- true;
- _ ->
- false
- end,
- case {MultiLineValuePart, AccValues} of
- {true, [{{_, ValueName}, PrevValue} | AccValuesRest]} ->
- % remove comment
- case re:split(Value, " ;|\t;", [{return, list}]) of
- [[]] ->
- % empty line
- {AccSectionName, AccValues};
- [LineValue | _Rest] ->
- E = {{AccSectionName, ValueName},
- PrevValue ++ " " ++ LineValue},
- {AccSectionName, [E | AccValuesRest]}
- end;
- _ ->
- {AccSectionName, AccValues}
- end;
- [""|_LineValues] -> % line begins with "=", ignore
- {AccSectionName, AccValues};
- [ValueName|LineValues] -> % yeehaw, got a line!
-% The value itself may contain "=", so the remaining parts are joined
-% back together.
- RemainingLine = couch_util:implode(LineValues, "="),
- % removes comments
- case re:split(RemainingLine, " ;|\t;", [{return, list}]) of
- [[]] ->
- % empty line means delete this key
- ets:delete(?MODULE, {AccSectionName, ValueName}),
- {AccSectionName, AccValues};
- [LineValue | _Rest] ->
- {AccSectionName,
- [{{AccSectionName, ValueName}, LineValue} | AccValues]}
- end
- end
- end
- end, {"", []}, Lines),
- {ok, ParsedIniValues}.
-
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_config_writer.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_config_writer.erl b/src/couch/src/couch_config_writer.erl
deleted file mode 100644
index f3c9cca..0000000
--- a/src/couch/src/couch_config_writer.erl
+++ /dev/null
@@ -1,88 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%% @doc Saves a Key/Value pair to a ini file. The Key consists of a Section
-%% and Option combination. If that combination is found in the ini file
-%% the new value replaces the old value. If only the Section is found the
-%% Option and value combination is appended to the Section. If the Section
-%% does not yet exist in the ini file, it is added and the Option/Value
-%% pair is appended.
-%% @see couch_config
-
--module(couch_config_writer).
-
--export([save_to_file/2]).
-
--include_lib("couch/include/couch_db.hrl").
-
-%% @spec save_to_file(
-%%     Config::{{Section::string(), Option::string()}, Value::string()},
-%%     File::filename()) -> ok | {error, Reason}
-%% @doc Writes a Section/Key/Value triple into the ini file File. The file
-%% is read completely, the option is replaced or added in memory, and the
-%% whole file is written back. A failed write is logged and returned as
-%% {error, Reason}; a failed read crashes with badmatch.
-%% NOTE(review): Key is inserted into the regular expression without
-%% escaping, so keys containing regex metacharacters may not match their own
-%% line - confirm whether such keys can occur.
-save_to_file({{Section, Key}, Value}, File) ->
-    {ok, CurrentContents} = file:read_file(File),
-    Lines = re:split(CurrentContents, "\r\n|\n|\r|\032", [{return, list}]),
-
-    SectionLine = "[" ++ Section ++ "]",
-    {ok, Pattern} = re:compile(["^(", Key, "\\s*=)|\\[[a-zA-Z0-9\_-]*\\]"]),
-
-    UpdatedLines = process_file_lines(Lines, [], SectionLine, Pattern, Key, Value),
-    Output = reverse_and_add_newline(strip_empty_lines(UpdatedLines), []),
-    case file:write_file(File, Output) of
-        {error, Reason} = Error ->
-            ?LOG_ERROR("Could not write config file ~s: ~s",
-                [File, file:format_error(Reason)]),
-            Error;
-        ok ->
-            ok
-    end.
-
-
-% Scans the file lines for the line equal to the wanted section header,
-% pushing every line seen onto SeenLines (newest first). Once the header is
-% found, process_section_lines/5 takes over. When the input runs out, the
-% section header and the option are added after the lines seen so far.
-% The result is in reverse (last line first) order.
-process_file_lines([], SeenLines, SectionLine, _Pattern, Key, Value) ->
-    % Section wasn't found. Append it with the option here.
-    [Key ++ " = " ++ Value, SectionLine, "" | strip_empty_lines(SeenLines)];
-process_file_lines([SectionLine | Rest], SeenLines, SectionLine, Pattern, Key, Value) ->
-    process_section_lines(Rest, [SectionLine | SeenLines], Pattern, Key, Value);
-process_file_lines([OtherLine | Rest], SeenLines, SectionLine, Pattern, Key, Value) ->
-    process_file_lines(Rest, [OtherLine | SeenLines], SectionLine, Pattern, Key, Value).
-
-
-% Scans the lines following the wanted section header. The option line is
-% replaced where it is found; otherwise the option is added just before the
-% next section header, or at the end of the file. The result is in reverse
-% (last line first) order.
-process_section_lines([], SeenLines, _Pattern, Key, Value) ->
-    % Found end of file within the section. Append the option here.
-    [Key ++ " = " ++ Value | strip_empty_lines(SeenLines)];
-process_section_lines([Line | Rest], SeenLines, Pattern, Key, Value) ->
-    case re:run(Line, Pattern, [{capture, all_but_first}]) of
-        {match, []} ->
-            % Found another section. Append the option here.
-            lists:reverse(Rest) ++
-                [Line, "", Key ++ " = " ++ Value | strip_empty_lines(SeenLines)];
-        {match, _} ->
-            % Found the option itself. Replace it.
-            lists:reverse(Rest) ++ [Key ++ " = " ++ Value | SeenLines];
-        nomatch ->
-            % Found nothing interesting. Move on.
-            process_section_lines(Rest, [Line | SeenLines], Pattern, Key, Value)
-    end.
-
-
-% Builds a nested iolist from a reversed list of lines: each line is placed,
-% followed by "\n", in front of what has been built so far, so the output
-% reads in forward order.
-reverse_and_add_newline(Lines, Content) ->
-    lists:foldl(fun(Line, Built) -> [Line, "\n", Built] end, Content, Lines).
-
-
-% Drops empty strings from the front of the list; everything from the first
-% non-empty element on is returned unchanged.
-strip_empty_lines([[] | Tail]) ->
-    strip_empty_lines(Tail);
-strip_empty_lines(Lines) ->
-    Lines.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
deleted file mode 100644
index 32a0049..0000000
--- a/src/couch/src/couch_db.erl
+++ /dev/null
@@ -1,1412 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_db).
-
--export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
--export([start_compact/1, cancel_compact/1]).
--export([wait_for_compaction/1, wait_for_compaction/2]).
--export([is_idle/1,monitor/1,count_changes_since/2]).
--export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
--export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
--export([open_doc/2,open_doc/3,open_doc_revs/4]).
--export([set_revs_limit/2,get_revs_limit/1]).
--export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
--export([enum_docs/4,enum_docs_since/5]).
--export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
--export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
--export([set_security/2,get_security/1]).
--export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
--export([check_is_admin/1, check_is_member/1, get_doc_count/1]).
--export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
--export([load_validation_funs/1]).
-
--include_lib("couch/include/couch_db.hrl").
-
--define(VALID_DB_NAME, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$").
-
-% Opens the database file and starts a linked couch_db_updater gen_server
-% for it. On success the file process is unlinked from the caller and the
-% result of the updater's get_db call is returned; if the file cannot be
-% opened the result of open_db_file/2 is returned unchanged.
-start_link(DbName, Filepath, Options) ->
-    case open_db_file(Filepath, Options) of
-        {ok, Fd} ->
-            InitArgs = {DbName, Filepath, Fd, Options},
-            {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, InitArgs, []),
-            unlink(Fd),
-            gen_server:call(UpdaterPid, get_db);
-        OpenFailure ->
-            OpenFailure
-    end.
-
-% Opens the database file at Filepath.
-%
-% Returns {ok, Fd} on success. If the file does not exist but a
-% Filepath ++ ".compact" file does, that file is renamed to Filepath, synced
-% and used instead. Returns {not_found, no_db_file} when neither file
-% exists. Any other error from couch_file:open/2, for either file, is
-% returned to the caller unchanged.
-open_db_file(Filepath, Options) ->
-    case couch_file:open(Filepath, Options) of
-        {ok, Fd} ->
-            {ok, Fd};
-        {error, enoent} ->
-            % couldn't find file. is there a compact version? This can happen if
-            % crashed during the file switch.
-            case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
-                {ok, Fd} ->
-                    ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
-                    ok = file:rename(Filepath ++ ".compact", Filepath),
-                    ok = couch_file:sync(Fd),
-                    {ok, Fd};
-                {error, enoent} ->
-                    {not_found, no_db_file};
-                CompactError ->
-                    % e.g. a permission problem on the compaction file: report
-                    % it like any other open failure instead of crashing with
-                    % a case_clause error.
-                    CompactError
-            end;
-        Error ->
-            Error
-    end.
-
-
-% Creates the database DbName by delegating to couch_server:create/2 and
-% returns whatever that call returns.
-create(DbName, Options) ->
- couch_server:create(DbName, Options).
-
-% this is for opening a database for internal purposes like the replicator
-% or the view indexer. it never throws a reader error.
-% It returns the result of couch_server:open/2 without any membership check.
-open_int(DbName, Options) ->
- couch_server:open(DbName, Options).
-
-% this should be called anytime an http request opens the database.
-% it ensures that the http userCtx is a valid reader
-%
-% When check_is_member/1 throws, the freshly opened db is closed again and
-% the same term is re-thrown. Any non-{ok, Db} result of couch_server:open/2
-% is returned unchanged.
-open(DbName, Options) ->
-    Opened = couch_server:open(DbName, Options),
-    case Opened of
-        {ok, Db} ->
-            try check_is_member(Db) of
-                _ ->
-                    Opened
-            catch
-                throw:Reason ->
-                    close(Db),
-                    throw(Reason)
-            end;
-        _ ->
-            Opened
-    end.
-
-% Fetches the current #db{} from the updater process and returns it with the
-% caller's user_ctx. If the file descriptor changed since the caller's copy
-% was taken, the old fd monitor is dropped and a monitor on the new fd is
-% stored in the returned record.
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
-    {ok, #db{} = Fresh} = gen_server:call(Pid, get_db, infinity),
-    case Fresh#db.fd of
-        Fd ->
-            % same file as before, the existing monitor is still valid
-            {ok, Fresh#db{user_ctx = UserCtx}};
-        NewFd ->
-            erlang:demonitor(OldRef, [flush]),
-            NewRef = erlang:monitor(process, NewFd),
-            {ok, Fresh#db{user_ctx = UserCtx, fd_monitor = NewRef}}
-    end.
-
-% Returns true when the atom sys_db is present in the db's options list.
-is_system_db(#db{options = Options}) ->
- lists:member(sys_db, Options).
-
-% Sends a synchronous full_commit request (no timeout) to the db's main
-% process and returns {ok, InstanceStartTime}. Crashes with badmatch if the
-% main process replies with anything other than ok.
-ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
- ok = gen_server:call(Pid, full_commit, infinity),
- {ok, StartTime}.
-
-% Same as ensure_full_commit/1 but passes RequiredSeq along with the
-% request as {full_commit, RequiredSeq}.
-ensure_full_commit(Db, RequiredSeq) ->
- #db{main_pid=Pid, instance_start_time=StartTime} = Db,
- ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
- {ok, StartTime}.
-
-% Releases the caller's handle: removes the monitor stored in fd_monitor
-% (flushing any pending 'DOWN' message for it) and returns ok.
-close(#db{fd_monitor=RefCntr}) ->
- erlang:demonitor(RefCntr, [flush]),
- ok.
-
-% A db is idle when it has no compactor, no pending delayed commit, and its
-% file process is monitored by nobody except the db's main process and the
-% process registered as couch_stats_collector. A file process for which
-% process_info/2 returns undefined also counts as idle.
-is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil,
-        fd=Fd, main_pid=MainPid}) ->
-    case erlang:process_info(Fd, monitored_by) of
-        {monitored_by, Watchers} ->
-            Expected = [MainPid, whereis(couch_stats_collector)],
-            [] =:= (Watchers -- Expected);
-        undefined ->
-            true
-    end;
-is_idle(_Db) ->
-    false.
-
-% Monitors the db's main process and returns the monitor reference.
-monitor(#db{main_pid=MainPid}) ->
- erlang:monitor(process, MainPid).
-
-% Sends a synchronous start_compact request to the db's main process and
-% returns its reply.
-start_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
-
-% Sends a synchronous cancel_compact request to the db's main process and
-% returns its reply.
-cancel_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, cancel_compact).
-
-% Waits for compaction without a time limit; see wait_for_compaction/2.
-wait_for_compaction(Db) ->
- wait_for_compaction(Db, infinity).
-
-% Blocks until the db has no running compactor or until Timeout expires.
-%
-% Timeout is a number of milliseconds or the atom infinity.
-%
-% Returns:
-%   ok               - the main process reported no compactor pid
-%   {error, Reason}  - the compactor process went down with a reason other
-%                      than normal
-%   {error, Timeout} - the compactor was still running when the time ran out
-%
-% After a normal compactor exit the main process is asked again, so a
-% compaction that is restarted right away is waited for as well.
-wait_for_compaction(#db{main_pid=Pid}=Db, Timeout) ->
-    % os:timestamp/0 is enough for measuring elapsed time and does not go
-    % through the global lock taken by erlang:now/0.
-    Start = os:timestamp(),
-    case gen_server:call(Pid, compactor_pid) of
-        CPid when is_pid(CPid) ->
-            Ref = erlang:monitor(process, CPid),
-            receive
-                {'DOWN', Ref, _, _, normal} when Timeout == infinity ->
-                    wait_for_compaction(Db, Timeout);
-                {'DOWN', Ref, _, _, normal} ->
-                    Elapsed = timer:now_diff(os:timestamp(), Start) div 1000,
-                    % Never recurse with a negative timeout: a negative value
-                    % in a receive ... after clause raises an error.
-                    wait_for_compaction(Db, erlang:max(0, Timeout - Elapsed));
-                {'DOWN', Ref, _, _, Reason} ->
-                    {error, Reason}
-            after Timeout ->
-                erlang:demonitor(Ref, [flush]),
-                {error, Timeout}
-            end;
-        _ ->
-            ok
-    end.
-
-% Writes one deleted #doc{} per given revision through update_docs/3 and
-% returns {ok, Result}. The match on a single-element result list is
-% assertive: any other result shape crashes with badmatch.
-delete_doc(Db, Id, Revisions) ->
-    Tombstone = fun(Rev) -> #doc{id=Id, revs=[Rev], deleted=true} end,
-    {ok, [Result]} = update_docs(Db, lists:map(Tombstone, Revisions), []),
-    {ok, Result}.
-
-% Opens a document with no options.
-open_doc(Db, IdOrDocInfo) ->
-    open_doc(Db, IdOrDocInfo, []).
-
-% Opens a document, counting the read in the database_reads statistic.
-% A deleted document is only returned when the atom deleted is part of
-% Options; otherwise {not_found, deleted} is returned. All other results of
-% open_doc_int/3 go through apply_open_options/2.
-open_doc(Db, Id, Options) ->
-    increment_stat(Db, {couchdb, database_reads}),
-    case open_doc_int(Db, Id, Options) of
-        {ok, #doc{deleted=true}} = Found ->
-            case lists:member(deleted, Options) of
-                false ->
-                    {not_found, deleted};
-                true ->
-                    apply_open_options(Found, Options)
-            end;
-        Other ->
-            apply_open_options(Other, Options)
-    end.
-
-% Applies the open options to a successfully opened doc; every other result
-% is passed through untouched.
-apply_open_options({ok, Doc}, Options) ->
-    apply_open_options2(Doc, Options);
-apply_open_options(NotADoc, _Options) ->
-    NotADoc.
-
-% Processes the option list one entry at a time:
-%   {atts_since, Revs} - attachments whose revpos is not greater than the
-%                        position found by find_ancestor_rev_pos/2 get their
-%                        data replaced by the atom stub
-%   ejson_body         - the doc goes through couch_doc:with_ejson_body/1
-%   anything else      - ignored
-apply_open_options2(Doc, []) ->
-    {ok, Doc};
-apply_open_options2(#doc{atts=Atts, revs=Revs}=Doc,
-        [{atts_since, PossibleAncestors}|Rest]) ->
-    RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
-    DataOrStub = fun
-        (AttPos, Data) when AttPos > RevPos -> Data;
-        (_AttPos, _Data) -> stub
-    end,
-    NewAtts = [A#att{data=DataOrStub(AttPos, Data)}
-        || #att{revpos=AttPos, data=Data}=A <- Atts],
-    apply_open_options2(Doc#doc{atts=NewAtts}, Rest);
-apply_open_options2(Doc, [ejson_body | Rest]) ->
-    WithBody = couch_doc:with_ejson_body(Doc),
-    apply_open_options2(WithBody, Rest);
-apply_open_options2(Doc, [_Ignored|Rest]) ->
-    apply_open_options2(Doc, Rest).
-
-
-% Walks the revision path {Pos, [RevId | Older]} from newest to oldest and
-% returns the position of the first revision that is a member of
-% AttsSinceRevs. Returns 0 when the path is exhausted or when AttsSinceRevs
-% is empty.
-find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
-    0;
-find_ancestor_rev_pos(_DocRevs, []) ->
-    0;
-find_ancestor_rev_pos({RevPos, [RevId|Older]}, AttsSinceRevs) ->
-    Known = lists:member({RevPos, RevId}, AttsSinceRevs),
-    if
-        Known ->
-            RevPos;
-        true ->
-            find_ancestor_rev_pos({RevPos - 1, Older}, AttsSinceRevs)
-    end.
-
-% Opens the requested revisions of one document, counting the read in the
-% database_reads statistic, and runs every individual result through
-% apply_open_options/2.
-open_doc_revs(Db, Id, Revs, Options) ->
-    increment_stat(Db, {couchdb, database_reads}),
-    [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
-    WithOptions = fun(Result) -> apply_open_options(Result, Options) end,
-    {ok, lists:map(WithOptions, Results)}.
-
-% Each returned result is a list of tuples:
-% {Id, MissingRevs, PossibleAncestors}
-% if no revs are missing, it's omitted from the results.
-get_missing_revs(Db, IdRevsList) ->
-    Ids = [Id || {Id, _Revs} <- IdRevsList],
-    Lookups = get_full_doc_infos(Db, Ids),
-    {ok, find_missing(IdRevsList, Lookups)}.
-
-% Pairs each {Id, Revs} request with its lookup result (the two lists are
-% walked in lock step) and builds the {Id, MissingRevs, PossibleAncestors}
-% tuples. Ids with no missing revs produce no tuple; ids whose lookup is
-% not_found report all requested revs as missing with no ancestors.
-find_missing([], []) ->
- [];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
- case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
- [] ->
- find_missing(RestIdRevs, RestLookupInfo);
- MissingRevs ->
- #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
- LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
- % Find the revs that are possible parents of this rev
- PossibleAncestors =
- lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
- % this leaf is a "possible ancestor" of the missing
- % revs if this LeafPos is less than any of the missing revs
- case lists:any(fun({MissingPos, _}) ->
- LeafPos < MissingPos end, MissingRevs) of
- true ->
- [{LeafPos, LeafRevId} | Acc];
- false ->
- Acc
- end
- end, [], LeafRevs),
- [{Id, MissingRevs, PossibleAncestors} |
- find_missing(RestIdRevs, RestLookupInfo)]
- end;
-find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
- [{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
-
-% Looks up the full doc info for Id and converts it with
-% couch_doc:to_doc_info/1. Any non-{ok, _} lookup result is returned as is.
-get_doc_info(Db, Id) ->
- case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
- Else ->
- Else
- end.
-
-% returns {ok, DocInfo} or not_found
-get_full_doc_info(Db, Id) ->
- [Result] = get_full_doc_infos(Db, [Id]),
- Result.
-
-% Looks up all Ids in the db's id btree and returns the list of results
-% produced by couch_btree:lookup/2.
-get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.id_tree, Ids).
-
-% Sends a synchronous increment_update_seq request to the db's main process
-% and returns its reply.
-increment_update_seq(#db{main_pid=Pid}) ->
- gen_server:call(Pid, increment_update_seq).
-
-% Sends a synchronous {purge_docs, IdsRevs} request to the db's main process
-% and returns its reply.
-purge_docs(#db{main_pid=Pid}, IdsRevs) ->
- gen_server:call(Pid, {purge_docs, IdsRevs}).
-
-% Returns the committed_update_seq field of the given #db{} record.
-get_committed_update_seq(#db{committed_update_seq=Seq}) ->
- Seq.
-
-% Returns the update_seq field of the given #db{} record.
-get_update_seq(#db{update_seq=Seq})->
- Seq.
-
-% Returns the purge_seq stored in the db header.
-get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})->
- PurgeSeq.
-
-% Returns {ok, []} when the header holds no purged_docs pointer; otherwise
-% reads the term the pointer refers to from the db file.
-get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
- {ok, []};
-get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
- couch_file:pread_term(Fd, PurgedPointer).
-
-% Returns {ok, Count} where Count is the first element of the id btree's
-% full reduction. The reduction is expected to be a 3-tuple here.
-get_doc_count(Db) ->
- {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, Count}.
-
-% Collects the db information property list: name, doc counts, sequence
-% numbers, compaction flag, file and data sizes, start time and disk format
-% version. Returns {ok, InfoList}.
-get_db_info(Db) ->
-    #db{
-        fd = Fd,
-        header = #db_header{disk_version = DiskVersion},
-        compactor_pid = Compactor,
-        update_seq = SeqNum,
-        name = Name,
-        instance_start_time = StartTime,
-        committed_update_seq = CommittedUpdateSeq,
-        id_tree = IdTree,
-        seq_tree = SeqTree,
-        local_tree = LocalTree
-    } = Db,
-    {ok, FileSize} = couch_file:bytes(Fd),
-    {ok, Reduction} = couch_btree:full_reduce(IdTree),
-    % first two elements of the id tree reduction are the doc counters
-    DocCount = element(1, Reduction),
-    DelCount = element(2, Reduction),
-    PurgeSeq = couch_db:get_purge_seq(Db),
-    Compacting = Compactor /= nil,
-    DataSize = db_data_size(Reduction, [SeqTree, IdTree, LocalTree]),
-    {ok, [
-        {db_name, Name},
-        {doc_count, DocCount},
-        {doc_del_count, DelCount},
-        {update_seq, SeqNum},
-        {purge_seq, PurgeSeq},
-        {compact_running, Compacting},
-        {disk_size, FileSize},
-        {data_size, DataSize},
-        {instance_start_time, StartTime},
-        {disk_format_version, DiskVersion},
-        {committed_update_seq, CommittedUpdateSeq}
-    ]}.
-
-% Computes the data_size value reported by get_db_info/1 from the id tree
-% reduction and the list of btrees. Returns null when the reduction carries
-% no size (2-tuple reduction, or nil as third element); otherwise the third
-% element is used as the starting value for sum_tree_sizes/2.
-db_data_size({_Count, _DelCount}, _Trees) ->
- % pre 1.2 format, upgraded on compaction
- null;
-db_data_size({_Count, _DelCount, nil}, _Trees) ->
- null;
-db_data_size({_Count, _DelCount, DocAndAttsSize}, Trees) ->
- sum_tree_sizes(DocAndAttsSize, Trees).
-
-% Adds the size of every btree in the list to Acc. As soon as one tree
-% reports nil as its size the whole result is null.
-sum_tree_sizes(Acc, []) ->
-    Acc;
-sum_tree_sizes(Acc, [Tree | More]) ->
-    TreeSize = couch_btree:size(Tree),
-    if
-        TreeSize =:= nil ->
-            null;
-        true ->
-            sum_tree_sizes(Acc + TreeSize, More)
-    end.
-
-% Returns the design documents of a database.
-%
-% First clause: for a db whose name starts with "shards/" followed by 18
-% more bytes, the remainder of the name is passed to fabric:design_docs/1
-% from a monitored helper process. The helper exits with the result, which
-% is read from the 'DOWN' message and returned as is.
-%
-% Second clause: folds over the id btree starting at key "_design/"
-% (options start_key and end_key_gt) and returns {ok, FullDocInfos}. Deleted
-% entries are left out, and the fold stops at the first entry whose id does
-% not start with "_design/". The collected list is not reversed after the
-% fold.
-% NOTE(review): the fold fun is wrapped in skip_deleted/1, which is not
-% visible here - confirm what it adds on top of the explicit
-% deleted = true clause.
-get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
- {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
- receive {'DOWN', Ref, _, _, Response} ->
- Response
- end;
-get_design_docs(#db{id_tree = IdBtree}) ->
- FoldFun = skip_deleted(fun
- (#full_doc_info{deleted = true}, _Reds, Acc) ->
- {ok, Acc};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
- {ok, [FullDocInfo | Acc]};
- (_, _Reds, Acc) ->
- {stop, Acc}
- end),
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
- {ok, Docs}.
-
-% Returns ok when the db's user context is an admin: it either holds one of
-% the admin roles (<<"_admin">> or a role from the db's admins section) or
-% its name is listed in the admins' names. Otherwise throws
-% {unauthorized, Message}.
-check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
-    {Admins} = get_admins(Db),
-    AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
-    AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
-    % subtracting leaves the list unchanged exactly when nothing is shared
-    IsAdmin = ((AdminRoles -- Roles) =/= AdminRoles)
-        orelse ((AdminNames -- [Name]) =/= AdminNames),
-    case IsAdmin of
-        true ->
-            ok;
-        false ->
-            throw({unauthorized, <<"You are not a db or server admin.">>})
-    end.
-
-% Returns ok when the db's user context may read the db, otherwise throws
-% {unauthorized, Message}.
-%
-% Access is granted when any of the following holds:
-%   * check_is_admin/1 returns ok
-%   * the members section has neither roles nor names
-%   * the user holds <<"_admin">> or one of the member roles
-%   * the user's name is listed in the member names
-check_is_member(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
- % old-style catch: anything but a plain ok (including the thrown
- % unauthorized term) falls through to the member checks below
- case (catch check_is_admin(Db)) of
- ok -> ok;
- _ ->
- {Members} = get_members(Db),
- ReaderRoles = couch_util:get_value(<<"roles">>, Members,[]),
- WithAdminRoles = [<<"_admin">> | ReaderRoles],
- ReaderNames = couch_util:get_value(<<"names">>, Members,[]),
- case ReaderRoles ++ ReaderNames of
- [] -> ok; % no readers == public access
- _Else ->
- % the subtraction returns the list unchanged exactly when the
- % user has none of the listed roles
- case WithAdminRoles -- Roles of
- WithAdminRoles -> % same list, not an reader role
- case ReaderNames -- [Name] of
- ReaderNames -> % same names, not a reader
- ?LOG_DEBUG("Not a reader: UserCtx ~p vs Names ~p Roles ~p",[UserCtx, ReaderNames, WithAdminRoles]),
- throw({unauthorized, <<"You are not authorized to access this db.">>});
- _ ->
- ok
- end;
- _ ->
- ok
- end
- end
- end.
-
-% Returns the <<"admins">> entry of the db's security properties, or {[]}
-% when there is none.
-get_admins(#db{security=SecProps}) ->
- couch_util:get_value(<<"admins">>, SecProps, {[]}).
-
-% Returns the <<"members">> entry of the db's security properties.
-get_members(#db{security=SecProps}) ->
- % we fallback to readers here for backwards compatibility
- couch_util:get_value(<<"members">>, SecProps,
- couch_util:get_value(<<"readers">>, SecProps, {[]})).
-
-% Returns the security properties wrapped in a 1-tuple.
-get_security(#db{security=SecProps}) ->
- {SecProps}.
-
-% Replaces the db's security object.
-%
-% The caller must be an admin (check_is_admin/1 throws otherwise) and the
-% new properties must pass validate_security_object/1. The properties are
-% then handed to the db's main process and a full commit is requested.
-% Returns ok. Throws bad_request when the second argument is not a 1-tuple
-% holding a list.
-set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
- check_is_admin(Db),
- ok = validate_security_object(NewSecProps),
- ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
- {ok, _} = ensure_full_commit(Db),
- ok;
-set_security(_, _) ->
- throw(bad_request).
-
-% Validates the admins and members sections of a security property list
-% with validate_names_and_roles/1 and returns ok.
-validate_security_object(SecProps) ->
-    Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
-    % we fallback to readers here for backwards compatibility
-    Readers = couch_util:get_value(<<"readers">>, SecProps, {[]}),
-    Members = couch_util:get_value(<<"members">>, SecProps, Readers),
-    lists:foreach(
-        fun(Section) -> ok = validate_names_and_roles(Section) end,
-        [Admins, Members]),
-    ok.
-
-% validate user input
-%
-% Both the <<"names">> and the <<"roles">> entry (default []) must be lists
-% that contain only binaries; otherwise the matching message is thrown.
-% Names are checked before roles. Returns ok.
-validate_names_and_roles({Props}) when is_list(Props) ->
-    Check = fun(Key, Message) ->
-        Values = couch_util:get_value(Key, Props, []),
-        case is_list(Values) andalso lists:all(fun erlang:is_binary/1, Values) of
-            true -> ok;
-            false -> throw(Message)
-        end
-    end,
-    Check(<<"names">>, "names must be a JSON list of strings"),
-    Check(<<"roles">>, "roles must be a JSON list of strings"),
-    ok.
-
-% Returns the revs_limit field of the given #db{} record.
-get_revs_limit(#db{revs_limit=Limit}) ->
- Limit.
-
-% Sets the db's revision limit through the db's main process.
-%
-% Limit must be a positive integer. The is_integer/1 test is required in
-% addition to Limit > 0: in Erlang's term order every atom, binary, list or
-% tuple compares greater than any number, so the comparison alone would let
-% non-numeric terms through. The caller must be an admin; check_is_admin/1
-% throws otherwise.
-%
-% Returns the reply of the main process. Throws invalid_revs_limit for any
-% other Limit.
-set_revs_limit(#db{main_pid=Pid}=Db, Limit) when is_integer(Limit), Limit > 0 ->
-    check_is_admin(Db),
-    gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
-set_revs_limit(_Db, _Limit) ->
-    throw(invalid_revs_limit).
-
-% Returns the name field of the given #db{} record.
-name(#db{name=Name}) ->
- Name.
-
-% Returns the compression field of the given #db{} record.
-compression(#db{compression=Compression}) ->
- Compression.
-
-% Updates a single document as an interactive edit; see update_doc/4.
-update_doc(Db, Doc, Options) ->
- update_doc(Db, Doc, Options, interactive_edit).
-
-% Updates a single document through update_docs/4.
-%
-% Returns {ok, NewRev} for a reported new revision. An empty result list
-% (replication success) yields {ok, {Pos, RevId}} taken from the doc's own
-% revs. Any error entry in the result list is thrown.
-update_doc(Db, Doc, Options, UpdateType) ->
-    Outcome = update_docs(Db, [Doc], Options, UpdateType),
-    case Outcome of
-        {ok, []} ->
-            % replication success
-            {Pos, [RevId | _]} = Doc#doc.revs,
-            {ok, {Pos, RevId}};
-        {ok, [{ok, NewRev}]} ->
-            {ok, NewRev};
-        {ok, [{{_Id, _Rev}, Reason}]} ->
-            throw(Reason);
-        {ok, [Reason]} ->
-            throw(Reason)
-    end.
-
-% Updates the given docs with an empty option list; see update_docs/3.
-update_docs(Db, Docs) ->
- update_docs(Db, Docs, []).
-
-% group_alike_docs groups the sorted documents into sublist buckets, by id.
-% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
-% The input is a list of {#doc{}, Ref} pairs; it is sorted by doc id first.
-group_alike_docs(Docs) ->
-    ById = fun({#doc{id=IdA}, _}, {#doc{id=IdB}, _}) -> IdA < IdB end,
-    group_alike_docs(lists:sort(ById, Docs), []).
-
-% Buckets are built newest-first (both the bucket list and every bucket),
-% so everything is reversed once the input is exhausted.
-group_alike_docs([], Buckets) ->
-    lists:reverse([lists:reverse(Bucket) || Bucket <- Buckets]);
-group_alike_docs([First|Rest], []) ->
-    group_alike_docs(Rest, [[First]]);
-group_alike_docs([{Doc, Ref}|Rest], [Current|Older]) ->
-    [{#doc{id=CurrentId}, _Ref}|_] = Current,
-    SameId = Doc#doc.id == CurrentId,
-    Buckets = case SameId of
-        true ->
-            % add to existing bucket
-            [[{Doc, Ref}|Current]|Older];
-        false ->
-            % add to new bucket
-            [[{Doc, Ref}], Current|Older]
-    end,
-    group_alike_docs(Rest, Buckets).
-
-% Validates a document update. Returns ok or an error term (errors are
-% returned, not thrown, by the clauses below). Clause order matters:
-%   1. design docs: the caller must be an admin, then validate_ddoc/2
-%      decides
-%   2. validation funs not loaded yet (undefined): load them and retry
-%   3. no validation funs: ok
-%   4. _local docs: ok
-%   5. everything else: skipped when the process dictionary entry
-%      io_priority is {internal_repl, _}, otherwise the validation funs
-%      are run by validate_doc_update_int/3
-validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
- case catch check_is_admin(Db) of
- ok -> validate_ddoc(Db#db.name, Doc);
- Error -> Error
- end;
-validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
- ValidationFuns = load_validation_funs(Db),
- validate_doc_update(Db#db{validate_doc_funs=ValidationFuns}, Doc, Fun);
-validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
- ok;
-validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
- ok;
-validate_doc_update(Db, Doc, GetDiskDocFun) ->
- case get(io_priority) of
- {internal_repl, _} ->
- ok;
- _ ->
- validate_doc_update_int(Db, Doc, GetDiskDocFun)
- end.
-
-% Hands the design doc (with an ejson body) to couch_index_server:validate/2.
-% A thrown term is turned into the return value; a normal result is
-% returned unchanged.
-validate_ddoc(DbName, DDoc) ->
-    try couch_index_server:validate(DbName, couch_doc:with_ejson_body(DDoc)) of
-        Verdict ->
-            Verdict
-    catch
-        throw:Reason ->
-            Reason
-    end.
-
-% Runs every validation fun of the db against the new doc, the doc currently
-% on disk (obtained by calling GetDiskDocFun), the JSON user context and the
-% security object. Returns ok when all funs return ok. The first non-ok
-% result, or any term thrown by a fun, ends the run and becomes the return
-% value.
-validate_doc_update_int(Db, Doc, GetDiskDocFun) ->
- DiskDoc = GetDiskDocFun(),
- JsonCtx = couch_util:json_user_ctx(Db),
- SecObj = get_security(Db),
- try [case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
- ok -> ok;
- Error -> throw(Error)
- end || Fun <- Db#db.validate_doc_funs],
- ok
- catch
- throw:Error ->
- Error
- end.
-
-
-% Loads the validation funs of a db, sends them to the db's main process
-% with a {load_validation_funs, Funs} cast, and returns them.
-%
-% First clause (db name starts with "shards/"): the funs come from
-% ddoc_cache:open/2, called in a monitored helper process that exits with
-% the result. Anything other than {ok, Funs} is logged and
-% internal_server_error is thrown.
-%
-% Second clause: every design doc returned by get_design_docs/1 is opened
-% with an ejson body and its validation fun, if it has one, is collected.
-% to be safe, spawn a middleman here
-load_validation_funs(#db{main_pid=Pid, name = <<"shards/", _/binary>>}=Db) ->
- {_, Ref} = spawn_monitor(fun() ->
- exit(ddoc_cache:open(mem3:dbname(Db#db.name), validation_funs))
- end),
- receive
- {'DOWN', Ref, _, _, {ok, Funs}} ->
- gen_server:cast(Pid, {load_validation_funs, Funs}),
- Funs;
- {'DOWN', Ref, _, _, Reason} ->
- ?LOG_ERROR("could not load validation funs ~p", [Reason]),
- throw(internal_server_error)
- end;
-load_validation_funs(#db{main_pid=Pid}=Db) ->
- {ok, DDocInfos} = get_design_docs(Db),
- OpenDocs = fun
- (#full_doc_info{}=D) ->
- {ok, Doc} = open_doc_int(Db, D, [ejson_body]),
- Doc
- end,
- DDocs = lists:map(OpenDocs, DDocInfos),
- % design docs without a validation fun (nil) contribute nothing
- Funs = lists:flatmap(fun(DDoc) ->
- case couch_doc:get_validate_doc_fun(DDoc) of
- nil -> [];
- Fun -> [Fun]
- end
- end, DDocs),
- gen_server:cast(Pid, {load_validation_funs, Funs}),
- Funs.
-
-% Prepares and validates one interactive update of a document that already
-% has a revision tree. Returns {Result, Doc} where Result is ok, conflict,
-% or whatever validate_doc_update/3 returned, and Doc is the (possibly
-% stub-merged) document.
-%
-% LeafRevsDict maps {Pos, RevId} of every current leaf to
-% {#leaf{}, RevisionPath}.
-prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
- OldFullDocInfo, LeafRevsDict, AllowConflict) ->
- case Revs of
- [PrevRev|_] ->
- % the update names a parent revision: it must be a current leaf
- case dict:find({RevStart, PrevRev}, LeafRevsDict) of
- {ok, {#leaf{deleted=Deleted, ptr=DiskSp}, DiskRevs}} ->
- case couch_doc:has_stubs(Doc) of
- true ->
- % stubs have to be resolved against the doc on disk, so it is
- % loaded right away
- DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
- false ->
- % no stubs: the disk doc is only loaded if validation asks for it
- LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
- {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
- end;
- error when AllowConflict ->
- couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
- % there are stubs
- {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
- error ->
- {conflict, Doc}
- end;
- [] ->
- % new doc, and we have existing revs.
- % reuse existing deleted doc
- if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
- {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
- true ->
- {conflict, Doc}
- end
- end.
-
-
-
-% Prepares and validates buckets of interactive updates. The bucket list and
-% the lookup list are walked in lock step (one lookup result per bucket).
-% Returns {PreppedBuckets, Errors} where Errors is a list of {Ref, Error}.
-% Note that the prepped buckets are accumulated by prepending, so they come
-% back in reverse bucket order.
-prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
- AccFatalErrors) ->
- {AccPrepped, AccFatalErrors};
-% The doc does not exist yet: only updates without any revision history
-% ({0, []}) can be accepted.
-prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
- AllowConflict, AccPrepped, AccErrors) ->
- {PreppedBucket, AccErrors3} = lists:foldl(
- fun({#doc{revs=Revs}=Doc,Ref}, {AccBucket, AccErrors2}) ->
- case couch_doc:has_stubs(Doc) of
- true ->
- couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
- false -> ok
- end,
- case Revs of
- {0, []} ->
- case validate_doc_update(Db, Doc, fun() -> nil end) of
- ok ->
- {[{Doc, Ref} | AccBucket], AccErrors2};
- Error ->
- {AccBucket, [{Ref, Error} | AccErrors2]}
- end;
- _ ->
- % old revs specified but none exist, a conflict
- {AccBucket, [{Ref, conflict} | AccErrors2]}
- end
- end,
- {[], AccErrors}, DocBucket),
-
- prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
- [lists:reverse(PreppedBucket) | AccPrepped], AccErrors3);
-% The doc exists: every update in the bucket is checked against the current
-% leaves of its revision tree by prep_and_validate_update/5.
-prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
- AllowConflict, AccPrepped, AccErrors) ->
- Leafs = couch_key_tree:get_all_leafs(OldRevTree),
- LeafRevsDict = dict:from_list([
- {{Start, RevId}, {Leaf, Revs}} ||
- {Leaf, {Start, [RevId | _]} = Revs} <- Leafs
- ]),
- {PreppedBucket, AccErrors3} = lists:foldl(
- fun({Doc, Ref}, {Docs2Acc, AccErrors2}) ->
- case prep_and_validate_update(Db, Doc, OldFullDocInfo,
- LeafRevsDict, AllowConflict) of
- {ok, Doc2} ->
- {[{Doc2, Ref} | Docs2Acc], AccErrors2};
- {Error, #doc{}} ->
- % Record the error
- {Docs2Acc, [{Ref, Error} |AccErrors2]}
- end
- end,
- {[], AccErrors}, DocBucket),
- % NOTE(review): unlike the not_found clause above, this bucket is not
- % reversed before it is accumulated - confirm that this is intended.
- prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
- [PreppedBucket | AccPrepped], AccErrors3).
-
-
-% Updates the given docs as interactive edits; see update_docs/4.
-update_docs(Db, Docs, Options) ->
- update_docs(Db, Docs, Options, interactive_edit).
-
-
-% Prepares and validates buckets of replicated updates. Buckets and lookup
-% results are walked in lock step. Returns {Buckets, Errors} where Errors is
-% a list of {{Id, {Pos, Rev}}, Error}.
-prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
- % errors were collected as {Doc, Error}; report them by id and revision
- Errors2 = [{{Id, {Pos, Rev}}, Error} ||
- {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
- {lists:reverse(AccPrepped), lists:reverse(Errors2)};
-prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
- case OldInfo of
- not_found ->
- % no doc on disk yet: validate every update against nil
- {ValidatedBucket, AccErrors3} = lists:foldl(
- fun({Doc, Ref}, {AccPrepped2, AccErrors2}) ->
- case couch_doc:has_stubs(Doc) of
- true ->
- couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
- false -> ok
- end,
- case validate_doc_update(Db, Doc, fun() -> nil end) of
- ok ->
- {[{Doc, Ref} | AccPrepped2], AccErrors2};
- Error ->
- {AccPrepped2, [{Doc, Error} | AccErrors2]}
- end
- end,
- {[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
- OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
- OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
- % merge all new docs into the existing tree to see which of them end
- % up as leaves
- NewRevTree = lists:foldl(
- fun({NewDoc, _Ref}, AccTree) ->
- {NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Db#db.revs_limit),
- NewTree
- end,
- OldTree, Bucket),
- Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
- LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
- {ValidatedBucket, AccErrors3} =
- lists:foldl(
- fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
- IsOldLeaf = lists:member({Pos, RevId}, OldLeafsLU),
- case dict:find({Pos, RevId}, LeafRevsFullDict) of
- {ok, {Start, Path}} when not IsOldLeaf ->
- % our unflushed doc is a leaf node. Go back on the path
- % to find the previous rev that's on disk.
-
- LoadPrevRevFun = fun() ->
- make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
- end,
-
- case couch_doc:has_stubs(Doc) of
- true ->
- DiskDoc = case LoadPrevRevFun() of
- #doc{} = DiskDoc0 ->
- DiskDoc0;
- _ ->
- % Force a missing_stub exception
- couch_doc:merge_stubs(Doc, #doc{})
- end,
- Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
- GetDiskDocFun = fun() -> DiskDoc end;
- false ->
- Doc2 = Doc,
- GetDiskDocFun = LoadPrevRevFun
- end,
-
- case validate_doc_update(Db, Doc2, GetDiskDocFun) of
- ok ->
- {[{Doc2, Ref} | AccValidated], AccErrors2};
- Error ->
- {AccValidated, [{Doc, Error} | AccErrors2]}
- end;
- _ ->
- % this doc isn't a leaf or already exists in the tree.
- % ignore but consider it a success.
- {AccValidated, AccErrors2}
- end
- end,
- {[], AccErrors}, Bucket),
- prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
- [ValidatedBucket | AccPrepped], AccErrors3)
- end.
-
-
-
-% Computes the revision id for a new revision of the given doc.
-%
-% When every attachment has a non-empty md5, the id is the md5 of the
-% external term format of [Deleted, OldStart, OldRev, Body, Atts2], where
-% Atts2 holds {Name, Type, Md5} per attachment and OldRev is the newest
-% previous rev id (0 if there is none). If any attachment has an empty md5
-% the id is instead built from couch_util:rand32/0.
-new_revid(#doc{body=Body,revs={OldStart,OldRevs},
- atts=Atts,deleted=Deleted}) ->
- % the generator filter drops attachments with an empty md5, so a length
- % difference means at least one attachment has none
- case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M =/= <<>>] of
- Atts2 when length(Atts) =/= length(Atts2) ->
- % We must have old style non-md5 attachments
- ?l2b(integer_to_list(couch_util:rand32()));
- Atts2 ->
- OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
- couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
- end.
-
-% Gives every doc in every bucket a new revision: the rev id from
-% new_revid/1 is put in front of the doc's rev list and the start position
-% is incremented. Returns {Buckets, IdRevs} where IdRevs collects
-% {Ref, {ok, {NewPos, NewRevId}}} for every doc, in front of the entries
-% passed in IdRevsAcc.
-new_revs([], OutBuckets, IdRevsAcc) ->
-    {lists:reverse(OutBuckets), IdRevsAcc};
-new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
-    Bump = fun({#doc{revs={Start, RevIds}}=Doc, Ref}, Seen) ->
-        NewRevId = new_revid(Doc),
-        NewPos = Start+1,
-        Updated = Doc#doc{revs={NewPos, [NewRevId | RevIds]}},
-        {{Updated, Ref}, [{Ref, {ok, {NewPos, NewRevId}}} | Seen]}
-    end,
-    {NewBucket, IdRevsAcc2} = lists:mapfoldl(Bump, IdRevsAcc, Bucket),
-    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc2).
-
-% Returns the doc unchanged unless two of its attachments share a name, in
-% which case {bad_request, <<"Duplicate attachments">>} is thrown.
-check_dup_atts(#doc{atts=Atts}=Doc) ->
-    ByName = fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end,
-    Sorted = lists:sort(ByName, Atts),
-    check_dup_atts2(Sorted),
-    Doc.
-
-% Expects the attachments sorted by name, so that duplicates are adjacent.
-check_dup_atts2([#att{name=Name}, #att{name=Name} | _]) ->
-    throw({bad_request, <<"Duplicate attachments">>});
-check_dup_atts2([_First | Others]) ->
-    check_dup_atts2(Others);
-check_dup_atts2(_) ->
-    ok.
-
-
-% Writes a list of docs to the db. The fourth argument selects the mode.
-%
-% replicated_changes: the docs keep their revisions. Validation only runs
-% when the db has validation funs loaded or undefined (anything but []), or
-% when any doc is a design doc or carries attachments. Returns
-% {ok, DocErrors} with the errors found during validation.
-%
-% interactive_edit: new revision ids are generated for the docs. _local docs
-% are split off and written as non-replicated docs. Returns {ok, Results}
-% with one result per input doc, in input order, or {aborted, Errors} when
-% the all_or_nothing option is set and validation found failures.
-update_docs(Db, Docs, Options, replicated_changes) ->
- increment_stat(Db, {couchdb, database_writes}),
- % associate reference with each doc in order to track duplicates
- Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end, Docs),
- DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
- case (Db#db.validate_doc_funs /= []) orelse
- lists:any(
- fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) -> true;
- ({#doc{atts=Atts}, _Ref}) ->
- Atts /= []
- end, Docs2) of
- true ->
- Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
- ExistingDocs = get_full_doc_infos(Db, Ids),
-
- {DocBuckets2, DocErrors} =
- prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
- DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
- false ->
- DocErrors = [],
- DocBuckets3 = DocBuckets
- end,
- % attachments are checked for duplicate names and flushed to the db file
- % before the docs are handed to the updater
- DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
- || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
- {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
- {ok, DocErrors};
-
-update_docs(Db, Docs, Options, interactive_edit) ->
- increment_stat(Db, {couchdb, database_writes}),
- AllOrNothing = lists:member(all_or_nothing, Options),
- % go ahead and generate the new revision ids for the documents.
- % separate out the NonRep documents from the rest of the documents
-
- % associate reference with each doc in order to track duplicates
- Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs),
- {Docs3, NonRepDocs} = lists:foldl(
- fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
- case Id of
- <<?LOCAL_DOC_PREFIX, _/binary>> ->
- {DocsAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Doc | DocsAcc], NonRepDocsAcc}
- end
- end, {[], []}, Docs2),
-
- DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)),
-
- case (Db#db.validate_doc_funs /= []) orelse
- lists:any(
- fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}, _Ref}) ->
- true;
- ({#doc{atts=Atts}, _Ref}) ->
- Atts /= []
- end, Docs3) of
- true ->
- % lookup the doc by id and get the most recent
- Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
- ExistingDocInfos = get_full_doc_infos(Db, Ids),
-
- {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
- DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
-
- % strip out any empty buckets
- DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
- false ->
- PreCommitFailures = [],
- DocBuckets2 = DocBuckets
- end,
-
- if (AllOrNothing) and (PreCommitFailures /= []) ->
- % nothing is written; report the failures by doc id and revisions
- {aborted,
- lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
- case lists:keyfind(Ref,1,PreCommitFailures) of
- {Ref, Error} ->
- [{{Id,{Pos,RevIds}}, Error} | Acc];
- false ->
- Acc
- end
- end,[],Docs3)};
-
- true ->
- Options2 = if AllOrNothing -> [merge_conflicts];
- true -> [] end ++ Options,
- DocBuckets3 = [[
- {doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd), Ref}
- || {Doc, Ref} <- B] || B <- DocBuckets2],
- {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
-
- {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
-
- % dict:from_list/1 keeps the last value for a repeated key, so for a given
- % Ref a pre-commit failure wins over a commit result, which wins over the
- % generated {ok, NewRev}
- ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
- {ok, lists:map(
- fun({#doc{}, Ref}) ->
- {ok, Result} = dict:find(Ref, ResultsDict),
- Result
- end, Docs2)}
- end.
-
-% Returns the first available document on disk. Input list is a full rev path
-% for the doc.
-% Path entries holding a #doc{} or ?REV_MISSING are skipped, decrementing
-% Pos for each of them. The first #leaf{} entry is turned into a doc with
-% make_doc/5, using the rev ids of the remaining path as its revisions.
-% Returns nil when the path holds no #leaf{} entry.
-make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
- nil;
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
- make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
- make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
-make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) ->
- Revs = [Rev || {Rev, _} <- DocPath],
- make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
-
-% Decides whether full_commit has to be added to the write options.
-%
-% The options are left alone when they contain exactly one explicit commit
-% setting (full_commit or delay_commit), or when the "delayed_commits"
-% config value of section "couchdb" is "true". In every other case
-% full_commit is put in front of the options; an unrecognized config value
-% is logged as an error as well.
-set_commit_option(Options) ->
-    Explicit = [true || O <- Options, O==full_commit orelse O==delay_commit],
-    Delayed = config:get("couchdb", "delayed_commits", "false"),
-    case Explicit of
-        [true] ->
-            % user requested explicit commit setting, do not change it
-            Options;
-        _ ->
-            case Delayed of
-                "true" ->
-                    % delayed commits are enabled, do nothing
-                    Options;
-                "false" ->
-                    [full_commit|Options];
-                Else ->
-                    ?LOG_ERROR("[couchdb] delayed_commits setting must be true/false, not ~p",
-                        [Else]),
-                    [full_commit|Options]
-            end
-    end.
-
-% Collects the replies the updater process Pid sends for one update_docs
-% request:
-%   {result, Pid, Result} - accumulated (newest first), keep receiving
-%   {done, Pid}           - returns {ok, Results}
-%   {retry, Pid}          - returns retry
-%   'DOWN' for MRef       - the caller exits with the same reason
-% There is no timeout: the call blocks until one of the terminating
-% messages arrives.
-collect_results(Pid, MRef, ResultsAcc) ->
- receive
- {result, Pid, Result} ->
- collect_results(Pid, MRef, [Result | ResultsAcc]);
- {done, Pid} ->
- {ok, ResultsAcc};
- {retry, Pid} ->
- retry;
- {'DOWN', MRef, _, _, Reason} ->
- exit(Reason)
- end.
-
-% Sends the prepared doc buckets and the non-replicated docs to the db's
-% main process and waits for the outcome. The main process is monitored for
-% the duration of the call.
-%
-% Returns {ok, Results}. When the updater asks for a retry the request is
-% rebuilt against the reopened db and sent once more; a second retry
-% request throws {update_error, compaction_retry}.
-write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
- NonRepDocs, Options0) ->
- DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
- Options = set_commit_option(Options0),
- MergeConflicts = lists:member(merge_conflicts, Options),
- FullCommit = lists:member(full_commit, Options),
- MRef = erlang:monitor(process, Pid),
- try
- Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(Pid, MRef, []) of
- {ok, Results} -> {ok, Results};
- retry ->
- % This can happen if the db file we wrote to was swapped out by
- % compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
- % attachments are flushed again, this time to the fd of the reopened db
- DocBuckets2 = [
- [{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc, Ref} <- Bucket] ||
- Bucket <- DocBuckets1
- ],
- % We only retry once
- DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
- close(Db2),
- Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(Pid, MRef, []) of
- {ok, Results} -> {ok, Results};
- retry -> throw({update_error, compaction_retry})
- end
- end
- after
- % always drop the monitor, also when an exception passes through
- erlang:demonitor(MRef, [flush])
- end.
-
-
-% Replaces the body of every doc in every bucket by
-% {summary, SummaryChunk, AttsFd}.
-%
-% SummaryChunk is built by couch_db_updater:make_doc_summary/2 from the body
-% and one 8-tuple per attachment (name, type, stream pointer, att_len,
-% disk_len, revpos, md5, encoding). Only attachments whose data is a 2-tuple
-% contribute a tuple. AttsFd is the first element of the first attachment's
-% data tuple, or nil for a doc without attachments.
-prepare_doc_summaries(Db, BucketList) ->
- [lists:map(
- fun({#doc{body = Body, atts = Atts} = Doc, Ref}) ->
- DiskAtts = [{N, T, P, AL, DL, R, M, E} ||
- #att{name = N, type = T, data = {_, P}, md5 = M, revpos = R,
- att_len = AL, disk_len = DL, encoding = E} <- Atts],
- AttsFd = case Atts of
- [#att{data = {Fd, _}} | _] ->
- Fd;
- [] ->
- nil
- end,
- SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- {Doc#doc{body = {summary, SummaryChunk, AttsFd}}, Ref}
- end,
- Bucket) || Bucket <- BucketList].
-
-
-% Runs the db's before_doc_update hook, if one is set, on every doc of every
-% bucket. The hook receives the doc with an ejson body and the db; the doc
-% it returns takes the place of the original one. With no hook (nil) the
-% buckets are returned untouched.
-before_docs_update(#db{before_doc_update = nil}, BucketList) ->
-    BucketList;
-before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
-    RunHook = fun({Doc, Ref}) ->
-        {Fun(couch_doc:with_ejson_body(Doc), Db), Ref}
-    end,
-    lists:map(fun(Bucket) -> lists:map(RunHook, Bucket) end, BucketList).
-
-
-% Stamps every attachment that is not on disk yet with the revision
-% position the doc is about to get (current position + 1). Attachments
-% whose data is a 2-tuple keep their revpos.
-set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
-    Stamp = fun
-        (#att{data={_Fd,_Sp}}=OnDisk) ->
-            % already commited to disk, do not set new rev
-            OnDisk;
-        (Pending) ->
-            Pending#att{revpos=RevPos+1}
-    end,
-    Doc#doc{atts=[Stamp(Att) || Att <- Atts]}.
-
-
-% Passes every attachment of the doc through flush_att/2 with the given
-% file descriptor and returns the doc with the resulting attachments.
-doc_flush_atts(Doc, Fd) ->
-    Flushed = lists:map(fun(Att) -> flush_att(Fd, Att) end, Doc#doc.atts),
-    Doc#doc{atts=Flushed}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
-
-flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd ->
- % already written to our file, nothing to write
- Att;
-
-flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5,
- disk_len=InDiskLen} = Att) ->
- {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} =
- couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
- check_md5(IdentityMd5, InMd5),
- Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=InDiskLen};
-
-flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
- with_stream(Fd, Att, fun(OutputStream) ->
- couch_stream:write(OutputStream, Data)
- end);
-
-flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
- MaxChunkSize = list_to_integer(
- config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- with_stream(Fd, Att, fun(OutputStream) ->
- % Fun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment,
- Fun(MaxChunkSize,
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun({0, Footers}, _) ->
- F = mochiweb_headers:from_binary(Footers),
- case mochiweb_headers:get_value("Content-MD5", F) of
- undefined ->
- ok;
- Md5 ->
- {md5, base64:decode(Md5)}
- end;
- ({_Length, Chunk}, _) ->
- couch_stream:write(OutputStream, Chunk)
- end, ok)
- end);
-
-flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
- with_stream(Fd, Att, fun(OutputStream) ->
- write_streamed_attachment(OutputStream, Fun, AttLen)
- end);
-
-flush_att(Fd, #att{data={follows, Parser, Ref}}=Att) when is_pid(Parser) ->
- ParserRef = erlang:monitor(process, Parser),
- Fun = fun() ->
- Parser ! {get_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- couch_doc:restart_open_doc_revs(Parser, Ref, NewRef);
- {bytes, Ref, Bytes} ->
- Bytes;
- {'DOWN', ParserRef, _, _, Reason} ->
- throw({mp_parser_died, Reason})
- end
- end,
- try
- flush_att(Fd, Att#att{data=Fun})
- after
- erlang:demonitor(ParserRef, [flush])
- end.
-
-
-compressible_att_type(MimeType) when is_binary(MimeType) ->
- compressible_att_type(?b2l(MimeType));
-compressible_att_type(MimeType) ->
- TypeExpList = re:split(
- config:get("attachments", "compressible_types", ""),
- "\\s*,\\s*",
- [{return, list}]
- ),
- lists:any(
- fun(TypeExp) ->
- Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
- "(?:\\s*;.*?)?\\s*", $$],
- re:run(MimeType, Regexp, [caseless]) =/= nomatch
- end,
- [T || T <- TypeExpList, T /= []]
- ).
-
-% From RFC 2616 3.6.1 - Chunked Transfer Coding
-%
-% In other words, the origin server is willing to accept
-% the possibility that the trailer fields might be silently
-% discarded along the path to the client.
-%
-% I take this to mean that if "Trailers: Content-MD5\r\n"
-% is present in the request, but there is no Content-MD5
-% trailer, we're free to ignore this inconsistency and
-% pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
- BufferSize = list_to_integer(
- config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- {ok, OutputStream} = case (Enc =:= identity) andalso
- compressible_att_type(Type) of
- true ->
- CompLevel = list_to_integer(
- config:get("attachments", "compression_level", "0")
- ),
- couch_stream:open(Fd, [{buffer_size, BufferSize},
- {encoding, gzip}, {compression_level, CompLevel}]);
- _ ->
- couch_stream:open(Fd, [{buffer_size, BufferSize}])
- end,
- ReqMd5 = case Fun(OutputStream) of
- {md5, FooterMd5} ->
- case InMd5 of
- md5_in_footer -> FooterMd5;
- _ -> InMd5
- end;
- _ ->
- InMd5
- end,
- {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
- couch_stream:close(OutputStream),
- check_md5(IdentityMd5, ReqMd5),
- {AttLen, DiskLen, NewEnc} = case Enc of
- identity ->
- case {Md5, IdentityMd5} of
- {Same, Same} ->
- {Len, IdentityLen, identity};
- _ ->
- {Len, IdentityLen, gzip}
- end;
- gzip ->
- case {Att#att.att_len, Att#att.disk_len} of
- {AL, DL} when AL =:= undefined orelse DL =:= undefined ->
- % Compressed attachment uploaded through the standalone API.
- {Len, Len, gzip};
- {AL, DL} ->
- % This case is used for efficient push-replication, where a
- % compressed attachment is located in the body of multipart
- % content-type request.
- {AL, DL, gzip}
- end
- end,
- Att#att{
- data={Fd,StreamInfo},
- att_len=AttLen,
- disk_len=DiskLen,
- md5=Md5,
- encoding=NewEnc
- }.
-
-
-write_streamed_attachment(_Stream, _F, 0) ->
- ok;
-write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
- Bin = read_next_chunk(F, LenLeft),
- ok = couch_stream:write(Stream, Bin),
- write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
-
-read_next_chunk(F, _) when is_function(F, 0) ->
- F();
-read_next_chunk(F, LenLeft) when is_function(F, 1) ->
- F(lists:min([LenLeft, 16#2000])).
-
-enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(
- fun couch_db_updater:btree_by_seq_reduce/2, Reds).
-
-enum_docs_reduce_to_count(Reds) ->
- FinalRed = couch_btree:final_reduce(
- fun couch_db_updater:btree_by_id_reduce/2, Reds),
- element(1, FinalRed).
-
-changes_since(Db, StartSeq, Fun, Acc) ->
- changes_since(Db, StartSeq, Fun, [], Acc).
-
-changes_since(Db, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
- DocInfo = case FullDocInfo of
- #full_doc_info{} ->
- couch_doc:to_doc_info(FullDocInfo);
- #doc_info{} ->
- FullDocInfo
- end,
- Fun(DocInfo, Acc2)
- end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
- {ok, AccOut}.
-
-count_changes_since(Db, SinceSeq) ->
- BTree = Db#db.seq_tree,
- {ok, Changes} =
- couch_btree:fold_reduce(BTree,
- fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(BTree, PartialReds)}
- end,
- 0, [{start_key, SinceSeq + 1}]),
- Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(
- Db#db.seq_tree, InFun, Acc,
- [{start_key, SinceSeq + 1} | Options]),
- {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options) ->
- FoldFun = skip_deleted(InFun),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
-
-
-%%% Internal function %%%
-open_doc_revs_int(Db, IdRevs, Options) ->
- Ids = [Id || {Id, _Revs} <- IdRevs],
- LookupResults = get_full_doc_infos(Db, Ids),
- lists:zipwith(
- fun({Id, Revs}, Lookup) ->
- case Lookup of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
- {FoundRevs, MissingRevs} =
- case Revs of
- all ->
- {couch_key_tree:get_all_leafs(RevTree), []};
- _ ->
- case lists:member(latest, Options) of
- true ->
- couch_key_tree:get_key_leafs(RevTree, Revs);
- false ->
- couch_key_tree:get(RevTree, Revs)
- end
- end,
- FoundResults =
- lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
- case Value of
- ?REV_MISSING ->
- % we have the rev in our list but know nothing about it
- {{not_found, missing}, {Pos, Rev}};
- #leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
- {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
- end
- end, FoundRevs),
- Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
- {ok, Results};
- not_found when Revs == all ->
- {ok, []};
- not_found ->
- {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
- end
- end,
- IdRevs, LookupResults).
-
-open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(Db#db.local_tree, [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
- apply_open_options({ok, Doc}, Options);
- [not_found] ->
- {not_found, missing}
- end;
-open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
- #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
- Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
- apply_open_options(
- {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
-open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
- #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
- DocInfo = couch_doc:to_doc_info(FullDocInfo),
- {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
- Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
- apply_open_options(
- {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
-open_doc_int(Db, Id, Options) ->
- case get_full_doc_info(Db, Id) of
- {ok, FullDocInfo} ->
- open_doc_int(Db, FullDocInfo, Options);
- not_found ->
- {not_found, missing}
- end.
-
-doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
- case lists:member(revs_info, Options) of
- false -> [];
- true ->
- {[{Pos, RevPath}],[]} =
- couch_key_tree:get_full_key_paths(RevTree, [Rev]),
-
- [{revs_info, Pos, lists:map(
- fun({Rev1, ?REV_MISSING}) ->
- {Rev1, missing};
- ({Rev1, Leaf}) ->
- case Leaf#leaf.deleted of
- true ->
- {Rev1, deleted};
- false ->
- {Rev1, available}
- end
- end, RevPath)}]
- end ++
- case lists:member(conflicts, Options) of
- false -> [];
- true ->
- case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
- [] -> [];
- ConflictRevs -> [{conflicts, ConflictRevs}]
- end
- end ++
- case lists:member(deleted_conflicts, Options) of
- false -> [];
- true ->
- case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
- [] -> [];
- DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
- end
- end ++
- case lists:member(local_seq, Options) of
- false -> [];
- true -> [{local_seq, Seq}]
- end.
-
-read_doc(#db{fd=Fd}, Pos) ->
- couch_file:pread_term(Fd, Pos).
-
-
-make_doc(#db{fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
- {BodyData, Atts} =
- case Bp of
- nil ->
- {[], []};
- _ ->
- {ok, {BodyData0, Atts00}} = read_doc(Db, Bp),
- Atts0 = case Atts00 of
- _ when is_binary(Atts00) ->
- couch_compress:decompress(Atts00);
- _ when is_list(Atts00) ->
- % pre 1.2 format
- Atts00
- end,
- {BodyData0,
- lists:map(
- fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
- #att{name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=DiskLen,
- md5=Md5,
- revpos=RevPos,
- data={Fd,Sp},
- encoding=
- case Enc of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc
- end
- };
- ({Name,Type,Sp,AttLen,RevPos,Md5}) ->
- #att{name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5=Md5,
- revpos=RevPos,
- data={Fd,Sp}};
- ({Name,{Type,Sp,AttLen}}) ->
- #att{name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5= <<>>,
- revpos=0,
- data={Fd,Sp}}
- end, Atts0)}
- end,
- Doc = #doc{
- id = Id,
- revs = RevisionPath,
- body = BodyData,
- atts = Atts,
- deleted = Deleted
- },
- after_doc_read(Db, Doc).
-
-
-after_doc_read(#db{after_doc_read = nil}, Doc) ->
- Doc;
-after_doc_read(#db{after_doc_read = Fun} = Db, Doc) ->
- Fun(couch_doc:with_ejson_body(Doc), Db).
-
-
-increment_stat(#db{options = Options}, Stat) ->
- case lists:member(sys_db, Options) of
- true ->
- ok;
- false ->
- couch_stats_collector:increment(Stat)
- end.
-
-skip_deleted(FoldFun) ->
- fun
- (visit, KV, Reds, Acc) ->
- FoldFun(KV, Reds, Acc);
- (traverse, _LK, {Undeleted, _Del, _Size}, Acc) when Undeleted == 0 ->
- {skip, Acc};
- (traverse, _, _, Acc) ->
- {ok, Acc}
- end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ed98610c/src/couch/src/couch_db_update_notifier.erl
----------------------------------------------------------------------
diff --git a/src/couch/src/couch_db_update_notifier.erl b/src/couch/src/couch_db_update_notifier.erl
deleted file mode 100644
index 3958917..0000000
--- a/src/couch/src/couch_db_update_notifier.erl
+++ /dev/null
@@ -1,82 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%
-% This causes an OS process to be spawned, and it is notified every time a
-% database is updated.
-%
-% The notifications are in the form of the database name sent as a line of
-% text to the OS process's stdout.
-%
-
--module(couch_db_update_notifier).
-
--behaviour(gen_event).
-
--export([start_link/1, notify/1]).
--export([init/1, terminate/2, handle_event/2, handle_call/2, handle_info/2, code_change/3,stop/1]).
-
--include_lib("couch/include/couch_db.hrl").
-
-start_link(Exec) ->
- couch_event_sup:start_link(couch_db_update, {couch_db_update_notifier, make_ref()}, Exec).
-
-notify(Event) ->
- gen_event:notify(couch_db_update, Event).
-
-stop(Pid) ->
- couch_event_sup:stop(Pid).
-
-init(Exec) when is_list(Exec) -> % an exe
- couch_os_process:start_link(Exec, []);
-init(Else) ->
- {ok, Else}.
-
-terminate(_Reason, Pid) when is_pid(Pid) ->
- couch_os_process:stop(Pid),
- ok;
-terminate(_Reason, _State) ->
- ok.
-
-handle_event(Event, Fun) when is_function(Fun, 1) ->
- Fun(Event),
- {ok, Fun};
-handle_event(Event, {Fun, FunAcc}) ->
- FunAcc2 = Fun(Event, FunAcc),
- {ok, {Fun, FunAcc2}};
-handle_event({EventType, EventDesc}, Pid) ->
- Obj = encode_event(EventType, EventDesc),
- ok = couch_os_process:send(Pid, Obj),
- {ok, Pid}.
-
-handle_call(_Request, State) ->
- {reply, ok, State}.
-
-handle_info({'EXIT', Pid, Reason}, Pid) ->
- ?LOG_ERROR("Update notification process ~p died: ~p", [Pid, Reason]),
- remove_handler;
-handle_info({'EXIT', _, _}, Pid) ->
- %% the db_update event manager traps exits and forwards this message to all
- %% its handlers. Just ignore as it wasn't our os_process that exited.
- {ok, Pid}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-encode_event(EventType, EventDesc) when is_atom(EventType) ->
- encode_event(atom_to_list(EventType), EventDesc);
-encode_event(EventType, EventDesc) when is_list(EventType) ->
- encode_event(?l2b(EventType), EventDesc);
-encode_event(EventType, {DbName, DocId}) ->
- {[{type, EventType}, {db, DbName}, {id, DocId}]};
-encode_event(EventType, DbName) ->
- {[{type, EventType}, {db, DbName}]}.