You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:09:58 UTC
[02/35] git commit: Initial implementation
Initial implementation
There are a few places we could be more efficient in managing the ets
table for event listeners but this simple approach should be able to
give us a rough idea on how fast we can pump messages through this sort
of design.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/commit/cc616a77
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/tree/cc616a77
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/diff/cc616a77
Branch: refs/heads/windsor-merge
Commit: cc616a77717730ac4dddc2940aff51f584385e6f
Parents: 73b38e8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Apr 22 15:03:47 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 30 17:35:08 2014 +0100
----------------------------------------------------------------------
.gitignore | 2 +
src/couch_event.app.src | 22 +++++
src/couch_event.erl | 45 ++++++++++
src/couch_event_app.erl | 27 ++++++
src/couch_event_dist.erl | 83 +++++++++++++++++++
src/couch_event_int.hrl | 19 +++++
src/couch_event_listener.erl | 169 ++++++++++++++++++++++++++++++++++++++
src/couch_event_registry.erl | 109 ++++++++++++++++++++++++
src/couch_event_sup2.erl | 51 ++++++++++++
9 files changed, 527 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1204ed7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+deps/
+ebin/
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.app.src
----------------------------------------------------------------------
diff --git a/src/couch_event.app.src b/src/couch_event.app.src
new file mode 100644
index 0000000..17fbafd
--- /dev/null
+++ b/src/couch_event.app.src
@@ -0,0 +1,22 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_event, [
+ {description, "Event notification system for Apache CouchDB"},
+ {vsn, git},
+ {registered, [
+ couch_event_sup,
+ couch_event_server
+ ]},
+ {applications, [kernel, stdlib, couch_log, config]},
+ {mod, {couch_event_app, []}}
+]}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.erl
----------------------------------------------------------------------
diff --git a/src/couch_event.erl b/src/couch_event.erl
new file mode 100644
index 0000000..eaa4c88
--- /dev/null
+++ b/src/couch_event.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event).
+
+-export([
+ register/2,
+ register_all/1,
+ unregister/2,
+ unregister_all/1,
+ notify/2
+]).
+
+
+-define(REGISTRY, couch_event_registry).
+-define(DIST, couch_event_dist).
+
+
+register(Pid, DbName) ->
+ gen_server:call(?REGISTRY, {register, Pid, DbName}).
+
+
+register_all(Pid) ->
+ gen_server:call(?REGISTRY, {register, Pid, all_dbs}).
+
+
+unregister(Pid, DbName) ->
+ gen_server:call(?REGISTRY, {unregister, Pid, DbName}).
+
+
+unregister_all(Pid) ->
+ gen_server:call(?REGISTRY, {unregister, Pid}).
+
+
+notify(DbName, Event) ->
+ gen_server:cast(?DIST, {DbName, Event}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_app.erl b/src/couch_event_app.erl
new file mode 100644
index 0000000..3a8341b
--- /dev/null
+++ b/src/couch_event_app.erl
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_app).
+-behavior(application).
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+ couch_event_sup2:start_link().
+
+
+stop(_State) ->
+ ok.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_dist.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_dist.erl b/src/couch_event_dist.erl
new file mode 100644
index 0000000..7f99a2d
--- /dev/null
+++ b/src/couch_event_dist.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_dist).
+-behavior(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+-record(st, {
+ batch_size
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+init(_) ->
+ {ok, #st{batch_size=25}}.
+
+
+terminate(_Reason, _St) ->
+ ok.
+
+
+handle_call(Msg, From, St) ->
+ couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+ {reply, ignored, St}.
+
+
+handle_cast({DbName, Event}, #st{batch_size=BS}=St) when is_binary(DbName) ->
+ P1 = #client{dbname=DbName, _='_'},
+ notify_clients(ets:select(?REGISTRY_TABLE, P1, BS), DbName, Event),
+ P2 = #client{dbname=all_dbs, _='_'},
+ notify_clients(ets:select(?REGISTRY_TABLE, P2, BS), DbName, Event),
+ {noreply, St};
+
+handle_cast(Msg, St) ->
+ couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
+handle_info(Msg, St) ->
+ couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+notify_clients('$end_of_table', _DbName, _Event) ->
+ ok;
+notify_clients({Clients, Cont}, DbName, Event) ->
+ lists:foreach(fun(#client{pid=Pid}) ->
+ Pid ! {'$couch_event', DbName, Event}
+ end, Clients),
+ notify_clients(ets:select(Cont), DbName, Event).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_int.hrl
----------------------------------------------------------------------
diff --git a/src/couch_event_int.hrl b/src/couch_event_int.hrl
new file mode 100644
index 0000000..dc4739e
--- /dev/null
+++ b/src/couch_event_int.hrl
@@ -0,0 +1,19 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(REGISTRY_TABLE, couch_event_registry).
+
+-record(client, {
+ dbname,
+ pid,
+ ref
+}).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_listener.erl b/src/couch_event_listener.erl
new file mode 100644
index 0000000..c665036
--- /dev/null
+++ b/src/couch_event_listener.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_listener).
+-behavior(gen_server).
+
+
+-export([
+ start/3,
+ start/4,
+ start_link/3,
+ start_link/4
+]).
+
+-export([
+ behaviour_info/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+behaviour_info(callbacks) ->
+ [
+ {init,1},
+ {terminate/2},
+ {handle_event/2}
+ ];
+behaviour_info(_) ->
+ undefined.
+
+
+start(Mod, Args, Options) ->
+ gen_server:start(?MODULE, {Mod, Args}, Options).
+
+
+start(Name, Mod, Args, Options) ->
+ gen_server:start(Name, ?MODULE, {Mod, Args}, Options).
+
+
+start_link(Mod, Args, Options) ->
+ gen_server:start_link(?MODULE, {Mod, Args}, Options).
+
+
+start_link(Name, Mod, Args, Options) ->
+ gen_server:start_link(Name, ?MODULE, {Mod, Args}, Options).
+
+
+init({Mod, Args}) ->
+ case Mod:init(Args) of
+ {ok, St} ->
+ {ok, {Mod, St}};
+ {ok, St, Timeout} ->
+ {ok, {Mod, St}, Timeout};
+ {stop, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore;
+ Else ->
+ erlang:error({bad_return, Else})
+ end.
+
+
+terminate(Reason, {Mod, St}) ->
+ Mod:terminate(Reason, St).
+
+
+handle_call(Msg, From, {Mod, St}) ->
+ case erlang:function_exported(Mod, handle_call, 3) of
+ true ->
+ case Mod:handle_call(Msg, From, St) of
+ {reply, Reply, NewState} ->
+ {reply, Reply, {Mod, NewState}};
+ {reply, Reply, NewState, Timeout} ->
+ {reply, Reply, {Mod, NewState}, Timeout};
+ {noreply, NewState} ->
+ {noreply, {Mod, NewState}};
+ {noreply, NewState, Timeout} ->
+ {noreply, {Mod, NewState}, Timeout};
+ {stop, Reason, Reply, NewState} ->
+ {stop, Reason, Reply, {Mod, NewState}};
+ {stop, Reason, NewState} ->
+ {stop, Reason, {Mod, NewState}};
+ Else ->
+ erlang:error({bad_return, Else})
+ end;
+ false ->
+ {stop, {invalid_call, Msg}, invalid_call, St}
+ end.
+
+
+handle_cast(Msg, {Mod, St}) ->
+ case erlang:function_exported(Mod, handle_cast, 2) of
+ true ->
+ case Mod:handle_cast(Msg, St) of
+ {noreply, NewState} ->
+ {noreply, {Mod, NewState}};
+ {noreply, NewState, Timeout} ->
+ {noreply, {Mod, NewState}, Timeout};
+ {stop, Reason, NewState} ->
+ {stop, Reason, {Mod, NewState}};
+ Else ->
+ erlang:error({bad_return, Else})
+ end;
+ false ->
+ {stop, {invalid_cast, Msg}, St}
+ end.
+
+
+handle_info({'$couch_event', DbName, Event}, {Mod, St}) ->
+ case Mod:handle_event(DbName, Event, St) of
+ {noreply, NewState} ->
+ {noreply, {Mod, NewState}};
+ {noreply, NewState, Timeout} ->
+ {noreply, {Mod, NewState}, Timeout};
+ {stop, Reason, NewState} ->
+ {stop, Reason, {Mod, NewState}};
+ Else ->
+ erlang:error({bad_return, Else})
+ end;
+
+handle_info(Msg, {Mod, St}) ->
+ case erlang:function_export(Mod, handle_info, 2) of
+ true ->
+ case Mod:handle_info(Msg, St) of
+ {noreply, NewState} ->
+ {noreply, {Mod, NewState}};
+ {noreply, NewState, Timeout} ->
+ {noreply, {Mod, NewState}, Timeout};
+ {stop, Reason, NewState} ->
+ {stop, Reason, {Mod, NewState}};
+ Else ->
+ erlang:error({bad_return, Else})
+ end;
+ false ->
+ {stop, {invalid_info, Msg}, St}
+ end.
+
+
+code_change(OldVsn, {Mod, St}, Extra) ->
+ case erlang:function_exported(Mod, code_change, 3) of
+ true ->
+ case Mod:code_change(OldVsn, St, Extra) of
+ {ok, NewState} ->
+ {ok, {Mod, NewState}};
+ {error, Reason} ->
+ {error, Reason};
+ Else ->
+ erlang:error({bad_return, Else})
+ end;
+ false ->
+ {ok, {Mod, St}}
+ end.
+
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_registry.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_registry.erl b/src/couch_event_registry.erl
new file mode 100644
index 0000000..65fa160
--- /dev/null
+++ b/src/couch_event_registry.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_registry).
+-behavior(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+init(_) ->
+ EtsOpts = [
+ protected,
+ named_table,
+ bag,
+ {keypos, #client.dbname}
+ ],
+ ets:new(?REGISTRY_TABLE, EtsOpts),
+ {ok, nil}.
+
+
+terminate(_Reason, _St) ->
+ ok.
+
+
+handle_call({register, Pid, DbName}, _From, St) ->
+ Client = #client{
+ dbname = DbName,
+ pid = Pid,
+ ref = erlang:monitor(process, Pid)
+ },
+ ets:insert(?REGISTRY_TABLE, Client),
+ {reply, ok, St};
+
+handle_call({unregister, Pid, DbName}, _From, St) ->
+ Pattern = #client{dbname=DbName, pid=Pid, _='_'},
+ case ets:match_object(?REGISTRY_TABLE, Pattern) of
+ [] ->
+ ok;
+ [#client{ref=Ref}=Cli] ->
+ erlang:demonitor(Ref, [flush]),
+ ets:delete_object(?REGISTRY_TABLE, Cli)
+ end,
+ {reply, ok, St};
+
+handle_call({unregister_all, Pid}, _From, St) ->
+ Pattern = #client{pid=Pid, _='_'},
+ case ets:match_object(?REGISTRY_TABLE, Pattern) of
+ [] ->
+ ok;
+ Clients ->
+ lists:foreach(fun(Cli) ->
+ erlang:demonitor(Cli#client.ref, [flush]),
+ % I wonder if match_delete/2 is faster
+ % than repeated calls to delete_object.
+ ets:delete_object(Cli)
+ end, Clients)
+ end,
+ {reply, ok, St};
+
+handle_call(Msg, From, St) ->
+ couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+ {reply, ignored, St, 0}.
+
+
+handle_cast(Msg, St) ->
+ couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+ {noreply, St, 0}.
+
+
+handle_info({'DOWN', Ref, process, Pid, _Reason}, St) ->
+ Pattern = #client{pid=Pid, ref=Ref, _='_'},
+ ets:match_delete(?REGISTRY_TABLE, Pattern),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+ {noreply, St, 0}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_sup2.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup2.erl b/src/couch_event_sup2.erl
new file mode 100644
index 0000000..039b897
--- /dev/null
+++ b/src/couch_event_sup2.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This is named couch_event_sup2 to avoid
+% naming collisions with the couch_event_sup
+% module contained in the couch app. When
+% that supervisor is removed we'll be free
+% to rename this one.
+
+-module(couch_event_sup2).
+-behavior(supervisor).
+
+
+-export([
+ start_link/0,
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, nil).
+
+
+init(_) ->
+ Children = [
+ {couch_event_registry,
+ {couch_event_registry, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [couch_event_register]
+ },
+ {couch_event_dist,
+ {couch_event_dist, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [couch_event_dist]
+ }
+ ],
+ {ok, {{one_for_one, 5, 10}, Children}}.
+