You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 14:06:03 UTC
[02/11] git commit: Add the initial version of the global_changes app.
Add the initial version of the global_changes app.
BugzID: 17681
Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/679b1345
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/679b1345
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/679b1345
Branch: refs/heads/windsor-merge
Commit: 679b1345db0f1d6df297b5c21df67eb7ebef42cd
Parents: fed1844
Author: Benjamin Bastian <be...@gmail.com>
Authored: Thu Sep 5 18:33:38 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 13:01:54 2014 +0100
----------------------------------------------------------------------
.gitignore | 2 +
README.md | 27 ++++
src/global_changes.app.src | 18 +++
src/global_changes_app.erl | 18 +++
src/global_changes_config_listener.erl | 94 +++++++++++++
src/global_changes_httpd.erl | 203 ++++++++++++++++++++++++++
src/global_changes_listener.erl | 148 +++++++++++++++++++
src/global_changes_server.erl | 211 ++++++++++++++++++++++++++++
src/global_changes_sup.erl | 35 +++++
src/global_changes_util.erl | 17 +++
10 files changed, 773 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e1b16d5
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.eunit/
+ebin/
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index e69de29..f22ee2c 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,27 @@
+### global\_changes
+
+This app supplies the functionality for the `/_db_updates` endpoint.
+
+When a database is created, deleted, or updated, a corresponding event will be persisted to disk (Note: This was designed without the guarantee that a DB event will be persisted or ever occur in the `_db_updates` feed. It probably will, but it isn't guaranteed). Users can subscribe to a `_changes`-like feed of these database events by querying the `_db_updates` endpoint.
+
+When an admin user queries the `/_db_updates` endpoint, they will see the account name associated with the DB update as well as update
+
+### Captured Metrics
+
+1: `global_changes`, `db_writes`: The number of doc updates caused by global\_changes.
+
+2: `global_changes`, `server_pending_updates`: The number of documents aggregated into the pending write batch.
+
+3: `global_changes`, `listener_pending_updates`: The number of documents aggregated into the pending event batch.
+
+4: `global_changes`, `event_doc_conflict`: The number of rev tree branches in event docs encountered by global\_changes. Should never happen.
+
+5: `global_changes`, `rpcs`: The number of non-fabric RPCs caused by global\_changes.
+
+### Important Configs
+
+1: `global_changes`, `max_event_delay`: (integer, milliseconds) The total timed added before an event is forwarded to the writer.
+
+2: `global_changes`, `max_write_delay`: (integer, milliseconds) The time added before an event is sent to disk.
+
+3: `global_changes`, `update_db`: (true/false) A flag setting whether to update the global\_changes database. If false, changes will be lost and there will be no performance impact of global\_changes on the cluster.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes.app.src
----------------------------------------------------------------------
diff --git a/src/global_changes.app.src b/src/global_changes.app.src
new file mode 100644
index 0000000..8586d7c
--- /dev/null
+++ b/src/global_changes.app.src
@@ -0,0 +1,18 @@
+{application, global_changes, [
+ {description, "_changes-like feeds for multiple DBs"},
+ {vsn, git},
+ {registered, [global_changes_config_listener, global_changes_server]},
+ {applications, [
+ kernel,
+ stdlib,
+ config,
+ couch_log,
+ couch,
+ mem3,
+ fabric
+ ]},
+ {mod, {global_changes_app, []}},
+ {env, [
+ {dbname, <<"global_changes">>}
+ ]}
+]}.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_app.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_app.erl b/src/global_changes_app.erl
new file mode 100644
index 0000000..a722155
--- /dev/null
+++ b/src/global_changes_app.erl
@@ -0,0 +1,18 @@
+%% Copyright 2013 Cloudant
+
+-module(global_changes_app).
+-behavior(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+ global_changes_sup:start_link().
+
+
+stop(_State) ->
+ ok.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_config_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_config_listener.erl b/src/global_changes_config_listener.erl
new file mode 100644
index 0000000..58f69bb
--- /dev/null
+++ b/src/global_changes_config_listener.erl
@@ -0,0 +1,94 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_config_listener).
+-behavior(gen_server).
+-behavior(config_listener).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export([
+ handle_config_change/5
+]).
+
+
+-define(LISTENER, global_changes_listener).
+-define(SERVER, global_changes_server).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {invalid_cast, Msg}, St}.
+
+
+handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, St) ->
+ erlang:send_after(5000, self(), restart_config_listener),
+ {noreply, St};
+handle_info(restart_config_listener, St) ->
+ ok = config:listen_for_changes(?MODULE, St),
+ {noreply, St};
+handle_info(_Msg, St) ->
+ {noreply, St}.
+
+
+code_change(_, St, _) ->
+ {ok, St}.
+
+
+handle_config_change("global_changes", "max_event_delay", MaxDelayStr, _, _) ->
+ try list_to_integer(MaxDelayStr) of
+ MaxDelay ->
+ gen_server:cast(?LISTENER, {set_max_event_delay, MaxDelay})
+ catch error:badarg ->
+ ok
+ end,
+ {ok, nil};
+
+handle_config_change("global_changes", "max_write_delay", MaxDelayStr, _, _) ->
+ try list_to_integer(MaxDelayStr) of
+ MaxDelay ->
+ gen_server:cast(?SERVER, {set_max_write_delay, MaxDelay})
+ catch error:badarg ->
+ ok
+ end,
+ {ok, nil};
+
+handle_config_change("global_changes", "update_db", "false", _, _) ->
+ gen_server:cast(?LISTENER, {set_update_db, false}),
+ gen_server:cast(?SERVER, {set_update_db, false}),
+ {ok, nil};
+
+handle_config_change("global_changes", "update_db", _, _, _) ->
+ gen_server:cast(?LISTENER, {set_update_db, true}),
+ gen_server:cast(?SERVER, {set_update_db, true}),
+ {ok, nil};
+
+handle_config_change(_, _, _, _, _) ->
+ {ok, nil}.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_httpd.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_httpd.erl b/src/global_changes_httpd.erl
new file mode 100644
index 0000000..0bbc534
--- /dev/null
+++ b/src/global_changes_httpd.erl
@@ -0,0 +1,203 @@
+%% Copyright 2013 Cloudant
+
+-module(global_changes_httpd).
+
+-export([handle_global_changes_req/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(acc, {
+ heartbeat_interval,
+ last_data_sent_time,
+ feed,
+ prepend,
+ resp,
+ etag,
+ username
+}).
+
+handle_global_changes_req(#httpd{method='GET'}=Req) ->
+ Db = global_changes_util:get_dbname(),
+ Feed = couch_httpd:qs_value(Req, "feed", "normal"),
+ Options = parse_global_changes_query(Req),
+ Heartbeat = case lists:keyfind(heartbeat, 1, Options) of
+ {heartbeat, Other} -> Other;
+ {heartbeat, true} -> 60000;
+ false -> false
+ end,
+ chttpd:verify_is_server_admin(Req),
+ Acc = #acc{
+ username=admin,
+ feed=Feed,
+ resp=Req,
+ heartbeat_interval=Heartbeat
+ },
+ case Feed of
+ "normal" ->
+ {ok, Info} = fabric:get_db_info(Db),
+ Etag = chttpd:make_etag(Info),
+ chttpd:etag_respond(Req, Etag, fun() ->
+ fabric:changes(Db, fun changes_callback/2, Acc#acc{etag=Etag}, Options)
+ end);
+ Feed when Feed =:= "continuous"; Feed =:= "longpoll" ->
+ fabric:changes(Db, fun changes_callback/2, Acc, Options);
+ _ ->
+ Msg = <<"Supported `feed` types: normal, continuous, longpoll">>,
+ throw({bad_request, Msg})
+ end;
+handle_global_changes_req(Req) ->
+ chttpd:send_method_not_allowed(Req, "GET").
+
+
+transform_change(Username, Resp, {Props}) ->
+ {id, Id} = lists:keyfind(id, 1, Props),
+ {seq, Seq} = lists:keyfind(seq, 1, Props),
+ Info = case binary:split(Id, <<":">>) of
+ [Event0, DbNameAndUsername] ->
+ case binary:split(DbNameAndUsername, <<"/">>) of
+ [AccountName0, DbName0] ->
+ {Event0, AccountName0, DbName0};
+ [DbName0] ->
+ {Event0, '_admin', DbName0}
+ end;
+ _ ->
+ skip
+ end,
+ case Info of
+ % Matches the client's username
+ {Event, Username, DbName} when Username /= admin ->
+ {[
+ {dbname, DbName},
+ {type, Event},
+ {seq, Seq}
+ ]};
+ % Client is an admin, show them everything.
+ {Event, AccountName, DbName} when Username == admin ->
+ {[
+ {dbname, DbName},
+ {type, Event},
+ {account, AccountName},
+ {seq, Seq}
+ ]};
+ _ ->
+ skip
+ end.
+
+
+% callbacks for continuous feed (newline-delimited JSON Objects)
+changes_callback(start, #acc{feed="continuous"}=Acc) ->
+ #acc{resp=Req} = Acc,
+ {ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
+ {ok, Acc#acc{resp=Resp, last_data_sent_time=os:timestamp()}};
+changes_callback({change, Change0}, #acc{feed="continuous"}=Acc) ->
+ #acc{resp=Resp, username=Username} = Acc,
+ case transform_change(Username, Resp, Change0) of
+ skip ->
+ {ok, maybe_send_heartbeat(Acc)};
+ Change ->
+ Line = [?JSON_ENCODE(Change) | "\n"],
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
+ {ok, Acc#acc{resp=Resp1, last_data_sent_time=os:timestamp()}}
+ end;
+changes_callback({stop, EndSeq}, #acc{feed="continuous"}=Acc) ->
+ #acc{resp=Resp} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
+ [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
+ chttpd:end_delayed_json_response(Resp1);
+
+% callbacks for longpoll and normal (single JSON Object)
+changes_callback(start, #acc{feed="normal", etag=Etag}=Acc)
+ when Etag =/= undefined ->
+ #acc{resp=Req} = Acc,
+ FirstChunk = "{\"results\":[\n",
+ {ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
+ [{"Etag",Etag}], FirstChunk),
+ {ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
+changes_callback(start, Acc) ->
+ #acc{resp=Req} = Acc,
+ FirstChunk = "{\"results\":[\n",
+ {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
+ {ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
+changes_callback({change, Change0}, Acc) ->
+ #acc{resp=Resp, prepend=Prepend, username=Username} = Acc,
+ case transform_change(Username, Resp, Change0) of
+ skip ->
+ {ok, maybe_send_heartbeat(Acc)};
+ Change ->
+ #acc{resp=Resp, prepend=Prepend} = Acc,
+ Line = [Prepend, ?JSON_ENCODE(Change)],
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
+ Acc1 = Acc#acc{
+ prepend=",\r\n",
+ resp=Resp1,
+ last_data_sent_time=os:timestamp()
+ },
+ {ok, Acc1}
+ end;
+changes_callback({stop, EndSeq}, Acc) ->
+ #acc{resp=Resp} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
+ ["\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), "}\n"]),
+ chttpd:end_delayed_json_response(Resp1);
+
+changes_callback(timeout, Acc) ->
+ {ok, maybe_send_heartbeat(Acc)};
+
+changes_callback({error, Reason}, #acc{resp=Req=#httpd{}}) ->
+ chttpd:send_error(Req, Reason);
+changes_callback({error, Reason}, Acc) ->
+ #acc{etag=Etag, feed=Feed, resp=Resp} = Acc,
+ case {Feed, Etag} of
+ {"normal", Etag} when Etag =/= undefined ->
+ chttpd:send_error(Resp, Reason);
+ _ ->
+ chttpd:send_delayed_error(Resp, Reason)
+ end.
+
+
+maybe_send_heartbeat(#acc{heartbeat_interval=false}=Acc) ->
+ Acc;
+maybe_send_heartbeat(Acc) ->
+ #acc{last_data_sent_time=LastSentTime, heartbeat_interval=Interval, resp=Resp} = Acc,
+ Now = os:timestamp(),
+ case timer:now_diff(Now, LastSentTime) div 1000 > Interval of
+ true ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+ Acc#acc{last_data_sent_time=Now, resp=Resp1};
+ false ->
+ Acc
+ end.
+
+
+parse_global_changes_query(Req) ->
+ lists:foldl(fun({Key, Value}, Args) ->
+ case {Key, Value} of
+ {"feed", _} ->
+ [{feed, Value} | Args];
+ {"descending", "true"} ->
+ [{dir, rev} | Args];
+ {"since", _} ->
+ [{since, Value} | Args];
+ {"limit", _} ->
+ [{limit, to_non_neg_int(Value)} | Args];
+ {"heartbeat", "true"} ->
+ [{heartbeat, true} | Args];
+ {"heartbeat", _} ->
+ [{heartbeat, to_non_neg_int(Value)} | Args];
+ {"timeout", _} ->
+ [{timeout, to_non_neg_int(Value)} | Args];
+ _Else -> % unknown key value pair, ignore.
+ Args
+ end
+ end, [], couch_httpd:qs(Req)).
+
+
+to_non_neg_int(Value) ->
+ try list_to_integer(Value) of
+ V when V >= 0 ->
+ V;
+ _ ->
+ throw({bad_request, invalid_integer})
+ catch error:badarg ->
+ throw({bad_request, invalid_integer})
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_listener.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl
new file mode 100644
index 0000000..d836f2b
--- /dev/null
+++ b/src/global_changes_listener.erl
@@ -0,0 +1,148 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_listener).
+-behavior(couch_event_listener).
+
+
+-export([
+ start/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_event/3,
+ handle_cast/2,
+ handle_info/2
+]).
+
+-record(state, {
+ update_db,
+ pending_update_count,
+ pending_updates,
+ last_update_time,
+ max_event_delay,
+ dbname
+}).
+
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+start() ->
+ couch_event_listener:start(?MODULE, nil, [all_dbs]).
+
+
+init(_) ->
+ % get configs as strings
+ UpdateDb0 = config:get("global_changes", "update_db", "true"),
+ MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
+
+ % make config strings into other data types
+ UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
+ MaxEventDelay = list_to_integer(MaxEventDelay0),
+
+ State = #state{
+ update_db=UpdateDb,
+ pending_update_count=0,
+ pending_updates=sets:new(),
+ max_event_delay=MaxEventDelay,
+ dbname=global_changes_util:get_dbname()
+ },
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_event(_ShardName, _Event, #state{update_db=false}=State) ->
+ {ok, State};
+handle_event(ShardName, Event, State0)
+ when Event =:= updated orelse Event =:= deleted
+ orelse Event =:= created ->
+ #state{dbname=ChangesDbName} = State0,
+ State = case mem3:dbname(ShardName) of
+ ChangesDbName ->
+ State0;
+ DbName ->
+ #state{pending_update_count=Count} = State0,
+ EventBin = erlang:atom_to_binary(Event, latin1),
+ Key = <<EventBin/binary, <<":">>/binary, DbName/binary>>,
+ Pending = sets:add_element(Key, State0#state.pending_updates),
+ Metric = [global_changes, listener_pending_updates],
+ State0#state{pending_updates=Pending, pending_update_count=Count+1}
+ end,
+ maybe_send_updates(State);
+handle_event(_DbName, _Event, State) ->
+ maybe_send_updates(State).
+
+
+handle_cast({set_max_event_delay, MaxEventDelay}, State) ->
+ maybe_send_updates(State#state{max_event_delay=MaxEventDelay});
+handle_cast({set_update_db, Boolean}, State0) ->
+ % If turning update_db off, clear out server state
+ State = case {Boolean, State0#state.update_db} of
+ {false, true} ->
+ State0#state{
+ update_db=Boolean,
+ pending_updates=sets:new(),
+ pending_update_count=0,
+ last_update_time=undefined
+ };
+ _ ->
+ State0#state{update_db=Boolean}
+ end,
+ maybe_send_updates(State);
+handle_cast(_Msg, State) ->
+ maybe_send_updates(State).
+
+
+maybe_send_updates(#state{pending_update_count=0}=State) ->
+ {ok, State};
+maybe_send_updates(#state{update_db=true}=State) ->
+ #state{max_event_delay=MaxEventDelay, last_update_time=LastUpdateTime} = State,
+ Now = os:timestamp(),
+ case LastUpdateTime of
+ undefined ->
+ {ok, State#state{last_update_time=Now}, MaxEventDelay};
+ _ ->
+ Delta = timer:now_diff(Now, LastUpdateTime) div 1000,
+ if Delta >= MaxEventDelay ->
+ Updates = sets:to_list(State#state.pending_updates),
+ try group_updates_by_node(State#state.dbname, Updates) of
+ Grouped ->
+ dict:map(fun(Node, Docs) ->
+ Metric = [global_changes, rpcs],
+ MFA = {global_changes_server, update_docs, [Docs]},
+ rexi:cast(Node, MFA)
+ end, Grouped)
+ catch error:database_does_not_exist ->
+ ok
+ end,
+ State1 = State#state{
+ pending_updates=sets:new(),
+ pending_update_count=0,
+ last_update_time=undefined
+ },
+ {ok, State1};
+ true ->
+ {ok, State, MaxEventDelay-Delta}
+ end
+ end;
+maybe_send_updates(State) ->
+ {ok, State}.
+
+
+handle_info(_Msg, State) ->
+ maybe_send_updates(State).
+
+
+-spec group_updates_by_node(binary(), [binary()]) -> dict().
+group_updates_by_node(DbName, Updates) ->
+ lists:foldl(fun(Key, OuterAcc) ->
+ Shards = mem3:shards(DbName, Key),
+ lists:foldl(fun(#shard{node=Node}, InnerAcc) ->
+ dict:append(Node, Key, InnerAcc)
+ end, OuterAcc, Shards)
+ end, dict:new(), Updates).
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_server.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_server.erl b/src/global_changes_server.erl
new file mode 100644
index 0000000..8ad9244
--- /dev/null
+++ b/src/global_changes_server.erl
@@ -0,0 +1,211 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_server).
+-behavior(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export([
+ update_docs/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-record(state, {
+ update_db,
+ pending_update_count,
+ last_update_time,
+ pending_updates,
+ max_write_delay,
+ dbname,
+ handler_ref
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init([]) ->
+ {ok, Handler} = global_changes_listener:start(),
+ % get configs as strings
+ UpdateDb0 = config:get("global_changes", "update_db", "true"),
+ MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "25"),
+
+ % make config strings into other data types
+ UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
+ MaxWriteDelay = list_to_integer(MaxWriteDelay0),
+
+ State = #state{
+ update_db=UpdateDb,
+ pending_update_count=0,
+ pending_updates=sets:new(),
+ max_write_delay=MaxWriteDelay,
+ dbname=global_changes_util:get_dbname(),
+ handler_ref=erlang:monitor(process, Handler)
+ },
+ {ok, State}.
+
+
+terminate(_Reason, _Srv) ->
+ ok.
+
+
+handle_call(_Msg, _From, #state{update_db=false}=State) ->
+ {reply, ok, State};
+handle_call({update_docs, DocIds}, _From, State) ->
+ Pending = sets:union(sets:from_list(DocIds), State#state.pending_updates),
+ NewState = State#state{
+ pending_updates=Pending,
+ pending_update_count=sets:size(Pending)
+ },
+ format_reply(reply, maybe_update_docs(NewState)).
+
+
+handle_cast({set_max_write_delay, MaxWriteDelay}, State) ->
+ NewState = State#state{max_write_delay=MaxWriteDelay},
+ format_reply(noreply, maybe_update_docs(NewState));
+handle_cast({set_update_db, Boolean}, State0) ->
+ % If turning update_db off, clear out server state
+ State = case {Boolean, State0#state.update_db} of
+ {false, true} ->
+ State0#state{
+ update_db=Boolean,
+ pending_updates=sets:new(),
+ pending_update_count=0,
+ last_update_time=undefined
+ };
+ _ ->
+ State0#state{update_db=Boolean}
+ end,
+ format_reply(noreply, maybe_update_docs(State));
+handle_cast(_Msg, State) ->
+ format_reply(noreply, maybe_update_docs(State)).
+
+
+handle_info(start_listener, State) ->
+ {ok, Handler} = global_changes_listener:start(),
+ NewState = State#state{
+ handler_ref=erlang:monitor(process, Handler)
+ },
+ format_reply(noreply, maybe_update_docs(NewState));
+handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) ->
+ couch_log:error("global_changes_listener terminated: ~w", [Reason]),
+ erlang:send_after(5000, self(), start_listener),
+ format_reply(noreply, maybe_update_docs(State));
+handle_info(_, State) ->
+ format_reply(noreply, maybe_update_docs(State)).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+maybe_update_docs(#state{pending_update_count=0}=State) ->
+ State;
+maybe_update_docs(#state{update_db=true}=State) ->
+ #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State,
+ Now = os:timestamp(),
+ case LastUpdateTime of
+ undefined ->
+ {State#state{last_update_time=Now}, MaxWriteDelay};
+ _ ->
+ Delta = round(timer:now_diff(Now, LastUpdateTime)/1000),
+ if Delta >= MaxWriteDelay ->
+ DocIds = sets:to_list(State#state.pending_updates),
+ try group_ids_by_shard(State#state.dbname, DocIds) of
+ GroupedIds ->
+ Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) ->
+ {ok, Shard} = couch_db:open(ShardName, []),
+ try
+ GroupedDocs = get_docs_locally(Shard, Ids),
+ GroupedDocs ++ DocInfoAcc
+ after
+ couch_db:close(Shard)
+ end
+ end, [], GroupedIds),
+
+ spawn(fun() ->
+ fabric:update_docs(State#state.dbname, Docs, [])
+ end),
+
+ Count = State#state.pending_update_count,
+ catch error:database_does_not_exist ->
+ ok
+ end,
+ State#state{
+ pending_updates=sets:new(),
+ pending_update_count=0,
+ last_update_time=undefined
+ };
+ true ->
+ {State, MaxWriteDelay-Delta}
+ end
+ end;
+maybe_update_docs(State) ->
+ State.
+
+
+update_docs(Updates) ->
+ gen_server:call(?MODULE, {update_docs, Updates}).
+
+
+group_ids_by_shard(DbName, DocIds) ->
+ LocalNode = node(),
+ lists:foldl(fun(DocId, Acc) ->
+ Shards = mem3:shards(DbName, DocId),
+ lists:foldl(fun
+ (#shard{node=Node, name=Name}, Acc1) when Node == LocalNode ->
+ dict:append(Name, DocId, Acc1);
+ (_, Acc1) ->
+ Acc1
+ end, Acc, Shards)
+ end, dict:new(), DocIds).
+
+
+format_reply(reply, #state{}=State) ->
+ {reply, ok, State};
+format_reply(reply, {State, Timeout}) ->
+ {reply, ok, State, Timeout};
+format_reply(noreply, #state{}=State) ->
+ {noreply, State};
+format_reply(noreply, {State, Timeout}) ->
+ {noreply, State, Timeout}.
+
+
+get_docs_locally(Shard, Ids) ->
+ lists:map(fun(Id) ->
+ DocInfo = couch_db:get_doc_info(Shard, Id),
+ #doc{id=Id, revs=get_rev(DocInfo)}
+ end, Ids).
+
+
+get_rev(not_found) ->
+ {0, []};
+get_rev({ok, #doc_info{revs=[RevInfo]}}) ->
+ {Pos, Rev} = RevInfo#rev_info.rev,
+ {Pos, [Rev]};
+get_rev({ok, #doc_info{revs=[RevInfo|_]}}) ->
+ % couch_doc:to_doc_info/1 sorts things so that the first
+ % #rev_info in the list is the "winning" revision which is
+ % the one we'd want to base our edit off of. In theory
+ % global_changes should never encounter a conflict by design
+ % but we should record if it happens in case our design isn't
+ % quite right.
+ {Pos, Rev} = RevInfo#rev_info.rev,
+ {Pos, [Rev]}.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_sup.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_sup.erl b/src/global_changes_sup.erl
new file mode 100644
index 0000000..471c8ab
--- /dev/null
+++ b/src/global_changes_sup.erl
@@ -0,0 +1,35 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_sup).
+-behavior(supervisor).
+
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ {ok, {
+ {one_for_one, 5, 10}, [
+ {
+ global_changes_server,
+ {global_changes_server, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [global_changes_server]
+ },
+ {
+ global_changes_config_listener,
+ {global_changes_config_listener, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [global_changes_config_listener]
+ }
+ ]}}.
http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/679b1345/src/global_changes_util.erl
----------------------------------------------------------------------
diff --git a/src/global_changes_util.erl b/src/global_changes_util.erl
new file mode 100644
index 0000000..46abf48
--- /dev/null
+++ b/src/global_changes_util.erl
@@ -0,0 +1,17 @@
+% Copyright 2013 Cloudant. All rights reserved.
+
+-module(global_changes_util).
+
+
+-export([get_dbname/0]).
+
+
+get_dbname() ->
+ case application:get_env(global_changes, dbname) of
+ {ok, DbName} when is_binary(DbName) ->
+ DbName;
+ {ok, DbName} when is_list(DbName) ->
+ iolist_to_binary(DbName);
+ _ ->
+ <<"global_changes">>
+ end.