You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:09:58 UTC

[02/35] git commit: Initial implementation

Initial implementation

There are a few places we could be more efficient in managing the ets
table for event listeners but this simple approach should be able to
give us a rough idea on how fast we can pump messages through this sort
of design.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/commit/cc616a77
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/tree/cc616a77
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/diff/cc616a77

Branch: refs/heads/windsor-merge
Commit: cc616a77717730ac4dddc2940aff51f584385e6f
Parents: 73b38e8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Apr 22 15:03:47 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 30 17:35:08 2014 +0100

----------------------------------------------------------------------
 .gitignore                   |   2 +
 src/couch_event.app.src      |  22 +++++
 src/couch_event.erl          |  45 ++++++++++
 src/couch_event_app.erl      |  27 ++++++
 src/couch_event_dist.erl     |  83 +++++++++++++++++++
 src/couch_event_int.hrl      |  19 +++++
 src/couch_event_listener.erl | 169 ++++++++++++++++++++++++++++++++++++++
 src/couch_event_registry.erl | 109 ++++++++++++++++++++++++
 src/couch_event_sup2.erl     |  51 ++++++++++++
 9 files changed, 527 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1204ed7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+deps/
+ebin/

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.app.src
----------------------------------------------------------------------
diff --git a/src/couch_event.app.src b/src/couch_event.app.src
new file mode 100644
index 0000000..17fbafd
--- /dev/null
+++ b/src/couch_event.app.src
@@ -0,0 +1,22 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Application resource file for couch_event.
+% `registered` lists the names this application's processes register
+% locally: the top-level supervisor (couch_event_sup2) and its two workers
+% (couch_event_registry and couch_event_dist).
+{application, couch_event, [
+    {description, "Event notification system for Apache CouchDB"},
+    {vsn, git},
+    {registered, [
+        couch_event_sup2,
+        couch_event_registry,
+        couch_event_dist
+    ]},
+    {applications, [kernel, stdlib, couch_log, config]},
+    {mod, {couch_event_app, []}}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event.erl
----------------------------------------------------------------------
diff --git a/src/couch_event.erl b/src/couch_event.erl
new file mode 100644
index 0000000..eaa4c88
--- /dev/null
+++ b/src/couch_event.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event).
+
+-export([
+    register/2,
+    register_all/1,
+    unregister/2,
+    unregister_all/1,
+    notify/2
+]).
+
+
+-define(REGISTRY, couch_event_registry).
+-define(DIST, couch_event_dist).
+
+
+% Ask the registry process to subscribe Pid to events for DbName.
+% Synchronous: returns whatever the registry replies.
+register(Pid, DbName) ->
+    Request = {register, Pid, DbName},
+    gen_server:call(?REGISTRY, Request).
+
+
+% Ask the registry process to subscribe Pid under the special key all_dbs
+% instead of a single database name. Synchronous.
+register_all(Pid) ->
+    Request = {register, Pid, all_dbs},
+    gen_server:call(?REGISTRY, Request).
+
+
+% Ask the registry process to remove Pid's subscription to DbName.
+% Synchronous: returns whatever the registry replies.
+unregister(Pid, DbName) ->
+    Request = {unregister, Pid, DbName},
+    gen_server:call(?REGISTRY, Request).
+
+
+% Ask the registry process to remove every subscription held by Pid.
+% The request tag must be unregister_all: that is the tuple the registry's
+% handle_call/3 matches on. A two-element {unregister, Pid} tuple would
+% fall through to the registry's catch-all clause and be ignored.
+unregister_all(Pid) ->
+    gen_server:call(?REGISTRY, {unregister_all, Pid}).
+
+
+% Hand an event for DbName to the dispatcher process. Asynchronous: this
+% is a cast, so it returns without waiting for delivery to listeners.
+notify(DbName, Event) ->
+    Notification = {DbName, Event},
+    gen_server:cast(?DIST, Notification).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_app.erl b/src/couch_event_app.erl
new file mode 100644
index 0000000..3a8341b
--- /dev/null
+++ b/src/couch_event_app.erl
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_app).
+-behavior(application).
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+% application callback: start the top-level supervisor. Both arguments
+% are ignored.
+start(_StartType, _StartArgs) ->
+    couch_event_sup2:start_link().
+
+
+% application callback: nothing to clean up on stop.
+stop(_State) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_dist.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_dist.erl b/src/couch_event_dist.erl
new file mode 100644
index 0000000..7f99a2d
--- /dev/null
+++ b/src/couch_event_dist.erl
@@ -0,0 +1,83 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_dist).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+% Server state of the dispatcher.
+-record(st, {
+    % Limit passed to ets:select/3 when reading the registry table.
+    batch_size
+}).
+
+
+% Start the dispatcher linked to the caller and registered locally under
+% the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, nil, []).
+
+
+% gen_server callback: the start argument is ignored; the state begins
+% with a batch size of 25.
+init(_) ->
+    {ok, #st{batch_size=25}}.
+
+
+% gen_server callback: nothing to clean up.
+terminate(_Reason, _St) ->
+    ok.
+
+
+% The dispatcher defines no synchronous requests: log the call and answer
+% the caller with `ignored`, leaving the state untouched.
+handle_call(Msg, From, St) ->
+    LogArgs = [?MODULE, Msg, From],
+    couch_log:notice("~s ignoring call ~w from ~w", LogArgs),
+    {reply, ignored, St}.
+
+
+% Deliver Event for DbName to every process registered for that database
+% and then to every process registered under all_dbs.
+%
+% ets:select/3 takes a match specification (a list of
+% {Pattern, Guards, Body} tuples); handing it the bare record pattern
+% raises badarg. Each pattern is therefore wrapped in a spec whose body
+% '$_' returns the whole matching object, which is also what lets
+% notify_clients/3 resume the scan with ets:select/1.
+handle_cast({DbName, Event}, #st{batch_size=BS}=St) when is_binary(DbName) ->
+    lists:foreach(fun(Key) ->
+        MatchSpec = [{#client{dbname=Key, _='_'}, [], ['$_']}],
+        FirstBatch = ets:select(?REGISTRY_TABLE, MatchSpec, BS),
+        notify_clients(FirstBatch, DbName, Event)
+    end, [DbName, all_dbs]),
+    {noreply, St};
+
+% Any other cast is logged and dropped.
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+% The dispatcher expects no raw messages: log whatever arrives and carry
+% on with the same state.
+handle_info(Msg, St) ->
+    LogArgs = [?MODULE, Msg],
+    couch_log:notice("~s ignoring info ~w", LogArgs),
+    {noreply, St}.
+
+
+% gen_server callback: the state is carried over unchanged on upgrade.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+% Walk a batched table scan. For each batch, send
+% {'$couch_event', DbName, Event} to the pid of every client record in
+% it, then resume the scan from the continuation. Stops with ok once the
+% scan reports '$end_of_table'.
+notify_clients({Batch, Continuation}, DbName, Event) ->
+    Msg = {'$couch_event', DbName, Event},
+    lists:foreach(fun(#client{pid=Pid}) -> Pid ! Msg end, Batch),
+    notify_clients(ets:select(Continuation), DbName, Event);
+notify_clients('$end_of_table', _DbName, _Event) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_int.hrl
----------------------------------------------------------------------
diff --git a/src/couch_event_int.hrl b/src/couch_event_int.hrl
new file mode 100644
index 0000000..dc4739e
--- /dev/null
+++ b/src/couch_event_int.hrl
@@ -0,0 +1,19 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Name of the ETS table that holds listener registrations.
+-define(REGISTRY_TABLE, couch_event_registry).
+
+% One registration entry stored in the registry table.
+-record(client, {
+    % Database name the listener registered for, or the atom all_dbs.
+    dbname,
+    % Pid of the listening process.
+    pid,
+    % Monitor reference taken on pid when it registered.
+    ref
+}).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_listener.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_listener.erl b/src/couch_event_listener.erl
new file mode 100644
index 0000000..c665036
--- /dev/null
+++ b/src/couch_event_listener.erl
@@ -0,0 +1,169 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_listener).
+-behavior(gen_server).
+
+
+-export([
+    start/3,
+    start/4,
+    start_link/3,
+    start_link/4
+]).
+
+-export([
+    behaviour_info/1
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+% Callbacks a couch_event_listener callback module must export, as
+% {Name, Arity} pairs. handle_event has arity 3 because this module
+% invokes it as Mod:handle_event(DbName, Event, St).
+behaviour_info(callbacks) ->
+    [
+        {init, 1},
+        {terminate, 2},
+        {handle_event, 3}
+    ];
+behaviour_info(_) ->
+    undefined.
+
+
+% Start an unlinked, unnamed listener. Mod is the callback module and Args
+% is handed to Mod:init/1; Options goes to gen_server:start/3 as is.
+start(Mod, Args, Options) ->
+    InitArg = {Mod, Args},
+    gen_server:start(?MODULE, InitArg, Options).
+
+
+% Start an unlinked listener registered as Name. Mod is the callback
+% module and Args is handed to Mod:init/1; Name and Options go to
+% gen_server:start/4 as is.
+start(Name, Mod, Args, Options) ->
+    InitArg = {Mod, Args},
+    gen_server:start(Name, ?MODULE, InitArg, Options).
+
+
+% Start an unnamed listener linked to the caller. Mod is the callback
+% module and Args is handed to Mod:init/1; Options goes to
+% gen_server:start_link/3 as is.
+start_link(Mod, Args, Options) ->
+    InitArg = {Mod, Args},
+    gen_server:start_link(?MODULE, InitArg, Options).
+
+
+% Start a listener registered as Name and linked to the caller. Mod is
+% the callback module and Args is handed to Mod:init/1; Name and Options
+% go to gen_server:start_link/4 as is.
+start_link(Name, Mod, Args, Options) ->
+    InitArg = {Mod, Args},
+    gen_server:start_link(Name, ?MODULE, InitArg, Options).
+
+
+% gen_server callback: run the callback module's init/1 and, on success,
+% keep the module name next to its state so later callbacks can dispatch
+% to it. `ignore` and {stop, Reason} are passed through unchanged; any
+% other return value raises {bad_return, Value}.
+init({Mod, Args}) ->
+    case Mod:init(Args) of
+        ignore ->
+            ignore;
+        {stop, Reason} ->
+            {stop, Reason};
+        {ok, ModSt} ->
+            {ok, {Mod, ModSt}};
+        {ok, ModSt, Timeout} ->
+            {ok, {Mod, ModSt}, Timeout};
+        Other ->
+            erlang:error({bad_return, Other})
+    end.
+
+
+% gen_server callback: forward termination to the callback module,
+% passing it its own state rather than the {Mod, St} wrapper.
+terminate(Reason, {Mod, St}) ->
+    Mod:terminate(Reason, St).
+
+
+% gen_server callback: forward a call to Mod:handle_call/3 when the
+% callback module exports it, re-wrapping the returned state as
+% {Mod, NewState}. An unrecognised return value raises
+% {bad_return, Value}.
+%
+% If the module does not export handle_call/3 the listener replies
+% invalid_call and stops with reason {invalid_call, Msg}.
+handle_call(Msg, From, {Mod, St}) ->
+    case erlang:function_exported(Mod, handle_call, 3) of
+        true ->
+            case Mod:handle_call(Msg, From, St) of
+                {reply, Reply, NewState} ->
+                    {reply, Reply, {Mod, NewState}};
+                {reply, Reply, NewState, Timeout} ->
+                    {reply, Reply, {Mod, NewState}, Timeout};
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, Reply, NewState} ->
+                    {stop, Reason, Reply, {Mod, NewState}};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            % Keep the {Mod, St} wrapper: terminate/2 runs next and
+            % pattern-matches on it.
+            {stop, {invalid_call, Msg}, invalid_call, {Mod, St}}
+    end.
+
+
+% gen_server callback: forward a cast to Mod:handle_cast/2 when the
+% callback module exports it, re-wrapping the returned state as
+% {Mod, NewState}. An unrecognised return value raises
+% {bad_return, Value}.
+%
+% If the module does not export handle_cast/2 the listener stops with
+% reason {invalid_cast, Msg}.
+handle_cast(Msg, {Mod, St}) ->
+    case erlang:function_exported(Mod, handle_cast, 2) of
+        true ->
+            case Mod:handle_cast(Msg, St) of
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            % Keep the {Mod, St} wrapper: terminate/2 runs next and
+            % pattern-matches on it.
+            {stop, {invalid_cast, Msg}, {Mod, St}}
+    end.
+
+
+% gen_server callback, event clause: a {'$couch_event', DbName, Event}
+% message is handed to the mandatory Mod:handle_event/3 callback and the
+% returned state is re-wrapped as {Mod, NewState}. An unrecognised return
+% value raises {bad_return, Value}.
+handle_info({'$couch_event', DbName, Event}, {Mod, St}) ->
+    case Mod:handle_event(DbName, Event, St) of
+        {noreply, NewState} ->
+            {noreply, {Mod, NewState}};
+        {noreply, NewState, Timeout} ->
+            {noreply, {Mod, NewState}, Timeout};
+        {stop, Reason, NewState} ->
+            {stop, Reason, {Mod, NewState}};
+        Else ->
+            erlang:error({bad_return, Else})
+    end;
+
+% Every other message goes to the optional Mod:handle_info/2. If the
+% module does not export it the listener stops with reason
+% {invalid_info, Msg}.
+handle_info(Msg, {Mod, St}) ->
+    % The BIF is erlang:function_exported/3; there is no function_export.
+    case erlang:function_exported(Mod, handle_info, 2) of
+        true ->
+            case Mod:handle_info(Msg, St) of
+                {noreply, NewState} ->
+                    {noreply, {Mod, NewState}};
+                {noreply, NewState, Timeout} ->
+                    {noreply, {Mod, NewState}, Timeout};
+                {stop, Reason, NewState} ->
+                    {stop, Reason, {Mod, NewState}};
+                Else ->
+                    erlang:error({bad_return, Else})
+            end;
+        false ->
+            % Keep the {Mod, St} wrapper: terminate/2 runs next and
+            % pattern-matches on it.
+            {stop, {invalid_info, Msg}, {Mod, St}}
+    end.
+
+
+% gen_server callback: let the callback module migrate its own state when
+% it exports code_change/3; otherwise keep the state as it is. An
+% {error, Reason} result is passed through, and any unrecognised return
+% value raises {bad_return, Value}.
+code_change(OldVsn, {Mod, St}, Extra) ->
+    HasCallback = erlang:function_exported(Mod, code_change, 3),
+    case HasCallback of
+        false ->
+            {ok, {Mod, St}};
+        true ->
+            case Mod:code_change(OldVsn, St, Extra) of
+                {error, Reason} ->
+                    {error, Reason};
+                {ok, MigratedSt} ->
+                    {ok, {Mod, MigratedSt}};
+                Other ->
+                    erlang:error({bad_return, Other})
+            end
+    end.
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_registry.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_registry.erl b/src/couch_event_registry.erl
new file mode 100644
index 0000000..65fa160
--- /dev/null
+++ b/src/couch_event_registry.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_event_registry).
+-behavior(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-include("couch_event_int.hrl").
+
+
+% Start the registry linked to the caller and registered locally under
+% the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, nil, []).
+
+
+% gen_server callback: create the named, protected registry table. It is
+% a bag keyed on the dbname field of the client record, so one key can
+% hold many registrations. The server itself keeps no state (nil).
+init(_) ->
+    ets:new(?REGISTRY_TABLE, [
+        protected,
+        named_table,
+        bag,
+        {keypos, #client.dbname}
+    ]),
+    {ok, nil}.
+
+
+% gen_server callback: nothing to clean up explicitly.
+terminate(_Reason, _St) ->
+    ok.
+
+
+% Registry requests:
+%   {register, Pid, DbName}   - monitor Pid and store the registration.
+%   {unregister, Pid, DbName} - remove Pid's registration(s) for DbName.
+%   {unregister_all, Pid}     - remove every registration held by Pid.
+% All three reply ok. Any other request is logged and answered `ignored`.
+handle_call({register, Pid, DbName}, _From, St) ->
+    Client = #client{
+        dbname = DbName,
+        pid = Pid,
+        ref = erlang:monitor(process, Pid)
+    },
+    ets:insert(?REGISTRY_TABLE, Client),
+    {reply, ok, St};
+
+handle_call({unregister, Pid, DbName}, _From, St) ->
+    % Every registration carries a fresh monitor ref and the table is a
+    % bag, so the same Pid/DbName pair can be stored more than once.
+    % Remove all of them rather than assuming at most one match.
+    Pattern = #client{dbname=DbName, pid=Pid, _='_'},
+    remove_clients(ets:match_object(?REGISTRY_TABLE, Pattern)),
+    {reply, ok, St};
+
+handle_call({unregister_all, Pid}, _From, St) ->
+    Pattern = #client{pid=Pid, _='_'},
+    remove_clients(ets:match_object(?REGISTRY_TABLE, Pattern)),
+    {reply, ok, St};
+
+handle_call(Msg, From, St) ->
+    couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
+    % No timeout in the reply: a 0 timeout would only produce a `timeout`
+    % message for handle_info/2 to log.
+    {reply, ignored, St}.
+
+
+% Drop the monitor of each given registration and delete its table entry.
+remove_clients(Clients) ->
+    lists:foreach(fun(#client{ref=Ref}=Cli) ->
+        erlang:demonitor(Ref, [flush]),
+        % I wonder if match_delete/2 is faster
+        % than repeated calls to delete_object.
+        ets:delete_object(?REGISTRY_TABLE, Cli)
+    end, Clients).
+
+
+% The registry defines no casts: log the message and drop it.
+% No timeout is returned. A 0 timeout would make gen_server deliver a
+% `timeout` message to handle_info/2 as soon as the mailbox is empty,
+% which produces nothing but more log output.
+handle_cast(Msg, St) ->
+    couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+% A monitored listener died: delete the registration tied to that
+% monitor reference.
+handle_info({'DOWN', Ref, process, Pid, _Reason}, St) ->
+    Pattern = #client{pid=Pid, ref=Ref, _='_'},
+    ets:match_delete(?REGISTRY_TABLE, Pattern),
+    {noreply, St};
+
+% Anything else is logged and dropped. No timeout is returned: a 0
+% timeout would deliver a `timeout` message straight back to this clause,
+% which would log it and re-arm the timeout, spinning until some other
+% message arrives.
+handle_info(Msg, St) ->
+    couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+% gen_server callback: the state is carried over unchanged on upgrade.
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-event/blob/cc616a77/src/couch_event_sup2.erl
----------------------------------------------------------------------
diff --git a/src/couch_event_sup2.erl b/src/couch_event_sup2.erl
new file mode 100644
index 0000000..039b897
--- /dev/null
+++ b/src/couch_event_sup2.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% This is named couch_event_sup2 to avoid
+% naming collisions with the couch_event_sup
+% module contained in the couch app. When
+% that supervisor is removed we'll be free
+% to rename this one.
+
+-module(couch_event_sup2).
+-behavior(supervisor).
+
+
+-export([
+    start_link/0,
+    init/1
+]).
+
+
+% Start the supervisor linked to the caller and registered locally under
+% the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, nil).
+
+
+% supervisor callback: supervise the registry and the dispatcher as
+% permanent workers with a 5000 ms shutdown timeout, under a one_for_one
+% strategy that allows at most 5 restarts in 10 seconds.
+init(_) ->
+    Children = [
+        {couch_event_registry,
+            {couch_event_registry, start_link, []},
+            permanent,
+            5000,
+            worker,
+            % The modules list names the child's callback module.
+            [couch_event_registry]
+        },
+        {couch_event_dist,
+            {couch_event_dist, start_link, []},
+            permanent,
+            5000,
+            worker,
+            [couch_event_dist]
+        }
+    ],
+    {ok, {{one_for_one, 5, 10}, Children}}.
+