You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/06 18:40:01 UTC
[19/50] [abbrv] initial move to rebar compilation
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_server.erl b/src/couch_server.erl
new file mode 100644
index 0000000..7cee0f5
--- /dev/null
+++ b/src/couch_server.erl
@@ -0,0 +1,499 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server).
+-behaviour(gen_server).
+
+-export([open/2,create/2,delete/2,get_version/0,get_version/1,get_uuid/0]).
+-export([all_databases/0, all_databases/2]).
+-export([init/1, handle_call/3,sup_start_link/0]).
+-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
+-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
+
+-include("couch_db.hrl").
+
+%% State of the couch_server gen_server.
+-record(server,{
+ root_dir = [], % database directory, read from "couchdb"/"database_dir" in init/1
+ dbname_regexp, % compiled regexp that database names are checked against
+ max_dbs_open=100, % limit compared with dbs_open by maybe_close_lru_db/1
+ dbs_open=0, % open (or opening) databases, not counting those flagged sys_db
+ start_time="" % couch_util:rfc1123_date() taken in init/1
+ }).
+
+%% Development helper: stops couch, rebuilds and loads all modules with
+%% make:all/1 (asserting the result is up_to_date) and starts couch again.
+dev_start() ->
+ couch:stop(),
+ up_to_date = make:all([load, debug_info]),
+ couch:start().
+
+%% Version string of the loaded `couch` application, or "0.0.0" when the
+%% application does not appear among the loaded applications.
+get_version() ->
+    Loaded = application:loaded_applications(),
+    case lists:keyfind(couch, 1, Loaded) of
+        false ->
+            "0.0.0";
+        {_, _, Vsn} ->
+            Vsn
+    end.
+
+%% get_version(short): the version up to (not including) the first "+",
+%% i.e. without the git hash suffix.
+get_version(short) ->
+    [Short | _] = string:tokens(get_version(), "+"),
+    Short.
+
+
+%% Returns the server UUID as a binary. When no "couchdb"/"uuid" value is
+%% configured, a random one is generated and written to the configuration.
+get_uuid() ->
+    case couch_config:get("couchdb", "uuid", nil) of
+        nil ->
+            Fresh = couch_uuids:random(),
+            couch_config:set("couchdb", "uuid", ?b2l(Fresh)),
+            Fresh;
+        Stored ->
+            ?l2b(Stored)
+    end.
+
+%% Asks the server for its state and reports the start time (as a binary)
+%% and the number of open databases as a proplist.
+get_stats() ->
+    {ok, State} = gen_server:call(couch_server, get_server),
+    #server{start_time=Started, dbs_open=OpenCount} = State,
+    [{start_time, ?l2b(Started)}, {dbs_open, OpenCount}].
+
+%% Starts the server process, registered locally as `couch_server`.
+sup_start_link() ->
+ gen_server:start_link({local, couch_server}, couch_server, [], []).
+
+%% Opens database DbName through the server. On success the returned #db{}
+%% carries the user_ctx from Options (default #user_ctx{}); any other reply
+%% is passed through unchanged.
+open(DbName, Options0) ->
+    Options = maybe_add_sys_db_callbacks(DbName, Options0),
+    Reply = gen_server:call(couch_server, {open, DbName, Options}, infinity),
+    case Reply of
+        {ok, Db} ->
+            UserCtx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
+            {ok, Db#db{user_ctx=UserCtx}};
+        Failure ->
+            Failure
+    end.
+
+%% Creates database DbName through the server. On success the returned #db{}
+%% carries the user_ctx from Options (default #user_ctx{}); any other reply
+%% is passed through unchanged.
+create(DbName, Options0) ->
+    Options = maybe_add_sys_db_callbacks(DbName, Options0),
+    Reply = gen_server:call(couch_server, {create, DbName, Options}, infinity),
+    case Reply of
+        {ok, Db} ->
+            UserCtx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
+            {ok, Db#db{user_ctx=UserCtx}};
+        Failure ->
+            Failure
+    end.
+
+%% Asks the server to delete database DbName; waits without timeout and
+%% returns the server's reply.
+delete(DbName, Options) ->
+ gen_server:call(couch_server, {delete, DbName, Options}, infinity).
+
+%% Prepends the doc-update/doc-read callbacks and the sys_db flag to Options
+%% when DbName is the configured replicator database or the configured
+%% authentication database; otherwise Options is returned untouched.
+%% Binary names are converted to lists before comparing.
+maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) ->
+    maybe_add_sys_db_callbacks(?b2l(DbName), Options);
+maybe_add_sys_db_callbacks(DbName, Options) ->
+    ReplicatorDb = couch_config:get("replicator", "db", "_replicator"),
+    case ReplicatorDb =:= DbName of
+        true ->
+            [
+                {before_doc_update, fun couch_replicator_manager:before_doc_update/2},
+                {after_doc_read, fun couch_replicator_manager:after_doc_read/2},
+                sys_db | Options
+            ];
+        false ->
+            % the authentication db setting is only read when needed
+            UsersDb = couch_config:get(
+                "couch_httpd_auth", "authentication_db", "_users"),
+            case UsersDb =:= DbName of
+                true ->
+                    [
+                        {before_doc_update, fun couch_users_db:before_doc_update/2},
+                        {after_doc_read, fun couch_users_db:after_doc_read/2},
+                        sys_db | Options
+                    ];
+                false ->
+                    Options
+            end
+    end.
+
+%% Validates DbName against the server's name regexp. "_users" and
+%% "_replicator" are accepted even though they do not match the regexp.
+%% Returns ok or {error, illegal_database_name, DbName}.
+check_dbname(#server{dbname_regexp=RegExp}, DbName) ->
+    case re:run(DbName, RegExp, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch when DbName =:= "_users"; DbName =:= "_replicator" ->
+            ok;
+        nomatch ->
+            {error, illegal_database_name, DbName}
+    end.
+
+%% True when the "admins" config value for User starts with "-hashed-" and
+%% its hash part equals the hex-encoded crypto:sha/1 digest of
+%% ClearPwd ++ Salt; false for any other config value.
+%% NOTE(review): crypto:sha/1 is presumably the legacy crypto API - confirm it
+%% exists on every OTP release this module must run on.
+is_admin(User, ClearPwd) ->
+ case couch_config:get("admins", User) of
+ "-hashed-" ++ HashedPwdAndSalt ->
+ % stored value has the form "<hash>,<salt>"
+ [HashedPwd, Salt] = string:tokens(HashedPwdAndSalt, ","),
+ couch_util:to_hex(crypto:sha(ClearPwd ++ Salt)) == HashedPwd;
+ _Else ->
+ false
+ end.
+
+%% True when the "admins" config section is not the empty list.
+has_admins() ->
+ couch_config:get("admins") /= [].
+
+%% Path of the ".couch" file for DbName (a string) under the server's
+%% root directory.
+get_full_filename(Server, DbName) ->
+ filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
+
+%% Hashes all clear-text admin passwords, persisting the result.
+hash_admin_passwords() ->
+    hash_admin_passwords(true).
+
+%% Replaces every admin entry reported by
+%% couch_passwords:get_unhashed_admins/0 with its hashed form in the "admins"
+%% config section. Persist is forwarded to couch_config:set/4.
+hash_admin_passwords(Persist) ->
+    StoreHashed = fun({User, ClearPassword}) ->
+        Hashed = couch_passwords:hash_admin_password(ClearPassword),
+        couch_config:set("admins", User, ?b2l(Hashed), Persist)
+    end,
+    lists:foreach(StoreHashed, couch_passwords:get_unhashed_admins()).
+
+%% gen_server callback. Reads the database directory and the open-database
+%% limit from the configuration, registers configuration listeners, hashes
+%% clear-text admin passwords, creates the private ETS tables used for
+%% bookkeeping and starts trapping exits.
+%% Returns {ok, #server{}}.
+init([]) ->
+    % read config and register for configuration changes
+
+    % just stop if one of the config settings change. couch_server_sup
+    % will restart us and then we will pick up the new settings.
+
+    RootDir = couch_config:get("couchdb", "database_dir", "."),
+    % Fall back to the same limit the #server record declares, so a
+    % configuration without "max_dbs_open" does not crash list_to_integer/1.
+    MaxDbsOpen = list_to_integer(
+        couch_config:get("couchdb", "max_dbs_open", "100")),
+    Self = self(),
+    ok = couch_config:register(
+        fun("couchdb", "database_dir") ->
+            exit(Self, config_change)
+        end),
+    ok = couch_config:register(
+        fun("couchdb", "max_dbs_open", Max) ->
+            gen_server:call(couch_server,
+                {set_max_dbs_open, list_to_integer(Max)})
+        end),
+    ok = couch_file:init_delete_dir(RootDir),
+    hash_admin_passwords(),
+    ok = couch_config:register(
+        fun("admins", _Key, _Value, Persist) ->
+            % spawn here so couch_config doesn't try to call itself
+            spawn(fun() -> hash_admin_passwords(Persist) end)
+        end, false),
+    {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+    % name -> {opening | opened, Pid, Froms | LruTime}
+    ets:new(couch_dbs_by_name, [set, private, named_table]),
+    % pid (opener or db) -> name
+    ets:new(couch_dbs_by_pid, [set, private, named_table]),
+    % last-use timestamp -> name, ordered so the oldest entry comes first
+    ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]),
+    % names of databases opened with the sys_db option
+    ets:new(couch_sys_dbs, [set, private, named_table]),
+    process_flag(trap_exit, true),
+    {ok, #server{root_dir=RootDir,
+                 dbname_regexp=RegExp,
+                 max_dbs_open=MaxDbsOpen,
+                 start_time=couch_util:rfc1123_date()}}.
+
+%% gen_server callback: synchronously shuts down the process stored in every
+%% couch_dbs_by_name entry (opener or opened database).
+terminate(_Reason, _Srv) ->
+ lists:foreach(
+ fun({_, {_, Pid, _}}) ->
+ couch_util:shutdown_sync(Pid)
+ end,
+ ets:tab2list(couch_dbs_by_name)).
+
+%% Returns {ok, Names}: the sorted, duplicate-free list of database names
+%% (binaries) found by all_databases/2.
+all_databases() ->
+ {ok, DbList} = all_databases(
+ fun(DbName, Acc) -> {ok, [DbName | Acc]} end, []),
+ {ok, lists:usort(DbList)}.
+
+%% Folds Fun over the names of all "*.couch" files found recursively under
+%% the server's root directory. Fun(DbName, Acc) receives the name as a
+%% binary, relative to the root and without the ".couch" extension, and
+%% returns {ok, NewAcc} to continue or {stop, NewAcc} to end the fold early.
+%% Returns {ok, FinalAcc}.
+all_databases(Fun, Acc0) ->
+ {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
+ NormRoot = couch_util:normpath(Root),
+ FinalAcc = try
+ filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+ fun(Filename, AccIn) ->
+ NormFilename = couch_util:normpath(Filename),
+ % strip the root from the front of the path; both branches only
+ % serve to bind RelativeFilename (without a leading "/")
+ case NormFilename -- NormRoot of
+ [$/ | RelativeFilename] -> ok;
+ RelativeFilename -> ok
+ end,
+ case Fun(?l2b(filename:rootname(RelativeFilename, ".couch")), AccIn) of
+ {ok, NewAcc} -> NewAcc;
+ % leave fold_files early; Fun is part of the thrown term so the
+ % catch below only matches a throw raised for this call
+ {stop, NewAcc} -> throw({stop, Fun, NewAcc})
+ end
+ end, Acc0)
+ catch throw:{stop, Fun, Acc1} ->
+ Acc1
+ end,
+ {ok, FinalAcc}.
+
+
+%% Makes room for one more open database. Returns {ok, Server} unchanged
+%% while below the limit; otherwise tries to close the least recently used
+%% database and returns {ok, Server} with dbs_open decremented, or the error
+%% from try_close_lru/1.
+maybe_close_lru_db(#server{dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
+ when NumOpen < MaxOpen ->
+ {ok, Server};
+maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
+ % must free up the lru db.
+ case try_close_lru(now()) of
+ ok ->
+ {ok, Server#server{dbs_open=NumOpen - 1}};
+ Error -> Error
+ end.
+
+%% Walks the LRU table from the oldest entry and shuts down the first idle
+%% database. A database that is still in use gets a fresh timestamp and the
+%% search continues. StartTime is the time the search began; meeting an entry
+%% newer than it ends the search with {error, all_dbs_active}.
+%% Returns ok or {error, all_dbs_active}.
+try_close_lru(StartTime) ->
+ LruTime = get_lru(),
+ if LruTime > StartTime ->
+ % this means we've looped through all our opened dbs and found them
+ % all in use.
+ {error, all_dbs_active};
+ true ->
+ [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
+ [{_, {opened, MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
+ case couch_db:is_idle(MainPid) of
+ true ->
+ ok = shutdown_idle_db(DbName, MainPid, LruTime);
+ false ->
+ % this still has referrers. Go ahead and give it a current lru time
+ % and try the next one in the table.
+ NewLruTime = now(),
+ true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, NewLruTime}}),
+ true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}),
+ try_close_lru(StartTime)
+ end
+ end.
+
+%% Returns the LRU timestamp of the oldest database that is not listed in
+%% couch_sys_dbs.
+get_lru() ->
+ get_lru(ets:first(couch_dbs_by_lru)).
+
+%% Same, starting the search at LruTime. System databases met on the way are
+%% shut down when idle and skipped otherwise.
+%% NOTE(review): when the LRU table is empty or holds only system databases,
+%% ets:first/next presumably yields '$end_of_table' and the lookup match below
+%% fails - confirm callers can never reach that state.
+get_lru(LruTime) ->
+ [{LruTime, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
+ case ets:member(couch_sys_dbs, DbName) of
+ false ->
+ LruTime;
+ true ->
+ [{_, {opened, MainPid, _}}] = ets:lookup(couch_dbs_by_name, DbName),
+ case couch_db:is_idle(MainPid) of
+ true ->
+ % fetch the successor before the current key is deleted
+ NextLru = ets:next(couch_dbs_by_lru, LruTime),
+ ok = shutdown_idle_db(DbName, MainPid, LruTime),
+ get_lru(NextLru);
+ false ->
+ get_lru(ets:next(couch_dbs_by_lru, LruTime))
+ end
+ end.
+
+%% Synchronously shuts down database process MainPid and removes its entries
+%% from all four bookkeeping tables. Returns ok.
+shutdown_idle_db(DbName, MainPid, LruTime) ->
+ couch_util:shutdown_sync(MainPid),
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, MainPid),
+ true = ets:delete(couch_sys_dbs, DbName),
+ ok.
+
+%% Starts opening DbName in a linked helper process so the server is not
+%% blocked. The helper starts the database, reports the result back with an
+%% {open_result, ...} call and then unlinks itself from the server and from
+%% the database process. Meanwhile the database is recorded as `opening` with
+%% From as its first waiting caller.
+%% Returns the server state; dbs_open is incremented unless Options contains
+%% sys_db (such databases are recorded in couch_sys_dbs instead).
+open_async(Server, From, DbName, Filepath, Options) ->
+ Parent = self(),
+ Opener = spawn_link(fun() ->
+ Res = couch_db:start_link(DbName, Filepath, Options),
+ gen_server:call(
+ Parent, {open_result, DbName, Res, Options}, infinity
+ ),
+ unlink(Parent),
+ case Res of
+ {ok, DbReader} ->
+ unlink(DbReader);
+ _ ->
+ ok
+ end
+ end),
+ true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}),
+ true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
+ DbsOpen = case lists:member(sys_db, Options) of
+ true ->
+ true = ets:insert(couch_sys_dbs, {DbName, true}),
+ Server#server.dbs_open;
+ false ->
+ Server#server.dbs_open + 1
+ end,
+ Server#server{dbs_open = DbsOpen}.
+
+%% gen_server callback. Requests handled:
+%%   {set_max_dbs_open, Max}           - update the open-database limit
+%%   get_server                        - reply with the #server{} state
+%%   {open_result, Name, Res, Options} - sent by the helper of open_async/5
+%%   {open, Name, Options}             - open (or reuse) a database
+%%   {create, Name, Options}           - create a database
+%%   {delete, Name, Options}           - close and delete a database
+handle_call({set_max_dbs_open, Max}, _From, Server) ->
+ {reply, ok, Server#server{max_dbs_open=Max}};
+handle_call(get_server, _From, Server) ->
+ {reply, {ok, Server}, Server};
+% the database started: link to it, answer every waiting caller and turn the
+% `opening` entry into an `opened` one
+handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, _From, Server) ->
+ link(OpenedDbPid),
+ [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
+ lists:foreach(fun({FromPid,_}=From) ->
+ gen_server:reply(From,
+ catch couch_db:open_ref_counted(OpenedDbPid, FromPid))
+ end, Froms),
+ LruTime = now(),
+ true = ets:insert(couch_dbs_by_name,
+ {DbName, {opened, OpenedDbPid, LruTime}}),
+ true = ets:delete(couch_dbs_by_pid, Opener),
+ true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}),
+ true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+ case lists:member(create, Options) of
+ true ->
+ couch_db_update_notifier:notify({created, DbName});
+ false ->
+ ok
+ end,
+ {reply, ok, Server};
+% an existing file is reported to callers as file_exists
+handle_call({open_result, DbName, {error, eexist}, Options}, From, Server) ->
+ handle_call({open_result, DbName, file_exists, Options}, From, Server);
+% opening failed: pass the error to every waiting caller and undo the
+% bookkeeping done by open_async/5
+handle_call({open_result, DbName, Error, Options}, _From, Server) ->
+ [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
+ lists:foreach(fun(From) ->
+ gen_server:reply(From, Error)
+ end, Froms),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, Opener),
+ DbsOpen = case lists:member(sys_db, Options) of
+ true ->
+ true = ets:delete(couch_sys_dbs, DbName),
+ Server#server.dbs_open;
+ false ->
+ Server#server.dbs_open - 1
+ end,
+ {reply, ok, Server#server{dbs_open = DbsOpen}};
+handle_call({open, DbName, Options}, {FromPid,_}=From, Server) ->
+ LruTime = now(),
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] ->
+ open_db(DbName, Server, Options, From);
+ [{_, {opening, Opener, Froms}}] ->
+ % already being opened: queue the caller, it is answered by open_result
+ true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}),
+ {noreply, Server};
+ [{_, {opened, MainPid, PrevLruTime}}] ->
+ % already open: refresh its LRU position and hand out a reference
+ true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, LruTime}}),
+ true = ets:delete(couch_dbs_by_lru, PrevLruTime),
+ true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+ {reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
+ end;
+handle_call({create, DbName, Options}, From, Server) ->
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] ->
+ open_db(DbName, Server, [create | Options], From);
+ [_AlreadyRunningDb] ->
+ {reply, file_exists, Server}
+ end;
+handle_call({delete, DbName, _Options}, _From, Server) ->
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ FullFilepath = get_full_filename(Server, DbNameList),
+ % UpdateState is true when a running (or opening) database was shut down
+ UpdateState =
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] -> false;
+ [{_, {opening, Pid, Froms}}] ->
+ couch_util:shutdown_sync(Pid),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, Pid),
+ [gen_server:reply(F, not_found) || F <- Froms],
+ true;
+ [{_, {opened, Pid, LruTime}}] ->
+ couch_util:shutdown_sync(Pid),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, Pid),
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ true
+ end,
+ Server2 = case UpdateState of
+ true ->
+ DbsOpen = case ets:member(couch_sys_dbs, DbName) of
+ true ->
+ true = ets:delete(couch_sys_dbs, DbName),
+ Server#server.dbs_open;
+ false ->
+ Server#server.dbs_open - 1
+ end,
+ Server#server{dbs_open = DbsOpen};
+ false ->
+ Server
+ end,
+
+ %% Delete any leftover .compact files. If we don't do this a subsequent
+ %% request for this DB will try to open the .compact file and use it.
+ couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"),
+
+ case couch_file:delete(Server#server.root_dir, FullFilepath) of
+ ok ->
+ couch_db_update_notifier:notify({deleted, DbName}),
+ {reply, ok, Server2};
+ {error, enoent} ->
+ {reply, not_found, Server2};
+ Else ->
+ {reply, Else, Server2}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end.
+
+%% gen_server callback: no casts are supported, so any cast terminates the
+%% server with {unknown_cast_message, Msg}.
+handle_cast(Msg, _Server) ->
+ exit({unknown_cast_message, Msg}).
+
+%% gen_server callback: the state is kept unchanged across code upgrades.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% gen_server callback for non-call/cast messages.
+%%
+%% {'EXIT', _, config_change}: sent by the listener registered in init/1 when
+%%   the database directory setting changes. The server stops so that its
+%%   supervisor restarts it with the new settings.
+%% {'EXIT', Pid, Reason}: a linked opener or database process died. Waiting
+%%   callers (if any) are answered, all bookkeeping for the database is
+%%   removed and dbs_open is decremented unless it was a system database.
+%% Anything else is unexpected and terminates the server.
+handle_info({'EXIT', _Pid, config_change}, Server) ->
+    % was {noreply, shutdown, Server}, which is not a valid gen_server return
+    {stop, config_change, Server};
+handle_info({'EXIT', Pid, Reason}, Server) ->
+    Server2 = case ets:lookup(couch_dbs_by_pid, Pid) of
+    [{Pid, DbName}] ->
+
+        % If the Pid is known, the name should be as well.
+        % If not, that's an error, which is why there is no [] clause.
+        case ets:lookup(couch_dbs_by_name, DbName) of
+        [{_, {opening, Pid, Froms}}] ->
+            Msg = case Reason of
+            snappy_nif_not_loaded ->
+                io_lib:format(
+                    "To open the database `~s`, Apache CouchDB "
+                    "must be built with Erlang OTP R13B04 or higher.",
+                    [DbName]
+                );
+            _ ->
+                % catch-all; the previous `true ->` pattern only matched the
+                % atom true and crashed on every other exit reason
+                io_lib:format("Error opening database ~p: ~p", [DbName, Reason])
+            end,
+            ?LOG_ERROR(Msg, []),
+            lists:foreach(
+                fun(F) -> gen_server:reply(F, {bad_otp_release, Msg}) end,
+                Froms
+            );
+        [{_, {opened, Pid, LruTime}}] ->
+            ?LOG_ERROR(
+                "Unexpected exit of database process ~p [~p]: ~p",
+                [Pid, DbName, Reason]
+            ),
+            true = ets:delete(couch_dbs_by_lru, LruTime)
+        end,
+
+        % couch_dbs_by_pid is keyed by pid; deleting by DbName left a stale
+        % entry behind
+        true = ets:delete(couch_dbs_by_pid, Pid),
+        true = ets:delete(couch_dbs_by_name, DbName),
+
+        case ets:lookup(couch_sys_dbs, DbName) of
+        [{DbName, _}] ->
+            true = ets:delete(couch_sys_dbs, DbName),
+            Server;
+        [] ->
+            Server#server{dbs_open = Server#server.dbs_open - 1}
+        end
+    end,
+    {noreply, Server2};
+handle_info(Error, _Server) ->
+    ?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]),
+    exit(kill).
+
+%% Validates DbName and starts opening it asynchronously on behalf of From.
+%% Databases flagged sys_db skip the open-database limit; for all others room
+%% is made first with maybe_close_lru_db/1.
+%% Returns a handle_call/3 result: {noreply, Server} when the open was started
+%% (the caller is answered later), or {reply, Error, Server}.
+open_db(DbName, Server, Options, From) ->
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ Filepath = get_full_filename(Server, DbNameList),
+ case lists:member(sys_db, Options) of
+ true ->
+ {noreply, open_async(Server, From, DbName, Filepath, Options)};
+ false ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ CloseError ->
+ {reply, CloseError, Server}
+ end
+ end;
+ Error ->
+ {reply, Error, Server}
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_server_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_server_sup.erl b/src/couch_server_sup.erl
new file mode 100644
index 0000000..be3c3a3
--- /dev/null
+++ b/src/couch_server_sup.erl
@@ -0,0 +1,164 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_server_sup).
+-behaviour(supervisor).
+
+
+-export([start_link/1,stop/0, couch_config_start_link_wrapper/2,
+ restart_core_server/0, config_change/2]).
+
+-include("couch_db.hrl").
+
+%% supervisor callbacks
+-export([init/1]).
+
+%% Starts the CouchDB supervision tree from IniFiles unless a process is
+%% already registered as couch_server_sup, in which case
+%% {error, already_started} is returned.
+start_link(IniFiles) ->
+    Registered = whereis(couch_server_sup),
+    if
+        Registered =:= undefined ->
+            start_server(IniFiles);
+        true ->
+            {error, already_started}
+    end.
+
+%% Restarts the whole Erlang runtime system via init:restart/0.
+restart_core_server() ->
+ init:restart().
+
+%% Child start function for couch_config. While the config process started
+%% by start_server/1 is still alive it is linked and adopted; otherwise a
+%% fresh couch_config is started from IniFiles.
+couch_config_start_link_wrapper(IniFiles, FirstConfigPid) ->
+ case is_process_alive(FirstConfigPid) of
+ true ->
+ link(FirstConfigPid),
+ {ok, FirstConfigPid};
+ false -> couch_config:start_link(IniFiles)
+ end.
+
+%% Boots CouchDB: writes the PID file when a `pidfile` argument was given,
+%% starts couch_config, announces startup, starts the supervisor with the
+%% config, primary and secondary service children, registers for config
+%% changes, logs the listening URIs and writes them to the configured
+%% "uri_file". Returns {ok, SupervisorPid}; throws {error, Reason} when the
+%% URI file cannot be written.
+start_server(IniFiles) ->
+ case init:get_argument(pidfile) of
+ {ok, [PidFile]} ->
+ case file:write_file(PidFile, os:getpid()) of
+ ok -> ok;
+ {error, Reason} ->
+ io:format("Failed to write PID file ~s: ~s",
+ [PidFile, file:format_error(Reason)])
+ end;
+ _ -> ok
+ end,
+
+ {ok, ConfigPid} = couch_config:start_link(IniFiles),
+
+ LogLevel = couch_config:get("log", "level", "info"),
+ % announce startup
+ io:format("Apache CouchDB ~s (LogLevel=~s) is starting.~n", [
+ couch_server:get_version(),
+ LogLevel
+ ]),
+ case LogLevel of
+ "debug" ->
+ io:format("Configuration Settings ~p:~n", [IniFiles]),
+ [io:format(" [~s] ~s=~p~n", [Module, Variable, Value])
+ || {{Module, Variable}, Value} <- couch_config:all()];
+ _ -> ok
+ end,
+
+ % supervisor flags and child specs, handed to init/1 unchanged
+ BaseChildSpecs =
+ {{one_for_all, 10, 3600},
+ [{couch_config,
+ {couch_server_sup, couch_config_start_link_wrapper, [IniFiles, ConfigPid]},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_config]},
+ {couch_primary_services,
+ {couch_primary_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_primary_sup]},
+ {couch_secondary_services,
+ {couch_secondary_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_secondary_sup]}
+ ]},
+
+ % ensure these applications are running
+ application:start(ibrowse),
+ application:start(crypto),
+
+ {ok, Pid} = supervisor:start_link(
+ {local, couch_server_sup}, couch_server_sup, BaseChildSpecs),
+
+ % launch the icu bridge
+ % just restart if one of the config settings change.
+ couch_config:register(fun ?MODULE:config_change/2, Pid),
+
+ % the supervisor linked the config process in
+ % couch_config_start_link_wrapper/2, so this process drops its own link
+ unlink(ConfigPid),
+
+ Ip = couch_config:get("httpd", "bind_address"),
+ io:format("Apache CouchDB has started. Time to relax.~n"),
+ Uris = [get_uri(Name, Ip) || Name <- [couch_httpd, https]],
+ [begin
+ case Uri of
+ undefined -> ok;
+ Uri -> ?LOG_INFO("Apache CouchDB has started on ~s", [Uri])
+ end
+ end
+ || Uri <- Uris],
+ case couch_config:get("couchdb", "uri_file", null) of
+ null -> ok;
+ UriFile ->
+ Lines = [begin case Uri of
+ undefined -> [];
+ Uri -> io_lib:format("~s~n", [Uri])
+ end end || Uri <- Uris],
+ case file:write_file(UriFile, Lines) of
+ ok -> ok;
+ {error, Reason2} = Error ->
+ ?LOG_ERROR("Failed to write to URI file ~s: ~s",
+ [UriFile, file:format_error(Reason2)]),
+ throw(Error)
+ end
+ end,
+
+ {ok, Pid}.
+
+%% Sends an exit signal with reason `normal` to the registered supervisor;
+%% `catch` swallows the error raised when no such process is registered.
+%% NOTE(review): a `normal` exit signal presumably does not terminate a
+%% process that traps exits (as supervisors do) - confirm this really stops
+%% the tree.
+stop() ->
+ catch exit(whereis(couch_server_sup), normal).
+
+%% Config listener registered by start_server/1. A change in the "daemons"
+%% section restarts the secondary services supervisor; a change of
+%% "couchdb"/"util_driver_dir" restarts the whole runtime. There is no clause
+%% for any other section/key.
+config_change("daemons", _) ->
+ supervisor:terminate_child(couch_server_sup, couch_secondary_services),
+ supervisor:restart_child(couch_server_sup, couch_secondary_services);
+config_change("couchdb", "util_driver_dir") ->
+ init:restart().
+
+%% supervisor callback: ChildSpecs is already the complete
+%% {SupFlags, ChildSpecList} term built by start_server/1.
+init(ChildSpecs) ->
+ {ok, ChildSpecs}.
+
+%% Builds "scheme://Ip:Port/" (as io_lib:format/2 chardata) for the listener
+%% Name, or returns undefined when that listener has no port.
+get_uri(Name, Ip) ->
+    Port = get_port(Name),
+    if
+        Port =:= undefined ->
+            undefined;
+        true ->
+            io_lib:format("~s://~s:~w/", [get_scheme(Name), Ip, Port])
+    end.
+
+%% URI scheme for a listener name; only couch_httpd and https are known.
+get_scheme(couch_httpd) -> "http";
+get_scheme(https) -> "https".
+
+%% Port of the mochiweb socket server registered as Name, or undefined when
+%% the lookup exits with noproc (no such server running).
+get_port(Name) ->
+ try
+ mochiweb_socket_server:get(Name, port)
+ catch
+ exit:{noproc, _}->
+ undefined
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_stats_aggregator.erl
----------------------------------------------------------------------
diff --git a/src/couch_stats_aggregator.erl b/src/couch_stats_aggregator.erl
new file mode 100644
index 0000000..6090355
--- /dev/null
+++ b/src/couch_stats_aggregator.erl
@@ -0,0 +1,297 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stats_aggregator).
+-behaviour(gen_server).
+
+-export([start/0, start/1, stop/0]).
+-export([all/0, all/1, get/1, get/2, get_json/1, get_json/2, collect_sample/0]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+%% Running statistics for one stat key over one sampling window.
+-record(aggregate, {
+ description = <<"">>, % text from the stat descriptions file
+ seconds = 0, % window length in seconds; 0 keeps no per-sample history
+ count = 0, % number of values currently aggregated
+ current = null,
+ sum = null,
+ mean = null,
+ variance = null, % running sum used by add_value/3 to derive stddev
+ stddev = null,
+ min = null,
+ max = null,
+ samples = [] % [{Time, Value}], newest first; only kept when seconds > 0
+}).
+
+
+%% Starts the aggregator with "stat_descriptions.cfg" from the priv dir.
+start() ->
+ PrivDir = couch_util:priv_dir(),
+ start(filename:join(PrivDir, "stat_descriptions.cfg")).
+
+%% Starts the aggregator, registered locally under the module name. Note that
+%% init/1 receives the one-element list [FileName].
+start(FileName) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [FileName], []).
+
+%% Asynchronously asks the aggregator to stop.
+stop() ->
+ gen_server:cast(?MODULE, stop).
+
+%% All aggregates for the 0-second window, as an EJSON object.
+all() ->
+ ?MODULE:all(0).
+%% All aggregates for the window of Time seconds (integer, or a binary/atom
+%% spelling one). Returns {[{Mod, {[{Key, Json}]}}]}: stats grouped by the
+%% first element of their {Mod, Key} key; {[]} when nothing matches.
+all(Time) when is_binary(Time) ->
+ ?MODULE:all(list_to_integer(binary_to_list(Time)));
+all(Time) when is_atom(Time) ->
+ ?MODULE:all(list_to_integer(atom_to_list(Time)));
+all(Time) when is_integer(Time) ->
+ Aggs = ets:match(?MODULE, {{'$1', Time}, '$2'}),
+ Stats = lists:map(fun([Key, Agg]) -> {Key, Agg} end, Aggs),
+ case Stats of
+ [] ->
+ {[]};
+ _ ->
+ Ret = lists:foldl(fun({{Mod, Key}, Agg}, Acc) ->
+ % keys collected so far for this module, if any
+ CurrKeys = case proplists:lookup(Mod, Acc) of
+ none -> [];
+ {Mod, {Keys}} -> Keys
+ end,
+ NewMod = {[{Key, to_json_term(Agg)} | CurrKeys]},
+ [{Mod, NewMod} | proplists:delete(Mod, Acc)]
+ end, [], Stats),
+ {Ret}
+ end.
+
+%% Aggregate record for Key over the 0-second window.
+get(Key) ->
+ ?MODULE:get(Key, 0).
+%% Aggregate record for Key over the window of Time seconds (integer, or a
+%% binary/atom spelling one). An empty aggregate for that window is returned
+%% when the table has no entry.
+get(Key, Time) when is_binary(Time) ->
+ ?MODULE:get(Key, list_to_integer(binary_to_list(Time)));
+get(Key, Time) when is_atom(Time) ->
+ ?MODULE:get(Key, list_to_integer(atom_to_list(Time)));
+get(Key, Time) when is_integer(Time) ->
+ case ets:lookup(?MODULE, {make_key(Key), Time}) of
+ [] -> #aggregate{seconds=Time};
+ [{_, Agg}] -> Agg
+ end.
+
+%% Like get/1 and get/2, but converted with to_json_term/1.
+get_json(Key) ->
+ get_json(Key, 0).
+get_json(Key, Time) ->
+ to_json_term(?MODULE:get(Key, Time)).
+
+%% Triggers one sampling round in the aggregator and waits for it to finish.
+%% Also the function invoked by the sampling timer.
+collect_sample() ->
+ gen_server:call(?MODULE, collect_sample, infinity).
+
+
+%% gen_server callback. Creates the protected ETS table named after the
+%% module, inserts an empty aggregate for every described stat and every
+%% configured sample window, registers for "stats" config changes (which make
+%% the server exit with config_change) and schedules the first sample.
+%% StatDescsFileName is the argument given by start/1, i.e. the list
+%% [FileName]. The state is {TimerRef, RateInMilliseconds}.
+init(StatDescsFileName) ->
+ % Create an aggregate entry for each {description, rate} pair.
+ ets:new(?MODULE, [named_table, set, protected]),
+ SampleStr = couch_config:get("stats", "samples", "[0]"),
+ {ok, Samples} = couch_util:parse_term(SampleStr),
+ {ok, Descs} = file:consult(StatDescsFileName),
+ lists:foreach(fun({Sect, Key, Value}) ->
+ lists:foreach(fun(Secs) ->
+ Agg = #aggregate{
+ description=list_to_binary(Value),
+ seconds=Secs
+ },
+ ets:insert(?MODULE, {{{Sect, Key}, Secs}, Agg})
+ end, Samples)
+ end, Descs),
+
+ Self = self(),
+ ok = couch_config:register(
+ fun("stats", _) -> exit(Self, config_change) end
+ ),
+
+ Rate = list_to_integer(couch_config:get("stats", "rate", "1000")),
+ % TODO: Add timer_start to kernel start options.
+ {ok, TRef} = timer:apply_after(Rate, ?MODULE, collect_sample, []),
+ {ok, {TRef, Rate}}.
+
+%% gen_server callback: cancels the pending sampling timer.
+terminate(_Reason, {TRef, _Rate}) ->
+ timer:cancel(TRef),
+ ok.
+
+%% gen_server callback for collect_sample: re-arms the sampling timer, reads
+%% the current collector values and folds them into every aggregate in the
+%% table. Incremental counters are passed on as they are; absolute values are
+%% cleared from the collector and reduced to their mean first. Aggregates
+%% whose key has no new value only have their expired samples removed.
+handle_call(collect_sample, _, {OldTRef, SampleInterval}) ->
+ timer:cancel(OldTRef),
+ {ok, TRef} = timer:apply_after(SampleInterval, ?MODULE, collect_sample, []),
+ % Gather new stats values to add.
+ Incs = lists:map(fun({Key, Value}) ->
+ {Key, {incremental, Value}}
+ end, couch_stats_collector:all(incremental)),
+ Abs = lists:map(fun({Key, Values}) ->
+ couch_stats_collector:clear(Key),
+ Values2 = case Values of
+ X when is_list(X) -> X;
+ Else -> [Else]
+ end,
+ % running mean of the recorded values
+ {_, Mean} = lists:foldl(fun(Val, {Count, Curr}) ->
+ {Count+1, Curr + (Val - Curr) / (Count+1)}
+ end, {0, 0}, Values2),
+ {Key, {absolute, Mean}}
+ end, couch_stats_collector:all(absolute)),
+
+ Values = Incs ++ Abs,
+ Now = erlang:now(),
+ lists:foreach(fun({{Key, Rate}, Agg}) ->
+ NewAgg = case proplists:lookup(Key, Values) of
+ none ->
+ rem_values(Now, Agg);
+ {Key, {Type, Value}} ->
+ NewValue = new_value(Type, Value, Agg#aggregate.current),
+ Agg2 = add_value(Now, NewValue, Agg),
+ rem_values(Now, Agg2)
+ end,
+ ets:insert(?MODULE, {{Key, Rate}, NewAgg})
+ end, ets:tab2list(?MODULE)),
+ {reply, ok, {TRef, SampleInterval}}.
+
+%% gen_server callback: `stop` is the only supported cast.
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+%% gen_server callback: other messages are ignored.
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% gen_server callback: the state is kept unchanged across code upgrades.
+code_change(_OldVersion, State, _Extra) ->
+ {ok, State}.
+
+
+%% Value to feed into an aggregate for this sampling round. Absolute stats
+%% are used as they are; incremental counters contribute the difference to
+%% the aggregate's current total (the full value when there is none yet).
+new_value(absolute, Value, _Previous) ->
+    Value;
+new_value(incremental, Value, null) ->
+    Value;
+new_value(incremental, Value, Previous) ->
+    Value - Previous.
+
+%% Folds Value, sampled at Time, into the aggregate.
+%% First clause: the aggregate is empty, so every statistic is seeded from
+%% Value. Second clause: mean and the `variance` accumulator are updated
+%% incrementally and stddev is derived from variance / (count - 1).
+%% The {Time, Value} sample is remembered only when the window (seconds) is
+%% non-zero.
+add_value(Time, Value, #aggregate{count=Count, seconds=Secs}=Agg) when Count < 1 ->
+ Samples = case Secs of
+ 0 -> [];
+ _ -> [{Time, Value}]
+ end,
+ Agg#aggregate{
+ count=1,
+ current=Value,
+ sum=Value,
+ mean=Value,
+ variance=0.0,
+ stddev=null,
+ min=Value,
+ max=Value,
+ samples=Samples
+ };
+add_value(Time, Value, Agg) ->
+ #aggregate{
+ count=Count,
+ current=Current,
+ sum=Sum,
+ mean=Mean,
+ variance=Variance,
+ samples=Samples
+ } = Agg,
+
+ NewCount = Count + 1,
+ NewMean = Mean + (Value - Mean) / NewCount,
+ NewVariance = Variance + (Value - Mean) * (Value - NewMean),
+ StdDev = case NewCount > 1 of
+ false -> null;
+ _ -> math:sqrt(NewVariance / (NewCount - 1))
+ end,
+ Agg2 = Agg#aggregate{
+ count=NewCount,
+ current=Current + Value,
+ sum=Sum + Value,
+ mean=NewMean,
+ variance=NewVariance,
+ stddev=StdDev,
+ min=lists:min([Agg#aggregate.min, Value]),
+ max=lists:max([Agg#aggregate.max, Value])
+ },
+ case Agg2#aggregate.seconds of
+ 0 -> Agg2;
+ _ -> Agg2#aggregate{samples=[{Time, Value} | Samples]}
+ end.
+
+%% Drops the samples that fell out of the aggregate's window as of Time and
+%% subtracts their values from the statistics. Samples are stored newest
+%% first, so splitwith keeps the leading in-window run and removes the rest.
+rem_values(Time, Agg) ->
+ Seconds = Agg#aggregate.seconds,
+ Samples = Agg#aggregate.samples,
+ Pred = fun({When, _Value}) ->
+ % now_diff is in microseconds
+ timer:now_diff(Time, When) =< (Seconds * 1000000)
+ end,
+ {Keep, Remove} = lists:splitwith(Pred, Samples),
+ Agg2 = lists:foldl(fun({_, Value}, Acc) ->
+ rem_value(Value, Acc)
+ end, Agg, Remove),
+ Agg2#aggregate{samples=Keep}.
+
+%% Removes one previously added Value from the aggregate, reversing the
+%% incremental mean/variance update done by add_value/3.
+%% When the aggregate holds at most one value, it is reset to an empty
+%% aggregate that keeps its window length and description.
+rem_value(_Value, #aggregate{count=Count, description=Desc, seconds=Secs})
+        when Count =< 1 ->
+    % keep the description: it is configuration, not a statistic, and
+    % to_json_term/1 would otherwise report it as empty after the reset
+    #aggregate{description=Desc, seconds=Secs};
+rem_value(Value, Agg) ->
+    #aggregate{
+        count=Count,
+        sum=Sum,
+        mean=Mean,
+        variance=Variance
+    } = Agg,
+
+    OldMean = (Mean * Count - Value) / (Count - 1),
+    OldVariance = Variance - (Value - OldMean) * (Value - Mean),
+    OldCount = Count - 1,
+    % clamp_value/1 turns tiny or negative results into 0.0
+    StdDev = case OldCount > 1 of
+        false -> null;
+        _ -> math:sqrt(clamp_value(OldVariance / (OldCount - 1)))
+    end,
+    Agg#aggregate{
+        count=OldCount,
+        sum=Sum-Value,
+        mean=clamp_value(OldMean),
+        variance=clamp_value(OldVariance),
+        stddev=StdDev
+    }.
+
+%% Converts an aggregate into an EJSON object with description, current, sum,
+%% mean, stddev, min and max.
+%% For the 0-second window min/max come from the running fields; for timed
+%% windows they are computed from the samples still inside the window (null
+%% when there are none).
+%% NOTE(review): `current` is filled from the `sum` field, not from `current`
+%% - confirm this is intended before changing it.
+to_json_term(Agg) ->
+    {Min, Max} = case {Agg#aggregate.seconds > 0, Agg#aggregate.samples} of
+        {false, _} ->
+            {Agg#aggregate.min, Agg#aggregate.max};
+        {true, []} ->
+            {null, null};
+        {true, Samples} ->
+            % match on the list shape instead of calling length/1
+            Values = lists:map(fun({_Time, Value}) -> Value end, Samples),
+            {lists:min(Values), lists:max(Values)}
+    end,
+    {[
+        {description, Agg#aggregate.description},
+        {current, round_value(Agg#aggregate.sum)},
+        {sum, round_value(Agg#aggregate.sum)},
+        {mean, round_value(Agg#aggregate.mean)},
+        {stddev, round_value(Agg#aggregate.stddev)},
+        {min, Min},
+        {max, Max}
+    ]}.
+
+%% Normalises a stat key: an integer second element (e.g. a status code) is
+%% turned into the atom with the same digits; other keys are unchanged.
+%% NOTE(review): list_to_atom/1 creates atoms at run time - presumably the set
+%% of integers reaching this is small and trusted; confirm.
+make_key({Mod, Val}) when is_integer(Val) ->
+ {Mod, list_to_atom(integer_to_list(Val))};
+make_key(Key) ->
+ Key.
+
+%% Rounds a non-zero number to three decimal places. Zero and non-numbers
+%% (such as null) are returned as they are.
+round_value(Val) when is_number(Val), Val /= 0 ->
+    erlang:round(Val * 1000.0) / 1000.0;
+round_value(Val) ->
+    Val.
+
+%% Returns Val when it is greater than 1.0e-14 and 0.0 otherwise, so tiny or
+%% negative results of floating point subtraction become zero.
+clamp_value(Val) ->
+    case Val > 1.0e-14 of
+        true -> Val;
+        false -> 0.0
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_stats_collector.erl
----------------------------------------------------------------------
diff --git a/src/couch_stats_collector.erl b/src/couch_stats_collector.erl
new file mode 100644
index 0000000..f7b9bb4
--- /dev/null
+++ b/src/couch_stats_collector.erl
@@ -0,0 +1,136 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% todo
+% - remove existence check on increment(), decrement() and record(). Have
+% modules initialize counters on startup.
+
+-module(couch_stats_collector).
+
+-behaviour(gen_server).
+
+-export([start/0, stop/0]).
+-export([all/0, all/1, get/1, increment/1, decrement/1, record/2, clear/1]).
+-export([track_process_count/1, track_process_count/2]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-define(HIT_TABLE, stats_hit_table).
+-define(ABS_TABLE, stats_abs_table).
+
+%% Starts the collector, registered locally under the module name.
+start() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Synchronously stops the collector.
+stop() ->
+ gen_server:call(?MODULE, stop).
+
+%% All counters followed by all absolute values grouped per key.
+all() ->
+ ets:tab2list(?HIT_TABLE) ++ abs_to_list().
+
+%% Only the counters (incremental) or only the grouped absolute values
+%% (absolute).
+all(Type) ->
+ case Type of
+ incremental -> ets:tab2list(?HIT_TABLE);
+ absolute -> abs_to_list()
+ end.
+
+%% Looks Key up among the counters first and returns its integer value.
+%% Otherwise returns the list of absolute values recorded for Key, or nil
+%% when neither table knows the key.
+get(Key) ->
+ case ets:lookup(?HIT_TABLE, Key) of
+ [] ->
+ case ets:lookup(?ABS_TABLE, Key) of
+ [] ->
+ nil;
+ AbsVals ->
+ lists:map(fun({_, Value}) -> Value end, AbsVals)
+ end;
+ [{_, Counter}] ->
+ Counter
+ end.
+
+%% Adds 1 to the counter for Key. When update_counter fails with badarg the
+%% counter is inserted with value 1. Always returns ok.
+increment(Key) ->
+ Key2 = make_key(Key),
+ case catch ets:update_counter(?HIT_TABLE, Key2, 1) of
+ {'EXIT', {badarg, _}} ->
+ catch ets:insert(?HIT_TABLE, {Key2, 1}),
+ ok;
+ _ ->
+ ok
+ end.
+
+%% Subtracts 1 from the counter for Key. When update_counter fails with
+%% badarg the counter is inserted with value -1. Always returns ok.
+decrement(Key) ->
+ Key2 = make_key(Key),
+ case catch ets:update_counter(?HIT_TABLE, Key2, -1) of
+ {'EXIT', {badarg, _}} ->
+ catch ets:insert(?HIT_TABLE, {Key2, -1}),
+ ok;
+ _ -> ok
+ end.
+
+%% Records one absolute Value for Key. Errors from ets are caught and
+%% returned rather than raised.
+record(Key, Value) ->
+ catch ets:insert(?ABS_TABLE, {make_key(Key), Value}).
+
+%% Removes every absolute value recorded for Key. Errors from ets are caught
+%% and returned rather than raised.
+clear(Key) ->
+ catch ets:delete(?ABS_TABLE, make_key(Key)).
+
+%% Counts the calling process under Stat for as long as it lives.
+track_process_count(Stat) ->
+ track_process_count(self(), Stat).
+
+%% Increments Stat now and spawns a helper process that monitors Pid and
+%% decrements Stat again once Pid goes down. Nothing is spawned when the
+%% increment does not return ok.
+track_process_count(Pid, Stat) ->
+ MonitorFun = fun() ->
+ Ref = erlang:monitor(process, Pid),
+ receive {'DOWN', Ref, _, _, _} -> ok end,
+ couch_stats_collector:decrement(Stat)
+ end,
+ case (catch couch_stats_collector:increment(Stat)) of
+ ok -> spawn(MonitorFun);
+ _ -> ok
+ end.
+
+
+%% gen_server callback: creates the two public named tables. The counter
+%% table holds one row per key; the absolute table is a duplicate_bag so a
+%% key can hold several recorded values.
+init(_) ->
+ ets:new(?HIT_TABLE, [named_table, set, public]),
+ ets:new(?ABS_TABLE, [named_table, duplicate_bag, public]),
+ {ok, nil}.
+
+%% gen_server callback: nothing to clean up.
+terminate(_Reason, _State) ->
+ ok.
+
+%% gen_server callback: `stop` is the only supported call; replies `stopped`.
+handle_call(stop, _, State) ->
+ {stop, normal, stopped, State}.
+
+%% gen_server callback: only the placeholder message `foo` is matched; there
+%% is no clause for any other cast.
+handle_cast(foo, State) ->
+ {noreply, State}.
+
+%% gen_server callback: other messages are ignored.
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% gen_server callback: the state is kept unchanged across code upgrades.
+code_change(_OldVersion, State, _Extra) ->
+ {ok, State}.
+
+
+%% Normalises a stat key: an integer second element is turned into the atom
+%% with the same digits; other keys are unchanged.
+%% NOTE(review): list_to_atom/1 creates atoms at run time - presumably the set
+%% of integers reaching this is small and trusted; confirm.
+make_key({Module, Key}) when is_integer(Key) ->
+ {Module, list_to_atom(integer_to_list(Key))};
+make_key(Key) ->
+ Key.
+
+%% Groups the rows of the absolute table per key: returns [{Key, [Value]}]
+%% with one entry per key. Rows are sorted first so equal keys are adjacent.
+abs_to_list() ->
+ SortedKVs = lists:sort(ets:tab2list(?ABS_TABLE)),
+ lists:foldl(fun({Key, Val}, Acc) ->
+ case Acc of
+ [] ->
+ [{Key, [Val]}];
+ % Key is already bound, so this only matches when the newest group
+ % belongs to the same key
+ [{Key, Prev} | Rest] ->
+ [{Key, [Val | Prev]} | Rest];
+ Others ->
+ [{Key, [Val]} | Others]
+ end
+ end, [], SortedKVs).
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_stream.erl
----------------------------------------------------------------------
diff --git a/src/couch_stream.erl b/src/couch_stream.erl
new file mode 100644
index 0000000..959feef
--- /dev/null
+++ b/src/couch_stream.erl
@@ -0,0 +1,299 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+% public API
+-export([open/1, open/2, close/1]).
+-export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
+-export([copy_to_new_stream/3, write/2]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_cast/2, handle_call/3, handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(DEFAULT_BUFFER_SIZE, 4096).
+
+-record(stream,
+ {fd = 0,
+ written_pointers=[],
+ buffer_list = [],
+ buffer_len = 0,
+ max_buffer,
+ written_len = 0,
+ md5,
+ % md5 of the content without any transformation applied (e.g. compression)
+ % needed for the attachment upload integrity check (ticket 558)
+ identity_md5,
+ identity_len = 0,
+ encoding_fun,
+ end_encoding_fun
+ }).
+
+
+%%% Interface functions %%%
+
+% Start a linked stream process for Fd with no options.
+open(Fd) ->
+ open(Fd, []).
+
+% Start a linked stream process for Fd. Options read by this module are
+% encoding (identity | gzip), compression_level and buffer_size.
+open(Fd, Options) ->
+ gen_server:start_link(couch_stream, {Fd, Options}, []).
+
+% Flush and stop the stream process. Returns
+% {PosList, StreamLength, IdentityLength, Md5, IdentityMd5}.
+close(Pid) ->
+ gen_server:call(Pid, close, infinity).
+
+% Copy the chunks found at PosList in Fd into a freshly opened stream on
+% DestFd and return the result of closing that stream.
+copy_to_new_stream(Fd, PosList, DestFd) ->
+    {ok, Dest} = open(DestFd),
+    CopyChunk = fun(Bin, _Acc) -> ok = write(Dest, Bin) end,
+    foldl(Fd, PosList, CopyChunk, ok),
+    close(Dest).
+
+% Fold Fun over the chunks stored at the positions in PosList, in list order.
+% Each chunk is read with couch_file:pread_iolist/2 and passed to
+% Fun(Chunk, Acc). No checksum is verified.
+foldl(Fd, PosList, Fun, Acc) ->
+    ReadAndApply = fun(Pos, AccIn) ->
+        {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+        Fun(Bin, AccIn)
+    end,
+    lists:foldl(ReadAndApply, Acc, PosList).
+
+% Fold over the stream and verify its MD5 digest against Md5. Passing an
+% empty binary as Md5 skips the verification.
+foldl(Fd, PosList, <<>>, Fun, Acc) ->
+ foldl(Fd, PosList, Fun, Acc);
+foldl(Fd, PosList, Md5, Fun, Acc) ->
+ foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc).
+
+% Fold over the stream while decoding every chunk with the decoder for Enc
+% (gzip or identity) before Fun sees it. The digest check against Md5 always
+% takes place (see foldl_decode/7).
+% NOTE(review): DecEndFun() is not reached when the fold raises, so a gzip
+% decoder is not closed explicitly in that case.
+foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+ {DecDataFun, DecEndFun} = case Enc of
+ gzip ->
+ ungzip_init();
+ identity ->
+ identity_enc_dec_funs()
+ end,
+ Result = foldl_decode(
+ DecDataFun, Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc
+ ),
+ DecEndFun(),
+ Result.
+
+% Worker for foldl/5: folds Fun over the chunks while feeding the bytes read
+% into the running digest Md5Acc. Once the last chunk has been read the final
+% digest must equal Md5, otherwise the match fails with badmatch; for the last
+% chunk this check happens before Fun is applied to it. List entries may be
+% plain positions or {Pos, Size} tuples; the size is ignored here.
+foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = couch_util:md5_final(Md5Acc),
+ Acc;
+foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
+ foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
+foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)),
+ Fun(Bin, Acc);
+foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
+ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
+foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Md5, couch_util:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+
+% Fold Fun over the part of a stream that lies in the byte range [From, To):
+% From is the offset of the first wanted byte, To the offset just past the
+% last one. PosList holds {Pos, Size} entries, or bare positions for old-style
+% attachments whose size is only known after reading the block.
+range_foldl(Fd, PosList, From, To, Fun, Acc) ->
+    range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
+
+% Off is the stream offset of the first byte of the block at the head of the
+% list.
+range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
+    % everything up to To has been handled
+    Acc;
+range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
+    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+    range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
+range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From >= Off + Size ->
+    % The block covers offsets Off .. Off+Size-1, so it lies entirely before
+    % From when From >= Off + Size. Skip it without reading it; with a plain
+    % '>' a block ending exactly at From was read and Fun was handed an
+    % empty binary.
+    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
+range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
+    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+    Bin1 = if
+        From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
+        true ->
+            % cut off the bytes before From and the bytes from To onwards
+            PrefixLen = clip(From - Off, 0, Size),
+            PostfixLen = clip(Off + Size - To, 0, Size),
+            MatchLen = Size - PrefixLen - PostfixLen,
+            <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
+            Match
+    end,
+    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
+
+% Limit Value to the interval [Lo, Hi]: values below Lo yield Lo, values
+% above Hi yield Hi, anything else is returned as is.
+clip(Value, Lo, _Hi) when Value < Lo -> Lo;
+clip(Value, _Lo, Hi) when Value > Hi -> Hi;
+clip(Value, _Lo, _Hi) -> Value.
+
+% Worker for foldl_decode/6. The running digest Md5Acc is fed with the encoded
+% bytes as read from the file, while Fun receives the output of DecFun. The
+% final digest must equal Md5, otherwise the match fails with badmatch.
+% {Pos, Size} entries are reduced to their position.
+foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = couch_util:md5_final(Md5Acc),
+ Acc;
+foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
+ foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
+foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ % last chunk: verify the digest before decoding and handing out the data
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
+ Bin = DecFun(EncBin),
+ Fun(Bin, Acc);
+foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
+ foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
+foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Bin = DecFun(EncBin),
+ Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin),
+ foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+% Build the {EncodeFun, FinishFun} pair for gzip encoding. A compression_level
+% option from 1 to 9 yields funs backed by a zlib deflate stream; any other
+% value (the default is 0) yields the identity funs, i.e. no compression.
+% FinishFun returns the remaining compressed data and releases the zlib
+% handle.
+gzip_init(Options) ->
+ case couch_util:get_value(compression_level, Options, 0) of
+ Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+ {
+ fun(Data) ->
+ zlib:deflate(Z, Data)
+ end,
+ fun() ->
+ Last = zlib:deflate(Z, [], finish),
+ ok = zlib:deflateEnd(Z),
+ ok = zlib:close(Z),
+ Last
+ end
+ };
+ _ ->
+ identity_enc_dec_funs()
+ end.
+
+% Build the {DecodeFun, FinishFun} pair for gzip data. The window bits
+% argument 16 + 15 is the same value gzip_init/1 uses for deflateInit.
+% FinishFun ends the inflate stream and closes the zlib handle.
+ungzip_init() ->
+ Z = zlib:open(),
+ zlib:inflateInit(Z, 16 + 15),
+ {
+ fun(Data) ->
+ zlib:inflate(Z, Data)
+ end,
+ fun() ->
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z)
+ end
+ }.
+
+% {DataFun, EndFun} pair for the identity encoding: DataFun returns its input
+% unchanged and EndFun returns [] (no trailing data).
+identity_enc_dec_funs() ->
+ {
+ fun(Data) -> Data end,
+ fun() -> [] end
+ }.
+
+% Append Bin to the stream handled by Pid. An empty binary is dropped without
+% contacting the stream process.
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+% gen_server callback: set up the stream state for Fd. The encoding option
+% (default identity) selects the encoder funs; buffer_size (default
+% ?DEFAULT_BUFFER_SIZE) sets the buffering limit. Both digests start from a
+% fresh MD5 context.
+init({Fd, Options}) ->
+    Encoding = couch_util:get_value(encoding, Options, identity),
+    {EncodingFun, EndEncodingFun} = case Encoding of
+        identity -> identity_enc_dec_funs();
+        gzip -> gzip_init(Options)
+    end,
+    MaxBuffer = couch_util:get_value(buffer_size, Options, ?DEFAULT_BUFFER_SIZE),
+    Stream = #stream{
+        fd = Fd,
+        md5 = couch_util:md5_init(),
+        identity_md5 = couch_util:md5_init(),
+        encoding_fun = EncodingFun,
+        end_encoding_fun = EndEncodingFun,
+        max_buffer = MaxBuffer
+    },
+    {ok, Stream}.
+
+% gen_server callback: nothing is cleaned up here.
+terminate(_Reason, _Stream) ->
+ ok.
+
+% {write, Bin}: keep Bin in the in-memory buffer, or - once the buffered size
+% plus Bin exceeds max_buffer - encode the buffer together with Bin and append
+% the result to the file.
+% close: encode what is still buffered, add the encoder's trailing data,
+% append it to the file if non-empty, reply with
+% {PosList, StreamLength, IdentityLength, Md5, IdentityMd5} and stop.
+handle_call({write, Bin}, _From, Stream) ->
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun} = Stream,
+ if BinSize + BufferLen > Max ->
+ % the buffer is kept newest-first; restore write order and add Bin last
+ WriteBin = lists:reverse(Buffer, [Bin]),
+ % the identity digest covers the data before encoding
+ IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
+ case EncodingFun(WriteBin) of
+ [] ->
+ % case where the encoder did some internal buffering
+ % (zlib does it for example)
+ WrittenLen2 = WrittenLen,
+ Md5_2 = Md5,
+ Written2 = Written;
+ WriteBin2 ->
+ {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
+ % md5 covers the encoded bytes that reach the file
+ Md5_2 = couch_util:md5_update(Md5, WriteBin2),
+ Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
+ end,
+
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen2,
+ written_pointers=Written2,
+ buffer_list=[],
+ buffer_len=0,
+ md5=Md5_2,
+ identity_md5=IdenMd5_2,
+ identity_len=IdenLen + BinSize}};
+ true ->
+ % below the limit: only buffer; the digests are updated when flushing
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize,
+ identity_len=IdenLen + BinSize}}
+ end;
+handle_call(close, _From, Stream) ->
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_list = Buffer,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun,
+ end_encoding_fun = EndEncodingFun} = Stream,
+
+ WriteBin = lists:reverse(Buffer),
+ IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
+ % encoded remainder followed by whatever the encoder emits on finish
+ WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
+ Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
+ Result = case WriteBin2 of
+ [] ->
+ % nothing left to write
+ {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ _ ->
+ {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
+ StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
+ StreamLen = WrittenLen + iolist_size(WriteBin2),
+ {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ end,
+ {stop, normal, Result, Stream}.
+
+% Casts are not part of the stream protocol; they are ignored.
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+% No state conversion is needed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% Unexpected messages are ignored.
+handle_info(_Info, State) ->
+ {noreply, State}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_task_status.erl
----------------------------------------------------------------------
diff --git a/src/couch_task_status.erl b/src/couch_task_status.erl
new file mode 100644
index 0000000..e23b560
--- /dev/null
+++ b/src/couch_task_status.erl
@@ -0,0 +1,151 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_task_status).
+-behaviour(gen_server).
+
+% This module is used to track the status of long running tasks.
+% Long running tasks register themselves, via a call to add_task/1, and then
+% update their status properties via update/1. The status of a task is a
+% list of properties. Each property is a tuple, with the first element being
+% either an atom or a binary and the second element must be an EJSON value. When
+% a task updates its status, it can override some or all of its properties.
+% The properties {started_on, UnixTimestamp}, {updated_on, UnixTimestamp} and
+% {pid, ErlangPid} are automatically added by this module.
+% When a tracked task dies, its status will be automatically removed from
+% memory. To get the tasks list, call the all/0 function.
+
+-export([start_link/0, stop/0]).
+-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]).
+-export([is_task_added/0]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+
+% Start the task status server, registered locally under the module name.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+% Ask the server to stop (asynchronous).
+stop() ->
+ gen_server:cast(?MODULE, stop).
+
+
+% Return the property list of every registered task; each list starts with a
+% {pid, Binary} entry for the owning process.
+all() ->
+ gen_server:call(?MODULE, all).
+
+
+% Register the calling process as a task with the given properties.
+% started_on and updated_on are added with the current timestamp, the update
+% frequency is reset to 0 and a copy of the properties is kept in the process
+% dictionary. Returns ok, or {add_task_error, already_registered} if the
+% calling process already has a task.
+add_task(Props) ->
+ put(task_status_update, {{0, 0, 0}, 0}),
+ Ts = timestamp(),
+ TaskProps = lists:ukeysort(
+ 1, [{started_on, Ts}, {updated_on, Ts} | Props]),
+ put(task_status_props, TaskProps),
+ gen_server:call(?MODULE, {add_task, TaskProps}).
+
+
+% True if the calling process holds task properties in its process
+% dictionary, i.e. add_task/1 has been called in it.
+is_task_added() ->
+ is_list(erlang:get(task_status_props)).
+
+
+% Set the minimum interval, given in milliseconds, between two status updates
+% sent to the server. It is stored in microseconds, the unit maybe_persist/1
+% compares against, and the last update time is reset.
+set_update_frequency(Msecs) ->
+ put(task_status_update, {{0, 0, 0}, Msecs * 1000}).
+
+
+% Merge Props into the calling process's task properties; for keys present in
+% both, the value from Props wins. The merged list is stored in the process
+% dictionary and sent to the server subject to the update frequency.
+update(Props) ->
+ MergeProps = lists:ukeysort(1, Props),
+ TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)),
+ put(task_status_props, TaskProps),
+ maybe_persist(TaskProps).
+
+
+% Read task properties of the calling process from its process dictionary.
+% Given a list of keys the result is the list of their values in the same
+% order; given a single key the result is its value. Missing keys yield
+% undefined.
+get(Props) when is_list(Props) ->
+ TaskProps = erlang:get(task_status_props),
+ [couch_util:get_value(P, TaskProps) || P <- Props];
+get(Prop) ->
+ TaskProps = erlang:get(task_status_props),
+ couch_util:get_value(Prop, TaskProps).
+
+
+% Send the task properties to the server unless the previous update was sent
+% less than Frequency microseconds ago. When sending, updated_on is set to
+% the current time and the time of this update is remembered.
+maybe_persist(TaskProps0) ->
+ {LastUpdateTime, Frequency} = erlang:get(task_status_update),
+ case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of
+ true ->
+ put(task_status_update, {Now, Frequency}),
+ TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)),
+ gen_server:cast(?MODULE, {update_status, self(), TaskProps});
+ false ->
+ ok
+ end.
+
+
+% gen_server callback: create the protected, named ETS table that holds one
+% {Pid, TaskProps} row per registered task. No other state is kept.
+init([]) ->
+ % table is named after the module and is written only by this server
+ ets:new(?MODULE, [ordered_set, protected, named_table]),
+ {ok, nil}.
+
+
+% gen_server callback: nothing to clean up.
+terminate(_Reason,_State) ->
+ ok.
+
+
+% {add_task, TaskProps}: register the calling process unless it already has a
+% row, and monitor it so that handle_info/2 can remove the row when it dies.
+% all: reply with every task's properties, prefixed with its pid as a binary.
+handle_call({add_task, TaskProps}, {From, _}, Server) ->
+ case ets:lookup(?MODULE, From) of
+ [] ->
+ true = ets:insert(?MODULE, {From, TaskProps}),
+ erlang:monitor(process, From),
+ {reply, ok, Server};
+ [_] ->
+ {reply, {add_task_error, already_registered}, Server}
+ end;
+handle_call(all, _, Server) ->
+ All = [
+ [{pid, ?l2b(pid_to_list(Pid))} | TaskProps]
+ ||
+ {Pid, TaskProps} <- ets:tab2list(?MODULE)
+ ],
+ {reply, All, Server}.
+
+
+% {update_status, Pid, NewProps}: replace the stored properties of Pid, but
+% only if Pid is still registered.
+% stop: terminate the server normally.
+handle_cast({update_status, Pid, NewProps}, Server) ->
+ case ets:lookup(?MODULE, Pid) of
+ [{Pid, _CurProps}] ->
+ ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]),
+ true = ets:insert(?MODULE, {Pid, NewProps});
+ _ ->
+ % Task finished/died in the meanwhile and we must have received
+ % a monitor message before this call - ignore.
+ ok
+ end,
+ {noreply, Server};
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+
+% A monitored task process went down: remove its row. Only 'DOWN' messages
+% are matched by this callback.
+handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) ->
+ %% should we also erlang:demonitor(_MonitorRef), ?
+ ets:delete(?MODULE, Pid),
+ {noreply, Server}.
+
+
+% No state conversion is needed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Current time in whole seconds, derived from now().
+timestamp() ->
+ timestamp(now()).
+
+% Convert a {MegaSecs, Secs, MicroSecs} tuple to whole seconds; the
+% microsecond part is dropped.
+timestamp({Mega, Secs, _}) ->
+ Mega * 1000000 + Secs.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_users_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_users_db.erl b/src/couch_users_db.erl
new file mode 100644
index 0000000..9b875ba
--- /dev/null
+++ b/src/couch_users_db.erl
@@ -0,0 +1,121 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_users_db).
+
+-export([before_doc_update/2, after_doc_read/2, strip_non_public_fields/1]).
+
+-include("couch_db.hrl").
+
+-define(NAME, <<"name">>).
+-define(PASSWORD, <<"password">>).
+-define(DERIVED_KEY, <<"derived_key">>).
+-define(PASSWORD_SCHEME, <<"password_scheme">>).
+-define(PBKDF2, <<"pbkdf2">>).
+-define(ITERATIONS, <<"iterations">>).
+-define(SALT, <<"salt">>).
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+
+% If the request's userCtx identifies an admin
+% -> save_doc (see below)
+%
+% If the request's userCtx.name is null:
+% -> save_doc
+% // this is an anonymous user registering a new document
+% // in case a user doc with the same id already exists, the anonymous
+% // user will get a regular doc update conflict.
+% If the request's userCtx.name doesn't match the doc's name
+% -> 404 // Not Found
+% Else
+% -> save_doc
+% Hook run before a user document is written. Admins, anonymous requests
+% (user name null) and the user named in the document id may save; everyone
+% else gets a not_found throw.
+before_doc_update(Doc, #db{user_ctx = UserCtx} = Db) ->
+ #user_ctx{name=Name} = UserCtx,
+ DocName = get_doc_name(Doc),
+ % check_is_admin's failure is trapped so that it selects a branch below
+ case (catch couch_db:check_is_admin(Db)) of
+ ok ->
+ save_doc(Doc);
+ _ when Name =:= DocName orelse Name =:= null ->
+ save_doc(Doc);
+ _ ->
+ throw(not_found)
+ end.
+
+% If newDoc.password == null || newDoc.password == undefined:
+% ->
+% noop
+% Else -> // calculate password hash server side
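+% newDoc.password_scheme = "pbkdf2"
+% newDoc.iterations = configured iteration count
+% newDoc.salt = random salt
+% newDoc.derived_key = pbkdf2(newDoc.password, salt, iterations)
+% delete newDoc.password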
+% Replace a clear-text password in a user document by its PBKDF2 derived key.
+% Documents whose password is null or absent are returned unchanged.
+% Otherwise password_scheme, iterations, derived_key and salt are set and the
+% password field is removed. The iteration count is read from the
+% couch_httpd_auth/iterations setting (default "1000").
+save_doc(#doc{body={Body}} = Doc) ->
+    case couch_util:get_value(?PASSWORD, Body) of
+    null -> % server admins don't have a user-db password entry
+        Doc;
+    undefined ->
+        Doc;
+    ClearPassword ->
+        Iterations = list_to_integer(couch_config:get("couch_httpd_auth", "iterations", "1000")),
+        Salt = couch_uuids:random(),
+        DerivedKey = couch_passwords:pbkdf2(ClearPassword, Salt, Iterations),
+        % Every field goes through keystore: a document that already carries
+        % password_scheme/iterations (e.g. on a password change) must not end
+        % up with those keys twice.
+        Body0 = ?replace(Body, ?PASSWORD_SCHEME, ?PBKDF2),
+        Body1 = ?replace(Body0, ?ITERATIONS, Iterations),
+        Body2 = ?replace(Body1, ?DERIVED_KEY, DerivedKey),
+        Body3 = ?replace(Body2, ?SALT, Salt),
+        Body4 = proplists:delete(?PASSWORD, Body3),
+        Doc#doc{body={Body4}}
+    end.
+
+% If the doc is a design doc
+% If the request's userCtx identifies an admin
+% -> return doc
+% Else
+% -> 403 // Forbidden
+% If the request's userCtx identifies an admin
+% -> return doc
+% If the request's userCtx.name doesn't match the doc's name
+% -> return only the configured public fields,
+% or 404 // Not Found if no public field remains
+% Else
+% -> return doc
+% Hook run after a document has been read from the users database.
+% Design documents are returned to admins only; everybody else gets a
+% forbidden throw. Other documents are returned in full to admins and to the
+% user named in the document id. Any other reader receives only the public
+% fields, or a not_found throw when no field is left.
+after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, Db) ->
+    case (catch couch_db:check_is_admin(Db)) of
+        ok ->
+            Doc;
+        _NotAdmin ->
+            throw({forbidden,
+                <<"Only administrators can view design docs in the users database.">>})
+    end;
+after_doc_read(Doc, #db{user_ctx = UserCtx} = Db) ->
+    #user_ctx{name=Name} = UserCtx,
+    DocName = get_doc_name(Doc),
+    case (catch couch_db:check_is_admin(Db)) of
+        ok ->
+            Doc;
+        _ when Name =:= DocName ->
+            Doc;
+        _NotOwner ->
+            case strip_non_public_fields(Doc) of
+                #doc{body={[]}} ->
+                    throw(not_found);
+                Stripped ->
+                    Stripped
+            end
+    end.
+
+% Extract the user name from a document id of the form
+% "org.couchdb.user:Name"; any other document yields undefined.
+get_doc_name(#doc{id= <<"org.couchdb.user:", Name/binary>>}) ->
+ Name;
+get_doc_name(_) ->
+ undefined.
+
+% Keep only those body fields whose key is listed in the comma separated
+% couch_httpd_auth/public_fields setting (default: empty).
+strip_non_public_fields(#doc{body={Props}}=Doc) ->
+ Public = re:split(couch_config:get("couch_httpd_auth", "public_fields", ""),
+ "\\s*,\\s*", [{return, binary}]),
+ Doc#doc{body={[{K, V} || {K, V} <- Props, lists:member(K, Public)]}}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_util.erl b/src/couch_util.erl
new file mode 100644
index 0000000..76a9293
--- /dev/null
+++ b/src/couch_util.erl
@@ -0,0 +1,487 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_util).
+
+-export([start_app_deps/1, ensure_started/1]).
+-export([priv_dir/0, normpath/1]).
+-export([should_flush/0, should_flush/1, to_existing_atom/1]).
+-export([rand32/0, implode/2]).
+-export([abs_pathname/1,abs_pathname/2, trim/1]).
+-export([encodeBase64Url/1, decodeBase64Url/1]).
+-export([validate_utf8/1, to_hex/1, parse_term/1, dict_find/3]).
+-export([get_nested_json_value/2, json_user_ctx/1]).
+-export([proplist_apply_field/2, json_apply_field/2]).
+-export([json_decode/1]).
+-export([to_binary/1, to_integer/1, to_list/1, url_encode/1]).
+-export([verify/2,simple_call/2,shutdown_sync/1]).
+-export([get_value/2, get_value/3]).
+-export([md5/1, md5_init/0, md5_update/2, md5_final/1]).
+-export([reorder_results/2]).
+-export([url_strip_password/1]).
+-export([encode_doc_id/1]).
+-export([with_db/2]).
+-export([rfc1123_date/0, rfc1123_date/1]).
+
+-include("couch_db.hrl").
+
+% arbitrarily chosen amount of memory to use before flushing to disk
+-define(FLUSH_MAX_MEM, 10000000).
+
+%% @spec start_app_deps(App :: atom()) -> ok
+%% @doc Start the applications that App lists as its dependencies.
+start_app_deps(App) ->
+    {ok, DepApps} = application:get_key(App, applications),
+    lists:foreach(fun ensure_started/1, DepApps),
+    ok.
+
+%% @spec ensure_started(Application :: atom()) -> ok
+%% @doc Start the named application if not already started. Any other start
+%% error is not handled here and fails with case_clause.
+ensure_started(App) ->
+ case application:start(App) of
+ ok ->
+ ok;
+ {error, {already_started, App}} ->
+ ok
+ end.
+
+% Return the priv directory of the couch application. If the code server
+% cannot tell, fall back to "priv" next to the directory that holds this
+% module's beam file.
+priv_dir() ->
+ case code:priv_dir(couch) of
+ {error, _} ->
+ %% try to get relative priv dir. useful for tests.
+ EbinDir = filename:dirname(code:which(?MODULE)),
+ AppPath = filename:dirname(EbinDir),
+ filename:join(AppPath, "priv");
+ Dir -> Dir
+ end.
+
+% Normalize a pathname by removing .. and . components.
+normpath(Path) ->
+ normparts(filename:split(Path), []).
+
+% Acc holds the components kept so far, most recent first. A ".." drops the
+% most recent component; when there is none to drop, the ".." itself is kept
+% (it falls through to the last clause).
+normparts([], Acc) ->
+ filename:join(lists:reverse(Acc));
+normparts([".." | RestParts], [_Drop | RestAcc]) ->
+ normparts(RestParts, RestAcc);
+normparts(["." | RestParts], Acc) ->
+ normparts(RestParts, Acc);
+normparts([Part | RestParts], Acc) ->
+ normparts(RestParts, [Part | Acc]).
+
+% works like list_to_existing_atom, except can be list or binary and it
+% gives you the original value instead of an error if no existing atom.
+% Atoms are returned unchanged.
+to_existing_atom(V) when is_list(V) ->
+ try list_to_existing_atom(V) catch _:_ -> V end;
+to_existing_atom(V) when is_binary(V) ->
+ try list_to_existing_atom(?b2l(V)) catch _:_ -> V end;
+to_existing_atom(V) when is_atom(V) ->
+ V.
+
+% Stop Pid and wait until it is gone: unlink from it, send it an exit signal
+% with reason shutdown and block (without timeout) until the monitor reports
+% it down. Anything that is not a pid is ignored.
+shutdown_sync(Pid) when not is_pid(Pid)->
+ ok;
+shutdown_sync(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ try
+ catch unlink(Pid),
+ catch exit(Pid, shutdown),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ ok
+ end
+ after
+ % drop the monitor and any 'DOWN' message still in the mailbox
+ erlang:demonitor(MRef, [flush])
+ end.
+
+
+% Minimal synchronous request: send {self(), Message} to Pid and wait
+% (without timeout) for a {Pid, Result} reply, which is returned. If Pid
+% terminates first, the caller exits with the same reason.
+simple_call(Pid, Message) ->
+ MRef = erlang:monitor(process, Pid),
+ try
+ Pid ! {self(), Message},
+ receive
+ {Pid, Result} ->
+ Result;
+ {'DOWN', MRef, _, _, Reason} ->
+ exit(Reason)
+ end
+ after
+ erlang:demonitor(MRef, [flush])
+ end.
+
+% Return true if Data (binary or list) looks like UTF-8 according to the byte
+% range checks in validate_utf8_fast/2, false otherwise.
+validate_utf8(Data) when is_list(Data) ->
+ validate_utf8(?l2b(Data));
+validate_utf8(Bin) when is_binary(Bin) ->
+ validate_utf8_fast(Bin, 0).
+
+% Walk the binary B from byte offset O, accepting 1 to 4 byte sequences by
+% the value ranges of their lead and continuation bytes.
+% NOTE(review): only byte ranges are checked; the ranges as written also
+% admit e.g. 16#E0 followed by 16#80 - confirm whether stricter validation
+% (overlong forms, surrogates) is required.
+validate_utf8_fast(B, O) ->
+ case B of
+ <<_:O/binary>> ->
+ % offset reached the end: everything was accepted
+ true;
+ <<_:O/binary, C1, _/binary>> when
+ C1 < 128 ->
+ validate_utf8_fast(B, 1 + O);
+ <<_:O/binary, C1, C2, _/binary>> when
+ C1 >= 194, C1 =< 223,
+ C2 >= 128, C2 =< 191 ->
+ validate_utf8_fast(B, 2 + O);
+ <<_:O/binary, C1, C2, C3, _/binary>> when
+ C1 >= 224, C1 =< 239,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191 ->
+ validate_utf8_fast(B, 3 + O);
+ <<_:O/binary, C1, C2, C3, C4, _/binary>> when
+ C1 >= 240, C1 =< 244,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191,
+ C4 >= 128, C4 =< 191 ->
+ validate_utf8_fast(B, 4 + O);
+ _ ->
+ false
+ end.
+
+% Convert a binary or a list of bytes to a string of lowercase hexadecimal
+% digits, two per byte.
+to_hex(Bin) when is_binary(Bin) ->
+    to_hex(binary_to_list(Bin));
+to_hex([]) ->
+    [];
+to_hex([Byte | Rest]) ->
+    High = to_digit(Byte div 16),
+    Low = to_digit(Byte rem 16),
+    [High, Low | to_hex(Rest)].
+
+% Character for a single hex digit value: 0-9, then a-f.
+to_digit(N) when N < 10 -> N + $0;
+to_digit(N) -> $a + N - 10.
+
+
+% Parse the textual representation of an Erlang term (binary or string,
+% without the trailing dot). Returns whatever erl_parse:parse_term/1 returns;
+% a scan failure is not handled and fails the match.
+parse_term(Bin) when is_binary(Bin) ->
+ parse_term(binary_to_list(Bin));
+parse_term(List) ->
+ {ok, Tokens, _} = erl_scan:string(List ++ "."),
+ erl_parse:parse_term(Tokens).
+
+% Look up Key in a list of {Key, Value} pairs; undefined when it is missing.
+get_value(Key, List) ->
+    get_value(Key, List, undefined).
+
+% Look up Key in a list of {Key, Value} pairs and return the value of the
+% first match, or Default when there is none.
+get_value(Key, List, Default) ->
+    case lists:keyfind(Key, 1, List) of
+        {Key, Value} ->
+            Value;
+        false ->
+            Default
+    end.
+
+% Follow the path Keys (a list of binary keys) into a nested EJSON object and
+% return the value found. Throws {not_found, Message} when a key is missing
+% and {not_found, json_mismatch} when the path continues into something that
+% is not an object.
+% Note that a stored value of nil is treated like a missing key.
+get_nested_json_value({Props}, [Key|Keys]) ->
+ case couch_util:get_value(Key, Props, nil) of
+ nil -> throw({not_found, <<"missing json key: ", Key/binary>>});
+ Value -> get_nested_json_value(Value, Keys)
+ end;
+get_nested_json_value(Value, []) ->
+ Value;
+get_nested_json_value(_NotJSONObj, _) ->
+ throw({not_found, json_mismatch}).
+
+% Same as json_apply_field/2, but working on a bare property list.
+proplist_apply_field(H, L) ->
+ {R} = json_apply_field(H, {L}),
+ R.
+
+% Set the field {Key, NewValue} in an EJSON object: every existing entry for
+% Key is dropped and the new pair is put at the front. The remaining fields
+% come out in reverse order because the accumulator is not reversed.
+json_apply_field(H, {L}) ->
+ json_apply_field(H, L, []).
+json_apply_field({Key, NewValue}, [{Key, _OldVal} | Headers], Acc) ->
+ json_apply_field({Key, NewValue}, Headers, Acc);
+json_apply_field({Key, NewValue}, [{OtherKey, OtherVal} | Headers], Acc) ->
+ json_apply_field({Key, NewValue}, Headers, [{OtherKey, OtherVal} | Acc]);
+json_apply_field({Key, NewValue}, [], Acc) ->
+ {[{Key, NewValue}|Acc]}.
+
+% Build the EJSON user context object (db, name, roles) for a database.
+json_user_ctx(#db{name=DbName, user_ctx=Ctx}) ->
+ {[{<<"db">>, DbName},
+ {<<"name">>,Ctx#user_ctx.name},
+ {<<"roles">>,Ctx#user_ctx.roles}]}.
+
+% Decode JSON with jiffy; anything jiffy throws is re-thrown wrapped as
+% {invalid_json, Error}.
+json_decode(D) ->
+ try
+ jiffy:decode(D)
+ catch
+ throw:Error ->
+ throw({invalid_json, Error})
+ end.
+
+% returns a random integer N with 0 =< N < 16#100000000, i.e. 32 random bits
+rand32() ->
+ crypto:rand_uniform(0, 16#100000000).
+
+% given a pathname "../foo/bar/" it gives back the fully qualified
+% absolute pathname.
+% Leading spaces are skipped and a name starting with "/" is returned as is.
+% Anything after the first unescaped space is treated as command arguments
+% and appended unchanged to the resolved path.
+abs_pathname(" " ++ Filename) ->
+ % strip leading whitespace
+ abs_pathname(Filename);
+abs_pathname([$/ |_]=Filename) ->
+ Filename;
+abs_pathname(Filename) ->
+ {ok, Cwd} = file:get_cwd(),
+ {Filename2, Args} = separate_cmd_args(Filename, ""),
+ abs_pathname(Filename2, Cwd) ++ Args.
+
+% Resolve Filename relative to Dir and remove "." and ".." components.
+abs_pathname(Filename, Dir) ->
+ Name = filename:absname(Filename, Dir ++ "/"),
+ OutFilename = filename:join(fix_path_list(filename:split(Name), [])),
+ % If the filename is a dir (last char is a slash), put back the end slash
+ case string:right(Filename,1) of
+ "/" ->
+ OutFilename ++ "/";
+ "\\" ->
+ OutFilename ++ "/";
+ _Else->
+ OutFilename
+ end.
+
+% if this is an executable with arguments, separate out the arguments
+% "./foo\ bar.sh -baz=blah" -> {"./foo\ bar.sh", " -baz=blah"}
+% The split happens at the first space that is not preceded by a backslash;
+% CmdAcc collects the command part in reverse.
+separate_cmd_args("", CmdAcc) ->
+ {lists:reverse(CmdAcc), ""};
+separate_cmd_args("\\ " ++ Rest, CmdAcc) -> % handle skipped value
+ separate_cmd_args(Rest, " \\" ++ CmdAcc);
+separate_cmd_args(" " ++ Rest, CmdAcc) ->
+ {lists:reverse(CmdAcc), " " ++ Rest};
+separate_cmd_args([Char|Rest], CmdAcc) ->
+ separate_cmd_args(Rest, [Char | CmdAcc]).
+
+% True for space, tab, newline and carriage return; false for anything else.
+is_whitespace(Char) ->
+    lists:member(Char, [$\s, $\t, $\n, $\r]).
+
+
+% Strip whitespace (as defined by is_whitespace/1) from both ends of a string.
+trim(String) ->
+    DropLeading = fun(Chars) -> lists:dropwhile(fun is_whitespace/1, Chars) end,
+    lists:reverse(DropLeading(lists:reverse(DropLeading(String)))).
+
+% takes a hierarchical list of dirs and removes the dots ".", double dots
+% ".." and the corresponding parent dirs.
+% Acc holds the components kept so far, most recent first. A ".." with no
+% component left to remove is kept (it falls through to the last clause).
+fix_path_list([], Acc) ->
+ lists:reverse(Acc);
+fix_path_list([".."|Rest], [_PrevAcc|RestAcc]) ->
+ fix_path_list(Rest, RestAcc);
+fix_path_list(["."|Rest], Acc) ->
+ fix_path_list(Rest, Acc);
+fix_path_list([Dir | Rest], Acc) ->
+ fix_path_list(Rest, [Dir | Acc]).
+
+
+% Join the elements of List with Sep between neighbours and flatten the
+% result, e.g. implode(["a", "b"], ", ") gives "a, b". An empty list yields
+% [].
+implode([], _Sep) ->
+    [];
+implode([First | Rest], Sep) ->
+    lists:flatten([First | [[Sep, Item] || Item <- Rest]]).
+
+% should_flush/1 with the default threshold ?FLUSH_MAX_MEM.
+should_flush() ->
+ should_flush(?FLUSH_MAX_MEM).
+
+% Decide whether the calling process holds enough memory to warrant flushing.
+% Usage is the process memory plus the sizes of the binaries it references.
+% Only when usage exceeds twice the threshold is a garbage collection forced;
+% the answer is true if usage still exceeds the threshold afterwards.
+should_flush(MemThreshHold) ->
+ {memory, ProcMem} = process_info(self(), memory),
+ BinMem = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ if ProcMem+BinMem > 2*MemThreshHold ->
+ garbage_collect(),
+ {memory, ProcMem2} = process_info(self(), memory),
+ BinMem2 = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ ProcMem2+BinMem2 > MemThreshHold;
+ true -> false end.
+
+% Base64 encode Url in the URL-safe variant: trailing "=" padding is removed,
+% "/" becomes "_" and "+" becomes "-". Returns a binary.
+encodeBase64Url(Url) ->
+ Url1 = re:replace(base64:encode(Url), ["=+", $$], ""),
+ Url2 = re:replace(Url1, "/", "_", [global]),
+ re:replace(Url2, "\\+", "-", [global, {return, binary}]).
+
+% Reverse of encodeBase64Url/1: map "-" and "_" back to "+" and "/", restore
+% the "=" padding to a multiple of four characters and decode.
+decodeBase64Url(Url64) ->
+ Url1 = re:replace(Url64, "-", "+", [global]),
+ Url2 = re:replace(Url1, "_", "/", [global]),
+ Padding = lists:duplicate((4 - iolist_size(Url2) rem 4) rem 4, $=),
+ base64:decode(iolist_to_binary([Url2, Padding])).
+
+% Return the value stored under Key in Dict, or DefaultValue if the key is
+% not present.
+dict_find(Key, Dict, DefaultValue) ->
+ case dict:find(Key, Dict) of
+ {ok, Value} ->
+ Value;
+ error ->
+ DefaultValue
+ end.
+
+% Convert a term to a binary. Binaries pass through, lists are converted with
+% list_to_binary/1, atoms by name. A list that list_to_binary/1 rejects and
+% any other term are rendered with the ~p format.
+to_binary(V) when is_binary(V) ->
+ V;
+to_binary(V) when is_list(V) ->
+ try
+ list_to_binary(V)
+ catch
+ _:_ ->
+ list_to_binary(io_lib:format("~p", [V]))
+ end;
+to_binary(V) when is_atom(V) ->
+ list_to_binary(atom_to_list(V));
+to_binary(V) ->
+ list_to_binary(io_lib:format("~p", [V])).
+
+% Convert an integer, string or binary to an integer. Text that is not a
+% valid integer is not handled and raises badarg.
+to_integer(V) when is_integer(V) ->
+ V;
+to_integer(V) when is_list(V) ->
+ erlang:list_to_integer(V);
+to_integer(V) when is_binary(V) ->
+ erlang:list_to_integer(binary_to_list(V)).
+
+% Convert a term to a list/string. Lists pass through, binaries and atoms are
+% converted directly, any other term is rendered with the ~p format.
+to_list(V) when is_list(V) ->
+ V;
+to_list(V) when is_binary(V) ->
+ binary_to_list(V);
+to_list(V) when is_atom(V) ->
+ atom_to_list(V);
+to_list(V) ->
+ lists:flatten(io_lib:format("~p", [V])).
+
+% Percent-encode a binary or list of bytes. Letters, digits and the
+% characters _ . - : are kept; every other byte becomes % followed by two
+% uppercase hex digits. The result is a list.
+url_encode(Bin) when is_binary(Bin) ->
+ url_encode(binary_to_list(Bin));
+url_encode([H|T]) ->
+ if
+ H >= $a, $z >= H ->
+ [H|url_encode(T)];
+ H >= $A, $Z >= H ->
+ [H|url_encode(T)];
+ H >= $0, $9 >= H ->
+ [H|url_encode(T)];
+ H == $_; H == $.; H == $-; H == $: ->
+ [H|url_encode(T)];
+ true ->
+ case lists:flatten(io_lib:format("~.16.0B", [H])) of
+ [X, Y] ->
+ [$%, X, Y | url_encode(T)];
+ [X] ->
+ % single hex digit: pad with a leading zero
+ [$%, $0, X | url_encode(T)]
+ end
+ end;
+url_encode([]) ->
+ [].
+
+% Worker for verify/2: XORs the two lists element by element and ORs the
+% differences into Result, so the whole list is walked even after a mismatch.
+% Equal iff the accumulated value is 0.
+verify([X|RestX], [Y|RestY], Result) ->
+ verify(RestX, RestY, (X bxor Y) bor Result);
+verify([], [], Result) ->
+ Result == 0.
+
+% Compare two binaries or two lists of integers for equality without stopping
+% at the first differing element. Inputs of different length, or of any other
+% type combination, compare as false.
+verify(<<X/binary>>, <<Y/binary>>) ->
+ verify(?b2l(X), ?b2l(Y));
+verify(X, Y) when is_list(X) and is_list(Y) ->
+ case length(X) == length(Y) of
+ true ->
+ verify(X, Y, 0);
+ false ->
+ false
+ end;
+verify(_X, _Y) -> false.
+
+% MD5 helpers. Each one tries the crypto implementation first and falls back
+% to the erlang BIF of the same name when the crypto call raises an error.
+
+% One-shot digest of Data.
+-spec md5(Data::(iolist() | binary())) -> Digest::binary().
+md5(Data) ->
+ try crypto:md5(Data) catch error:_ -> erlang:md5(Data) end.
+
+% Fresh context for an incremental digest.
+-spec md5_init() -> Context::binary().
+md5_init() ->
+ try crypto:md5_init() catch error:_ -> erlang:md5_init() end.
+
+% Feed more data into an incremental digest context.
+-spec md5_update(Context::binary(), Data::(iolist() | binary())) ->
+ NewContext::binary().
+md5_update(Ctx, D) ->
+ try crypto:md5_update(Ctx,D) catch error:_ -> erlang:md5_update(Ctx,D) end.
+
+% Finish an incremental digest and return it.
+-spec md5_final(Context::binary()) -> Digest::binary().
+md5_final(Ctx) ->
+ try crypto:md5_final(Ctx) catch error:_ -> erlang:md5_final(Ctx) end.
+
+% Return the values of the {Key, Value} list SortedResults in the order given
+% by Keys.
+% linear search is faster for small lists, length() is 0.5 ms for 100k list
+% Note the two clauses differ for a key without result: the linear variant
+% yields undefined, the dict variant fails in dict:fetch/2.
+reorder_results(Keys, SortedResults) when length(Keys) < 100 ->
+ [couch_util:get_value(Key, SortedResults) || Key <- Keys];
+reorder_results(Keys, SortedResults) ->
+ KeyDict = dict:from_list(SortedResults),
+ [dict:fetch(Key, KeyDict) || Key <- Keys].
+
+% Replace the password of an http(s) URL of the form scheme://user:pass@rest
+% by "*****". URLs that do not match are returned unchanged. The result is a
+% list (string).
+url_strip_password(Url) ->
+ re:replace(Url,
+ "http(s)?://([^:]+):[^@]+@(.*)$",
+ "http\\1://\\2:*****@\\3",
+ [{return, list}]).
+
+% URL-encode a document id, given as #doc{} record, list or binary. The
+% prefixes "_design/" and "_local/" are kept verbatim (their slash is not
+% encoded); only the remainder is encoded.
+encode_doc_id(#doc{id = Id}) ->
+ encode_doc_id(Id);
+encode_doc_id(Id) when is_list(Id) ->
+ encode_doc_id(?l2b(Id));
+encode_doc_id(<<"_design/", Rest/binary>>) ->
+ "_design/" ++ url_encode(Rest);
+encode_doc_id(<<"_local/", Rest/binary>>) ->
+ "_local/" ++ url_encode(Rest);
+encode_doc_id(Id) ->
+ url_encode(Id).
+
+
+% Run Fun with a database handle. An already open #db{} record is passed
+% straight through. A database name is opened with an _admin user context and
+% closed again after Fun returns or raises. If opening fails, the open result
+% is thrown.
+with_db(Db, Fun) when is_record(Db, db) ->
+ Fun(Db);
+with_db(DbName, Fun) ->
+ case couch_db:open_int(DbName, [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}]) of
+ {ok, Db} ->
+ try
+ Fun(Db)
+ after
+ catch couch_db:close(Db)
+ end;
+ Else ->
+ throw(Else)
+ end.
+
+% Current universal time as an RFC 1123 date string,
+% e.g. "Thu, 06 Feb 2014 18:40:01 GMT".
+rfc1123_date() ->
+    rfc1123_date(calendar:universal_time()).
+
+% Format a {{Year, Month, Day}, {Hour, Min, Sec}} universal time as an
+% RFC 1123 date string. undefined is passed through.
+rfc1123_date(undefined) ->
+    undefined;
+rfc1123_date(UniversalTime) ->
+    {{Year, Month, Day} = Date, {Hour, Min, Sec}} = UniversalTime,
+    Weekday = day(calendar:day_of_the_week(Date)),
+    Formatted = io_lib:format(
+        "~s, ~2.2.0w ~3.s ~4.4.0w ~2.2.0w:~2.2.0w:~2.2.0w GMT",
+        [Weekday, Day, month(Month), Year, Hour, Min, Sec]),
+    lists:flatten(Formatted).
+
+%% day
+%% Three-letter English day name for a day-of-week number, 1 = Monday up to
+%% 7 = Sunday.
+
+day(1) -> "Mon";
+day(2) -> "Tue";
+day(3) -> "Wed";
+day(4) -> "Thu";
+day(5) -> "Fri";
+day(6) -> "Sat";
+day(7) -> "Sun".
+
+%% month
+%% Three-letter English month name for a month number from 1 to 12.
+
+month(1) -> "Jan";
+month(2) -> "Feb";
+month(3) -> "Mar";
+month(4) -> "Apr";
+month(5) -> "May";
+month(6) -> "Jun";
+month(7) -> "Jul";
+month(8) -> "Aug";
+month(9) -> "Sep";
+month(10) -> "Oct";
+month(11) -> "Nov";
+month(12) -> "Dec".
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_uuids.erl
----------------------------------------------------------------------
diff --git a/src/couch_uuids.erl b/src/couch_uuids.erl
new file mode 100644
index 0000000..6ed75a1
--- /dev/null
+++ b/src/couch_uuids.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(couch_uuids).
+-include("couch_db.hrl").
+
+-behaviour(gen_server).
+
+-export([start/0, stop/0]).
+-export([new/0, random/0, utc_random/0]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+%% @doc Starts the UUID server linked to the calling process and
+%% registers it locally under the module name.
+start() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc Asynchronously asks the registered UUID server to stop; the
+%% request is a cast, so this returns without waiting for the shutdown.
+stop() ->
+ gen_server:cast(?MODULE, stop).
+
+%% @doc Requests a new UUID from the registered server. The algorithm
+%% used is whichever one the server currently holds in its state.
+new() ->
+ gen_server:call(?MODULE, create).
+
+%% @doc Returns a random UUID as a binary: the hex encoding (via
+%% couch_util:to_hex/1) of 16 random bytes.
+%% The bytes come from crypto:strong_rand_bytes/1 rather than
+%% crypto:rand_bytes/1: the latter is not guaranteed to be
+%% cryptographically strong and is no longer available in later OTP
+%% releases. strong_rand_bytes/1 may raise low_entropy on some older
+%% releases when the random generator is insufficiently seeded.
+random() ->
+    list_to_binary(couch_util:to_hex(crypto:strong_rand_bytes(16))).
+
+%% @doc Returns a UUID made of the timestamp prefix built by utc_suffix/1
+%% followed by the hex encoding (via couch_util:to_hex/1) of 9 random
+%% bytes.
+%% The bytes come from crypto:strong_rand_bytes/1 rather than
+%% crypto:rand_bytes/1: the latter is not guaranteed to be
+%% cryptographically strong and is no longer available in later OTP
+%% releases.
+utc_random() ->
+    utc_suffix(couch_util:to_hex(crypto:strong_rand_bytes(9))).
+
+%% @doc Returns a binary made of a 14-character, zero-padded, lower-case
+%% hexadecimal count of microseconds since 1970-01-01 00:00:00 UTC,
+%% followed by Suffix.
+utc_suffix(Suffix) ->
+    %% now() already counts from the Unix epoch: mega-seconds, seconds and
+    %% microseconds, so the total can be computed directly.
+    {MegaSecs, Secs, MicroSecs} = now(),
+    EpochMicros = (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs,
+    Prefix = io_lib:format("~14.16.0b", [EpochMicros]),
+    list_to_binary(Prefix ++ Suffix).
+
+%% @doc gen_server init callback. Registers a callback with couch_config
+%% whose only clause matches a first argument of "uuids" and casts
+%% `change' to this server, then takes the initial state from state/0.
+%% NOTE(review): presumably couch_config invokes the callback with the
+%% section and key of a changed setting - confirm against couch_config.
+init([]) ->
+ ok = couch_config:register(
+ fun("uuids", _) -> gen_server:cast(?MODULE, change) end
+ ),
+ {ok, state()}.
+
+%% @doc gen_server terminate callback; there is nothing to clean up.
+terminate(_Reason, _State) ->
+ ok.
+
+%% @doc Handles the `create' request by producing one UUID with the
+%% algorithm encoded in the server state:
+%%   random                   - random/0
+%%   utc_random               - utc_random/0
+%%   {utc_id, Suffix}         - timestamp prefix followed by Suffix
+%%   {sequential, Pref, Seq}  - Pref followed by Seq as 6 hex digits
+%% Only the sequential algorithm changes the state: Seq grows by a random
+%% increment per UUID, and once Seq has reached 16#fff000 the next state
+%% starts over with a fresh prefix and a fresh random Seq.
+handle_call(create, _From, random) ->
+    {reply, random(), random};
+handle_call(create, _From, utc_random) ->
+    {reply, utc_random(), utc_random};
+handle_call(create, _From, {utc_id, UtcIdSuffix} = State) ->
+    {reply, utc_suffix(UtcIdSuffix), State};
+handle_call(create, _From, {sequential, Pref, Seq}) ->
+    Uuid = ?l2b(Pref ++ io_lib:format("~6.16.0b", [Seq])),
+    NextState =
+        if
+            Seq >= 16#fff000 -> {sequential, new_prefix(), inc()};
+            true -> {sequential, Pref, Seq + inc()}
+        end,
+    {reply, Uuid, NextState}.
+
+%% @doc gen_server cast callback.
+%%   change - discards the current state and rebuilds it with state/0
+%%   stop   - stops the server with reason `normal'
+%% Any other cast is ignored and the state is kept.
+handle_cast(change, _State) ->
+ {noreply, state()};
+handle_cast(stop, State) ->
+ {stop, normal, State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% @doc gen_server info callback; every message is ignored.
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% @doc gen_server code_change callback; the state is carried over
+%% unchanged.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% @doc Returns a fresh prefix for the sequential algorithm: the hex
+%% encoding (via couch_util:to_hex/1) of 13 random bytes.
+%% The bytes come from crypto:strong_rand_bytes/1 rather than
+%% crypto:rand_bytes/1: the latter is not guaranteed to be
+%% cryptographically strong and is no longer available in later OTP
+%% releases.
+new_prefix() ->
+    couch_util:to_hex(crypto:strong_rand_bytes(13)).
+
+%% @doc Returns the random step used by the sequential algorithm, both as
+%% the initial sequence value and as the per-UUID increment.
+%% NOTE(review): crypto:rand_uniform(Lo, Hi) is documented as returning
+%% Lo =< N < Hi, i.e. 1..16#ffd here - confirm. The function is also
+%% deprecated/removed in newer OTP releases; confirm the target OTP
+%% version before upgrading.
+inc() ->
+ crypto:rand_uniform(1, 16#ffe).
+
+%% @doc Builds the server state from the "uuids"/"algorithm" setting
+%% (default "random"). The setting is converted with
+%% couch_util:to_existing_atom/1 and mapped to the matching state term;
+%% an unrecognised value is thrown as {unknown_uuid_algorithm, Value}.
+state() ->
+    Algorithm = couch_config:get("uuids", "algorithm", "random"),
+    algorithm_state(couch_util:to_existing_atom(Algorithm)).
+
+%% Maps an algorithm name to its initial server state.
+algorithm_state(random) ->
+    random;
+algorithm_state(utc_random) ->
+    utc_random;
+algorithm_state(utc_id) ->
+    %% The suffix is read only when this algorithm is selected.
+    {utc_id, couch_config:get("uuids", "utc_id_suffix", "")};
+algorithm_state(sequential) ->
+    {sequential, new_prefix(), inc()};
+algorithm_state(Unknown) ->
+    throw({unknown_uuid_algorithm, Unknown}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a6816bff/src/couch_work_queue.erl
----------------------------------------------------------------------
diff --git a/src/couch_work_queue.erl b/src/couch_work_queue.erl
new file mode 100644
index 0000000..22968d7
--- /dev/null
+++ b/src/couch_work_queue.erl
@@ -0,0 +1,187 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_work_queue).
+-behaviour(gen_server).
+
+-include("couch_db.hrl").
+
+% public API
+-export([new/1, queue/2, dequeue/1, dequeue/2, close/1, item_count/1, size/1]).
+
+% gen_server callbacks
+-export([init/1, terminate/2]).
+-export([handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+%% Server state of a work queue.
+-record(q, {
+ %% queued entries, stored as {Item, Size} pairs
+ queue = queue:new(),
+ %% `From' handles of producers whose queue call has not been answered
+ %% yet because a limit was reached
+ blocked = [],
+ %% limits taken from the start options; nil when not given
+ max_size,
+ max_items,
+ %% number of queued entries and the sum of their sizes
+ items = 0,
+ size = 0,
+ %% {From, Max} of consumers waiting in dequeue for an item to arrive
+ work_waiters = [],
+ %% set by close/1 while entries remain: stop after the queue is drained
+ close_on_dequeue = false,
+ %% when true, more than one consumer may wait in dequeue at a time
+ multi_workers = false
+}).
+
+
+%% @doc Starts an unregistered work queue process linked to the caller.
+%% Options is a property list; init/1 reads max_size, max_items and
+%% multi_workers from it. Returns the result of gen_server:start_link/3.
+new(Options) ->
+ gen_server:start_link(couch_work_queue, Options, []).
+
+
+%% @doc Adds Item to the work queue Wq. The size accounted for the item is
+%% its byte size when it is a binary and ?term_size(Item) otherwise. The
+%% call has no timeout, so the caller waits until the server replies.
+queue(Wq, Item) ->
+    ItemSize =
+        case is_binary(Item) of
+            true -> byte_size(Item);
+            false -> ?term_size(Item)
+        end,
+    gen_server:call(Wq, {queue, Item, ItemSize}, infinity).
+
+
+%% @doc Takes every queued item; same as dequeue(Wq, all).
+dequeue(Wq) ->
+    dequeue(Wq, all).
+
+
+%% @doc Takes up to MaxItems items (or all of them when MaxItems is
+%% `all') from the queue Wq. Returns the server's reply, or `closed' when
+%% the call fails for any reason.
+dequeue(Wq, MaxItems) ->
+    call_or_closed(Wq, {dequeue, MaxItems}).
+
+
+%% @doc Returns the number of queued items, or `closed' when the call
+%% fails for any reason.
+item_count(Wq) ->
+    call_or_closed(Wq, item_count).
+
+
+%% @doc Returns the accumulated size of the queued items, or `closed'
+%% when the call fails for any reason.
+size(Wq) ->
+    call_or_closed(Wq, size).
+
+
+%% Performs a call without timeout and maps every exception (for example
+%% the exit raised when the queue process is gone) to `closed'.
+call_or_closed(Wq, Request) ->
+    try
+        gen_server:call(Wq, Request, infinity)
+    catch
+        _:_ -> closed
+    end.
+
+
+%% @doc Asynchronously asks the queue to close. The server stops at once
+%% when it holds no items; otherwise it stops after a dequeue has drained
+%% the remaining items.
+close(Wq) ->
+ gen_server:cast(Wq, close).
+
+
+%% @doc gen_server init callback. Reads max_size, max_items (both default
+%% nil) and multi_workers (default false) from the option list; every
+%% other state field keeps its record default.
+init(Options) ->
+    MaxSize = couch_util:get_value(max_size, Options, nil),
+    MaxItems = couch_util:get_value(max_items, Options, nil),
+    MultiWorkers = couch_util:get_value(multi_workers, Options, false),
+    State = #q{
+        max_size = MaxSize,
+        max_items = MaxItems,
+        multi_workers = MultiWorkers
+    },
+    {ok, State}.
+
+
+%% @doc gen_server terminate callback. Answers every consumer still
+%% waiting in dequeue with `closed' so it does not wait forever.
+terminate(_Reason, #q{work_waiters=Workers}) ->
+ lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
+
+
+%% @doc gen_server call callback.
+%%   {queue, Item, Size} - add an item (two clauses, see below)
+%%   {dequeue, Max}      - take up to Max items, or wait for one
+%%   item_count          - number of queued items
+%%   size                - accumulated size of queued items
+
+%% No consumer is waiting: store the item and update the counters. When a
+%% limit is reached the producer gets no reply yet; its `From' is parked
+%% in `blocked' and answered later by a dequeue.
+%% An unset limit is the atom nil; a number compared with >= against an
+%% atom is false in Erlang term order, so an unset limit never blocks.
+handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
+ Q = Q0#q{size = Q0#q.size + Size,
+ items = Q0#q.items + 1,
+ queue = queue:in({Item, Size}, Q0#q.queue)},
+ case (Q#q.size >= Q#q.max_size) orelse
+ (Q#q.items >= Q#q.max_items) of
+ true ->
+ {noreply, Q#q{blocked = [From | Q#q.blocked]}};
+ false ->
+ {reply, ok, Q}
+ end;
+
+%% A consumer is already waiting: hand the item straight to the first
+%% waiter as a one-element list. The item never enters the queue and the
+%% counters are left untouched.
+handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
+ gen_server:reply(W, {ok, [Item]}),
+ {reply, ok, Q#q{work_waiters = Rest}};
+
+%% Dequeue request. With waiters already present, a second waiter is only
+%% accepted in multi_workers mode; otherwise the server process exits.
+%% With no waiters, an empty queue parks the caller as a waiter (no reply
+%% yet) and a non-empty queue is served by deliver_queue_items/2.
+handle_call({dequeue, Max}, From, Q) ->
+ #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
+ case {Workers, Multi} of
+ {[_ | _], false} ->
+ exit("Only one caller allowed to wait for this work at a time");
+ {[_ | _], true} ->
+ {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
+ _ ->
+ case Count of
+ 0 ->
+ {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
+ C when C > 0 ->
+ deliver_queue_items(Max, Q)
+ end
+ end;
+
+handle_call(item_count, _From, Q) ->
+ {reply, Q#q.items, Q};
+
+handle_call(size, _From, Q) ->
+ {reply, Q#q.size, Q}.
+
+
+%% @doc Builds the gen_server reply for a dequeue request on a non-empty
+%% queue. When Max is `all' or is at least the number of queued items the
+%% whole queue is handed over; otherwise exactly Max items are taken.
+deliver_queue_items(Max, #q{items = Count} = Q) ->
+    case (Max =:= all) orelse (Max >= Count) of
+        true -> deliver_all(Q);
+        false -> deliver_some(Max, Q)
+    end.
+
+%% Hands over every queued item, answers every blocked producer with ok
+%% and resets the counters. When a close was requested earlier the server
+%% stops with reason `normal' after sending the reply.
+deliver_all(#q{queue = Queue, blocked = Blocked, close_on_dequeue = Close} = Q) ->
+    lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Blocked),
+    Emptied = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
+    Items = [Item || {Item, _} <- queue:to_list(Queue)],
+    case Close of
+        false ->
+            {reply, {ok, Items}, Emptied};
+        true ->
+            {stop, normal, {ok, Items}, Emptied}
+    end.
+
+%% Hands over the first Max items; dequeue_items/5 answers at most one
+%% blocked producer per item taken.
+deliver_some(Max, #q{queue = Queue, items = Count, size = Size, blocked = Blocked} = Q) ->
+    {Items, Size2, Queue2, Blocked2} = dequeue_items(
+        Max, Size, Queue, Blocked, []),
+    Remaining = Q#q{
+        items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
+    },
+    {reply, {ok, Items}, Remaining}.
+
+
+%% @doc Removes NumItems entries from the front of Queue.
+%% Returns {Items, Size, Queue, Blocked}: the removed items in queue
+%% order, the size left after subtracting each removed item's size, the
+%% remaining queue and the producers that are still blocked. For every
+%% item removed, the producer at the head of Blocked (if any) is answered
+%% with ok. The queue must hold at least NumItems entries.
+dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
+    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
+
+dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
+    {{value, {Item, ItemSize}}, Rest} = queue:out(Queue),
+    StillBlocked = release_one(Blocked),
+    dequeue_items(
+        NumItems - 1, Size - ItemSize, Rest, StillBlocked, [Item | DequeuedAcc]).
+
+%% Answers the first blocked producer, if there is one, and returns the
+%% producers that remain blocked.
+release_one([]) ->
+    [];
+release_one([From | Others]) ->
+    gen_server:reply(From, ok),
+    Others.
+
+
+%% @doc gen_server cast callback for `close'. An empty queue stops the
+%% server at once with reason `normal'. A non-empty queue only records the
+%% request in close_on_dequeue, so the server stops after a dequeue has
+%% taken the remaining items. No other cast is handled.
+handle_cast(close, #q{items = 0} = Q) ->
+ {stop, normal, Q};
+
+handle_cast(close, Q) ->
+ {noreply, Q#q{close_on_dequeue = true}}.
+
+
+%% @doc gen_server code_change callback; the state is carried over
+%% unchanged.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% @doc gen_server info callback. Any message that is not a call or a
+%% cast stops the server, using the message itself as the stop reason.
+handle_info(X, Q) ->
+ {stop, X, Q}.