You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@couchdb.apache.org by rn...@apache.org on 2015/06/25 11:38:54 UTC

[2/6] couchdb-couch-epi git commit: Initial version

Initial version

Add `ignore_providers` option

Rename `hash(FilePath)` into `hashof_file(FilePath)`

Use monitor to unsubscribe when subscriber die

Rename couch_epi:all into couch_epi:dump

Remove modules from dispatch on termination

Add all/any convinence funs to couch_epi

Document `ignore_providers` option

Add childspec helper to _data_source and _functions

Add license to test suite


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/commit/32ba6e79
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/tree/32ba6e79
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/diff/32ba6e79

Branch: refs/heads/master
Commit: 32ba6e793460f9a4962cd5885fd14361554c52ed
Parents: cf2e463
Author: ILYA Khlopotov <ii...@ca.ibm.com>
Authored: Wed Jun 10 16:02:59 2015 -0700
Committer: ILYA Khlopotov <ii...@ca.ibm.com>
Committed: Wed Jun 24 15:13:41 2015 -0700

----------------------------------------------------------------------
 LICENSE                              | 203 +++++++++++++++
 README.md                            | 113 ++++++++
 rebar.config                         |   3 +
 src/couch_epi.app.src                |  24 ++
 src/couch_epi.erl                    | 158 ++++++++++++
 src/couch_epi_app.erl                |  23 ++
 src/couch_epi_codegen.erl            |  72 ++++++
 src/couch_epi_data_gen.erl           | 283 ++++++++++++++++++++
 src/couch_epi_data_source.erl        | 191 ++++++++++++++
 src/couch_epi_functions.erl          | 155 +++++++++++
 src/couch_epi_functions_gen.erl      | 347 +++++++++++++++++++++++++
 src/couch_epi_server.erl             | 144 +++++++++++
 src/couch_epi_sup.erl                |  38 +++
 src/couch_epi_util.erl               |  24 ++
 test/couch_epi_data_source_tests.erl |  90 +++++++
 test/couch_epi_functions_tests.erl   | 126 +++++++++
 test/couch_epi_tests.erl             | 413 ++++++++++++++++++++++++++++++
 test/fixtures/app_data1.cfg          |   4 +
 test/fixtures/app_data2.cfg          |   8 +
 19 files changed, 2419 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..94ad231
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..008adf6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,113 @@
+# What it is
+
+`couch_epi` is extensible plugin interface (EPI) for couchdb.
+
+## Requirements
+
+  1. Automatically discoverable
+  2. Minimize apps that need to be started for tests
+  3. Support release upgrades
+
+## Glossary
+
+  * service - an abstract functionality defined by unique name and API
+  * provider - a self-contained implementation of `Service`'s API
+  * subscriber - an application or a process which uses functionality provided by `Provider`
+  * epi_key - is a routing key it has to be in on of the following forms
+    - `{service_id :: atom(), key :: term()}` - for `couch_epi_data_source`
+    - `service_id :: atom()` - for `couch_epi_functions`
+  * handle - is opaque data structure returned from `couch_epi:get_handle(EpiKey)`
+
+## Support release upgrade
+
+We monitor the source of config information and have an ability to notify the subscriber.
+The source is either a file for a `couch_epi_data_source` or module for `couch_epi_functions`.
+
+If the subscriber wants to receive notifications when the config has been updated it can use:
+
+    couch_epi:subscribe(App, Key, Module, Func, ExtraArgs)
+
+The function would be called with following arguments
+
+    Fun(App :: app(), Key :: key(),
+        OldData :: notification(), NewData :: notification(),
+        ExtraArgs :: term()
+
+The `notification()` is either `{data, term()}` or `{modules, [module()]}`
+
+
+
+## data_source example
+
+Any application that wants to register some configuration data for a service
+could add an entry in its supervision tree with something like:
+
+    {
+        appname_stats,
+        {couch_epi_data_source, start_link, [
+            appname,
+            {epi_key, {couch_stats, definitions}},
+            {priv_file, "couch_stats.cfg"},
+            [{interval, 100}]
+        ]},
+        permanent,
+        5000,
+        worker,
+        dynamic
+    }
+
+Note we also support `{file, FilePath}` instead of `{priv_file, File}`
+
+When service provider wants to learn about all the installed config data for it to use
+it would then just do something like:
+
+
+     couch_epi:get(Handle, Service, Key)
+
+There are also additional functions to get the same data in various formats
+
+- `couch_epi:all(Handle)` - returns config data for all services for a given handle
+- `couch_epi:get(Handle, Subscriber)` - returns config data for a given subscriber
+- `couch_epi:get_value(Handle, Subscriber, Key)` - returns config data for a given subscriber and key
+- `couch_epi:by_key(Handle, Key)` - returns config data for a given key
+- `couch_epi:by_key(Handle)` - returns config data grouped by key
+- `couch_epi:by_source(Handle)` - returns config data grouped by source (subscriber)
+- `couch_epi:keys(Handle)` - returns list of configured keys
+- `couch_epi:subscribers(Handle)` - return list of known subscribers
+
+# Function dispatch example
+
+Any application that wants to register some functions for a service
+could add an entry in its supervision tree with something like:
+
+    {
+        appname_stats,
+        {couch_epi_functions, start_link, [
+            appname,
+            {epi_key, my_service},
+            {modules, [my_module]},
+            [{interval, 100}]
+        ]},
+        permanent,
+        5000,
+        worker,
+        dynamic
+    }
+
+Adding the entry would generate a dispatch methods for any exported function of modules passed.
+
+
+When app wants to dispatch the call to all service providers it calls
+
+    couch_epi:apply(Handle, ServiceId, Function, Args, Opts)
+
+There are multiple ways of doing the apply which is controlled by Opts
+
+  - ignore_errors - the call is wrapped into try/catch
+  - concurrent - spawn a new process for every service provider
+  - pipe - use output of one service provider as an input for the next one
+  - ignore_providers - do not fail if there are no providers for the service are available
+
+Notes:
+
+  - `concurrent` is incompatible with `pipe`

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/rebar.config
----------------------------------------------------------------------
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..82db830
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,3 @@
+{cover_enabled, true}.
+
+{cover_print_enabled, true}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi.app.src
----------------------------------------------------------------------
diff --git a/src/couch_epi.app.src b/src/couch_epi.app.src
new file mode 100644
index 0000000..b4a433f
--- /dev/null
+++ b/src/couch_epi.app.src
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_epi,
+ [
+  {description, "extensible plugin interface"},
+  {vsn, "1"},
+  {registered, [couch_epi_sup, couch_epi_server]},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+  {mod, { couch_epi_app, []}},
+  {env, []}
+ ]}.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi.erl b/src/couch_epi.erl
new file mode 100644
index 0000000..8787c88
--- /dev/null
+++ b/src/couch_epi.erl
@@ -0,0 +1,158 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi).
+
+%% subscribtion management
+-export([subscribe/5, unsubscribe/1, get_handle/1]).
+
+%% queries and introspection
+-export([
+    dump/1, get/2, get_value/3,
+    by_key/1, by_key/2, by_source/1, by_source/2,
+    keys/1, subscribers/1]).
+
+%% apply
+-export([apply/5]).
+-export([any/5, all/5]).
+
+-export_type([service_id/0, app/0, key/0, handle/0, notify_cb/0]).
+
+-type app() :: atom().
+-type key() :: term().
+-type service_id() :: atom().
+
+-type properties() :: [{key(), term()}].
+
+-type notification() :: {data, term()} | {modules, [module()]}.
+-type notify_cb() :: fun(
+    (App :: app(), Key :: key(), Data :: notification(), Extra :: term()) -> ok).
+
+-type subscription() :: term().
+
+-opaque handle() :: module().
+
+-type apply_opt()
+    :: ignore_errors
+        | ignore_providers
+        | concurrent
+        | pipe.
+
+-type apply_opts() :: [apply_opt()].
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+-spec dump(Handle :: handle()) ->
+    [Config :: properties()].
+
+dump(Handle) ->
+    couch_epi_data_gen:get(Handle).
+
+-spec get(Handle :: handle(), Key :: key()) ->
+    [Config :: properties()].
+
+get(Handle, Key) ->
+    couch_epi_data_gen:get(Handle, Key).
+
+-spec get_value(Handle :: handle(), Subscriber :: app(), Key :: key()) ->
+    properties().
+
+get_value(Handle, Subscriber, Key) ->
+    couch_epi_data_gen:get(Handle, Subscriber, Key).
+
+
+-spec by_key(Handle :: handle()) ->
+    [{Key :: key(), [{Source :: app(), properties()}]}].
+
+by_key(Handle) ->
+    couch_epi_data_gen:by_key(Handle).
+
+
+-spec by_key(Handle :: handle(), Key :: key()) ->
+    [{Source :: app(), properties()}].
+
+by_key(Handle, Key) ->
+    couch_epi_data_gen:by_key(Handle, Key).
+
+
+-spec by_source(Handle :: handle()) ->
+    [{Source :: app(), [{Key :: key(), properties()}]}].
+
+by_source(Handle) ->
+    couch_epi_data_gen:by_source(Handle).
+
+
+-spec by_source(Handle :: handle(), Subscriber :: app()) ->
+    [{Key :: key(), properties()}].
+
+by_source(Handle, Subscriber) ->
+    couch_epi_data_gen:by_source(Handle, Subscriber).
+
+
+-spec keys(Handle :: handle()) ->
+    [Key :: key()].
+
+keys(Handle) ->
+    couch_epi_data_gen:keys(Handle).
+
+
+-spec subscribers(Handle :: handle()) ->
+    [Subscriber :: app()].
+
+subscribers(Handle) ->
+    couch_epi_data_gen:subscribers(Handle).
+
+
+%% Passed MFA should implement notify_cb() type
+-spec subscribe(App :: app(), Key :: key(),
+    Module :: module(), Function :: atom(), Args :: [term()]) ->
+        {ok, SubscriptionId :: subscription()}.
+
+subscribe(App, Key, M, F, A) ->
+    couch_epi_server:subscribe(App, Key, {M, F, A}).
+
+
+-spec unsubscribe(SubscriptionId :: subscription()) -> ok.
+
+unsubscribe(SubscriptionId) ->
+    couch_epi_server:unsubscribe(SubscriptionId).
+
+%% The success typing is (atom() | tuple(),_,_,[any()],_) -> [any()]
+-spec apply(Handle :: handle(), ServiceId :: atom(), Function :: atom(),
+    Args :: [term()], Opts :: apply_opts()) -> ok.
+
+apply(Handle, ServiceId, Function, Args, Opts) ->
+    couch_epi_functions_gen:apply(Handle, ServiceId, Function, Args, Opts).
+
+-spec get_handle({ServiceId :: service_id(), Key :: key()}) -> handle();
+                (ServiceId :: service_id()) -> handle().
+
+get_handle({_ServiceId, _Key} = EPIKey) ->
+    couch_epi_data_gen:get_handle(EPIKey);
+get_handle(ServiceId) when is_atom(ServiceId) ->
+    couch_epi_functions_gen:get_handle(ServiceId).
+
+-spec any(Handle :: handle(), ServiceId :: atom(), Function :: atom(),
+    Args :: [term()], Opts :: apply_opts()) -> boolean().
+
+any(Handle, ServiceId, Function, Args, Opts) ->
+    Replies = apply(Handle, ServiceId, Function, Args, Opts),
+    [] /= [Reply || Reply <- Replies, Reply == true].
+
+-spec all(Handle :: handle(), ServiceId :: atom(), Function :: atom(),
+    Args :: [term()], Opts :: apply_opts()) -> boolean().
+
+all(Handle, ServiceId, Function, Args, Opts) ->
+    Replies = apply(Handle, ServiceId, Function, Args, Opts),
+    [] == [Reply || Reply <- Replies, Reply == false].

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_app.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_app.erl b/src/couch_epi_app.erl
new file mode 100644
index 0000000..0dd42c2
--- /dev/null
+++ b/src/couch_epi_app.erl
@@ -0,0 +1,23 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_app).
+
+-behaviour(application).
+
+-export([start/2, stop/1]).
+
+start(_Type, _Args) ->
+    couch_epi_sup:start_link().
+
+stop(_State) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_codegen.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_codegen.erl b/src/couch_epi_codegen.erl
new file mode 100644
index 0000000..caedb88
--- /dev/null
+++ b/src/couch_epi_codegen.erl
@@ -0,0 +1,72 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_codegen).
+
+-export([generate/2, scan/1, parse/1, function/1, format_term/1]).
+
+generate(ModuleName, Forms) when is_atom(ModuleName) ->
+    generate(atom_to_list(ModuleName), Forms);
+generate(ModuleName, Forms0) ->
+    Forms = scan("-module(" ++ ModuleName ++ ").") ++ Forms0,
+    ASTForms = parse(Forms),
+    {ok, Mod, Bin} = compile:forms(ASTForms, [verbose, report_errors]),
+    {module, Mod} = code:load_binary(Mod, atom_to_list(Mod) ++ ".erl", Bin),
+    ok.
+
+scan(String) ->
+    Exprs = [E || E <- re:split(String, "\\.\n", [{return, list}, trim])],
+    FormsTokens = lists:foldl(fun(Expr, Acc) ->
+        case erl_scan:string(Expr) of
+            {ok, [], _} ->
+                Acc;
+            {ok, Tokens, _} ->
+                [{Expr, fixup_terminator(Tokens)} | Acc]
+        end
+    end, [], Exprs),
+    lists:reverse(FormsTokens).
+
+parse(FormsTokens) ->
+    ASTForms = lists:foldl(fun(Tokens, Forms) ->
+        {ok, AST} = parse_form(Tokens),
+        [AST | Forms]
+    end, [], FormsTokens),
+    lists:reverse(ASTForms).
+
+format_term(Data) ->
+    lists:flatten(io_lib:format("~w", [Data])).
+
+parse_form(Tokens) ->
+    {Expr, Forms} = split_expression(Tokens),
+    case erl_parse:parse_form(Forms) of
+        {ok, AST} -> {ok, AST};
+        {error,{_,_, Reason}} ->
+            %%Expr = [E || {E, _Form} <- Tokens],
+            {error, Expr, Reason}
+    end.
+
+split_expression({Expr, Forms}) -> {Expr, Forms};
+split_expression(Tokens) ->
+    {Exprs, Forms} = lists:unzip(Tokens),
+    {string:join(Exprs, "\n"), lists:append(Forms)}.
+
+function(Clauses) ->
+    [lists:flatten(Clauses)].
+
+fixup_terminator(Tokens) ->
+    case lists:last(Tokens) of
+        {dot, _} -> Tokens;
+        {';', _} -> Tokens;
+        Token ->
+            {line, Line} = erl_scan:token_info(Token, line),
+            Tokens ++ [{dot, Line}]
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_data_gen.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_data_gen.erl b/src/couch_epi_data_gen.erl
new file mode 100644
index 0000000..73cf901
--- /dev/null
+++ b/src/couch_epi_data_gen.erl
@@ -0,0 +1,283 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_data_gen).
+
+%% @doc
+%% We generate and compile module with name constructed as:
+%%   "couch_epi_data_" + Service + "_" + Key
+%% To get an idea about he code of the generated module see preamble()
+
+-export([get_handle/1]).
+-export([set/3, get/1, get/2, get/3]).
+-export([by_key/1, by_key/2]).
+-export([by_source/1, by_source/2]).
+-export([keys/1, subscribers/1]).
+
+set(Handle, Source, Data) ->
+    case is_updated(Handle, Source, Data) of
+        false ->
+            ok;
+        true ->
+            save(Handle, Source, Data)
+    end.
+
+get(Handle) ->
+    Handle:all().
+
+get(Handle, Key) ->
+    Handle:all(Key).
+
+get(Handle, Source, Key) ->
+    Handle:get(Source, Key).
+
+by_key(Handle) ->
+    Handle:by_key().
+
+by_key(Handle, Key) ->
+    Handle:by_key(Key).
+
+by_source(Handle) ->
+    Handle:by_source().
+
+by_source(Handle, Source) ->
+    Handle:by_source(Source).
+
+keys(Handle) ->
+    Handle:keys().
+
+subscribers(Handle) ->
+    Handle:subscribers().
+
+get_handle({Service, Key}) ->
+    module_name({atom_to_list(Service), atom_to_list(Key)}).
+
+%% ------------------------------------------------------------------
+%% Codegeneration routines
+%% ------------------------------------------------------------------
+
+preamble() ->
+    "
+    -export([by_key/0, by_key/1]).
+    -export([by_source/0, by_source/1]).
+    -export([all/0, all/1, get/2]).
+    -export([version/0, version/1]).
+    -export([keys/0, subscribers/0]).
+    -compile({no_auto_import,[get/0, get/1]}).
+    all() ->
+        lists:foldl(fun({Key, Defs}, Acc) ->
+           [D || {_Subscriber, D} <- Defs ] ++ Acc
+        end, [], by_key()).
+
+    all(Key) ->
+        lists:foldl(fun({Subscriber, Data}, Acc) ->
+           [Data | Acc]
+        end, [], by_key(Key)).
+
+    by_key() ->
+        [{Key, by_key(Key)} || Key <- keys()].
+
+    by_key(Key) ->
+        lists:foldl(
+            fun(Source, Acc) -> append_if_defined(Source, get(Source, Key), Acc)
+        end, [], subscribers()).
+
+
+    by_source() ->
+        [{Source, by_source(Source)} || Source <- subscribers()].
+
+    by_source(Source) ->
+        lists:foldl(
+            fun(Key, Acc) -> append_if_defined(Key, get(Source, Key), Acc)
+        end, [], keys()).
+
+    version() ->
+        [{Subscriber, version(Subscriber)} || Subscriber <- subscribers()].
+
+    %% Helper functions
+    append_if_defined(Type, undefined, Acc) -> Acc;
+    append_if_defined(Type, Value, Acc) -> [{Type, Value} | Acc].
+    "
+    %% In addition to preamble we also generate following methods
+    %% get(Source1, Key1) -> Data;
+    %% get(Source, Key) -> undefined.
+
+    %% version(Source1) -> "HASH";
+    %% version(Source) -> {error, {unknown, Source}}.
+
+    %% keys() -> [].
+    %% subscribers() -> [].
+    .
+
+generate(Handle, Defs) ->
+    GetFunForms = couch_epi_codegen:function(getters(Defs)),
+    VersionFunForms = couch_epi_codegen:function(version_method(Defs)),
+    KeysForms = keys_method(Defs),
+    SubscribersForms = subscribers_method(Defs),
+
+    Forms = couch_epi_codegen:scan(preamble())
+        ++ GetFunForms ++ VersionFunForms
+        ++ KeysForms ++ SubscribersForms,
+
+    couch_epi_codegen:generate(Handle, Forms).
+
+keys_method(Defs) ->
+    Keys = couch_epi_codegen:format_term(defined_keys(Defs)),
+    couch_epi_codegen:scan("keys() -> " ++ Keys ++ ".").
+
+subscribers_method(Defs) ->
+    Subscribers = couch_epi_codegen:format_term(defined_subscribers(Defs)),
+    couch_epi_codegen:scan("subscribers() -> " ++ Subscribers ++ ".").
+
+getters(Defs) ->
+    DefaultClause = "get(_S, _K) -> undefined.",
+    fold_defs(Defs, [couch_epi_codegen:scan(DefaultClause)],
+        fun({Source, Key, Data}, Acc) ->
+            getter(Source, Key, Data) ++ Acc
+        end).
+
+version_method(Defs) ->
+    DefaultClause = "version(S) -> {error, {unknown, S}}.",
+    lists:foldl(fun({Source, Data}, Clauses) ->
+        version(Source, Data) ++ Clauses
+    end, [couch_epi_codegen:scan(DefaultClause)], Defs).
+
+getter(Source, Key, Data) ->
+    D = couch_epi_codegen:format_term(Data),
+    Src = atom_to_list(Source),
+    K = couch_epi_codegen:format_term(Key),
+    couch_epi_codegen:scan(
+        "get(" ++ Src ++ ", " ++ K ++ ") ->" ++ D ++ ";").
+
+version(Source, Data) ->
+    Src = atom_to_list(Source),
+    VSN = couch_epi_util:hash(Data),
+    couch_epi_codegen:scan("version(" ++ Src ++ ") ->" ++ VSN ++ ";").
+
+%% ------------------------------------------------------------------
+%% Helper functions
+%% ------------------------------------------------------------------
+
+module_name({Service, Key}) when is_list(Service) andalso is_list(Key) ->
+    list_to_atom(string:join([atom_to_list(?MODULE), Service, Key], "_")).
+
+is_updated(Handle, Source, Data) ->
+    Sig = couch_epi_util:hash(Data),
+    try Handle:version(Source) of
+        {error, {unknown, Source}} -> true;
+        {error, Reason} -> throw(Reason);
+        Sig -> false;
+        _ -> true
+    catch
+        error:undef -> true;
+        Class:Reason ->
+            throw({Class, {Source, Reason}})
+    end.
+
+save(Handle, Source, Data) ->
+    CurrentData = get_current_data(Handle),
+    NewDefs = lists:keystore(Source, 1, CurrentData, {Source, Data}),
+    generate(Handle, NewDefs).
+
+get_current_data(Handle) ->
+    try Handle:by_source()
+    catch error:undef -> []
+    end.
+
+
+defined_keys(Defs) ->
+    Keys = fold_defs(Defs, [], fun({_Source, Key, _Data}, Acc) ->
+        [Key | Acc]
+    end),
+    lists:usort(Keys).
+
+defined_subscribers(Defs) ->
+    [Source || {Source, _} <- Defs].
+
+fold_defs(Defs, Acc, Fun) ->
+    lists:foldl(fun({Source, SourceData}, Clauses) ->
+        lists:foldl(fun({Key, Data}, InAcc) ->
+            Fun({Source, Key, Data}, InAcc)
+        end, [], SourceData) ++ Clauses
+    end, Acc, Defs).
+
+%% ------------------------------------------------------------------
+%% Tests
+%% ------------------------------------------------------------------
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+basic_test() ->
+    Module = foo_bar_baz_bugz,
+    Data1 = [some_nice_data],
+    Data2 = "other data",
+    Data3 = {"even more data"},
+    Defs1 = [{foo, Data1}],
+    Defs2 = lists:usort([{foo, Data2}, {bar, Data3}]),
+
+    set(Module, app1, Defs1),
+    set(Module, app2, Defs2),
+
+    ?assertEqual([bar, foo], lists:usort(Module:keys())),
+    ?assertEqual([app1, app2], lists:usort(Module:subscribers())),
+
+    ?assertEqual(Data1, Module:get(app1, foo)),
+    ?assertEqual(Data2, Module:get(app2, foo)),
+    ?assertEqual(Data3, Module:get(app2, bar)),
+
+    ?assertEqual(undefined, Module:get(bad, key)),
+    ?assertEqual(undefined, Module:get(source, bad)),
+
+    ?assertEqual("3KZ4EG4WBF4J683W8GSDDPYR3", Module:version(app1)),
+    ?assertEqual("4EFUU47W9XDNMV9RMZSSJQU3Y", Module:version(app2)),
+
+    ?assertEqual({error,{unknown,bad}}, Module:version(bad)),
+
+    ?assertEqual(
+        [{app1,"3KZ4EG4WBF4J683W8GSDDPYR3"},
+         {app2,"4EFUU47W9XDNMV9RMZSSJQU3Y"}], lists:usort(Module:version())),
+
+    ?assertEqual(
+       [{app1,[some_nice_data]},{app2,"other data"}],
+       lists:usort(Module:by_key(foo))),
+
+    ?assertEqual([], lists:usort(Module:by_key(bad))),
+
+    ?assertEqual(
+        [
+            {bar, [{app2, {"even more data"}}]},
+            {foo, [{app2, "other data"}, {app1, [some_nice_data]}]}
+        ],
+        lists:usort(Module:by_key())),
+
+
+    ?assertEqual(Defs1, lists:usort(Module:by_source(app1))),
+    ?assertEqual(Defs2, lists:usort(Module:by_source(app2))),
+
+    ?assertEqual([], lists:usort(Module:by_source(bad))),
+
+    ?assertEqual(
+        [
+            {app1, [{foo, [some_nice_data]}]},
+            {app2, [{foo, "other data"}, {bar, {"even more data"}}]}
+        ],
+        lists:usort(Module:by_source())),
+
+    ?assertEqual(
+        lists:usort([Data1, Data2, Data3]), lists:usort(Module:all())),
+    ?assertEqual(lists:usort([Data1, Data2]), lists:usort(Module:all(foo))),
+    ?assertEqual([], lists:usort(Module:all(bad))),
+
+    ok.
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_data_source.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_data_source.erl b/src/couch_epi_data_source.erl
new file mode 100644
index 0000000..68b4aab
--- /dev/null
+++ b/src/couch_epi_data_source.erl
@@ -0,0 +1,191 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_data_source).
+
+-behaviour(gen_server).
+-define(MONITOR_INTERVAL, 5000).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([childspec/5]).
+-export([start_link/4, reload/1]).
+-export([wait/1, stop/1]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {
+    subscriber, locator, key, hash, handle,
+    initialized = false, pending = []}).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+childspec(Id, App, EpiKey, Locator, Options) ->
+    {
+        Id,
+        {?MODULE, start_link, [
+            App,
+            EpiKey,
+            Locator,
+            Options
+        ]},
+        permanent,
+        5000,
+        worker,
+        [?MODULE]
+    }.
+
+start_link(SubscriberApp, {epi_key, Key}, Src, Options) ->
+    {ok, Locator} = locate(SubscriberApp, Src),
+    gen_server:start_link(?MODULE, [SubscriberApp, Locator, Key, Options], []).
+
+reload(Server) ->
+    gen_server:call(Server, reload).
+
+wait(Server) ->
+    gen_server:call(Server, wait).
+
+stop(Server) ->
+    catch gen_server:call(Server, stop).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+init([Subscriber, Locator, Key, Options]) ->
+    gen_server:cast(self(), init),
+    Interval = proplists:get_value(interval, Options, ?MONITOR_INTERVAL),
+    {ok, _Timer} = timer:send_interval(Interval, self(), tick),
+    {ok, #state{
+        subscriber = Subscriber,
+        locator = Locator,
+        key = Key,
+        handle = couch_epi_data_gen:get_handle(Key)}}.
+
+handle_call(wait, _From, #state{initialized = true} = State) ->
+    {reply, ok, State};
+handle_call(wait, From, #state{pending = Pending} = State) ->
+    {noreply, State#state{pending = [From | Pending]}};
+handle_call(reload, _From, State) ->
+    {Res, NewState} = reload_if_updated(State),
+    {reply, Res, NewState};
+handle_call(stop, _From, State) ->
+    {stop, normal, State};
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(init, #state{pending = Pending} = State) ->
+    {_, NewState} = reload_if_updated(State),
+    [gen_server:reply(Client, ok) || Client <- Pending],
+    {noreply, NewState#state{initialized = true, pending = []}};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(tick, State0) ->
+    {_Res, State1} = reload_if_updated(State0),
+    {noreply, State1};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+locate(App, {priv_file, FileName}) ->
+    case priv_path(App, FileName) of
+        {ok, FilePath} ->
+            ok = ensure_exists(FilePath),
+            {ok, {file, FilePath}};
+        Else ->
+            Else
+    end;
+locate(_App, {file, FilePath}) ->
+    ok = ensure_exists(FilePath),
+    {ok, {file, FilePath}}.
+
+priv_path(AppName, FileName) ->
+    case code:priv_dir(AppName) of
+        {error, _Error} = Error ->
+            Error;
+        Dir ->
+            {ok, filename:join(Dir, FileName)}
+    end.
+
+ensure_exists(FilePath) ->
+    case filelib:is_regular(FilePath) of
+        true ->
+            ok;
+        false ->
+            {error, {notfound, FilePath}}
+    end.
+
+reload_if_updated(#state{hash = OldHash, locator = Locator} = State) ->
+    case read(Locator) of
+        {ok, OldHash, _Data} ->
+            {ok, State};
+        {ok, Hash, Data} ->
+            safe_set(Hash, Data, State);
+        Else ->
+            {Else, State}
+    end.
+
+safe_set(Hash, Data, #state{} = State) ->
+    #state{
+        handle = Handle,
+        subscriber = Subscriber,
+        key = Key} = State,
+
+    try
+        OldData = current(Handle, Subscriber),
+        ok = couch_epi_data_gen:set(Handle, Subscriber, Data),
+        couch_epi_server:notify(Subscriber, Key, {data, OldData}, {data, Data}),
+        {ok, State#state{hash = Hash}}
+    catch Class:Reason ->
+        {{Class, Reason}, State}
+    end.
+
+read({file, FilePath}) ->
+    case file:consult(FilePath) of
+        {ok, Data} ->
+            {ok, hash_of_file(FilePath), Data};
+        {error, Reason} ->
+            {error, {FilePath, Reason}}
+    end.
+
+hash_of_file(FilePath) ->
+    {ok, Data} = file:read_file(FilePath),
+    crypto:hash(md5, Data).
+
+current(Handle, Subscriber) ->
+    try
+        case couch_epi_data_gen:by_source(Handle, Subscriber) of
+            undefined -> [];
+            Data -> Data
+        end
+    catch error:undef ->
+        []
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_functions.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_functions.erl b/src/couch_epi_functions.erl
new file mode 100644
index 0000000..f86ca61
--- /dev/null
+++ b/src/couch_epi_functions.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_functions).
+
+-behaviour(gen_server).
+-define(MONITOR_INTERVAL, 5000).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([childspec/4]).
+-export([start_link/4, reload/1]).
+-export([wait/1, stop/1]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {
+    provider, service_id, modules, hash, handle,
+    initialized = false, pending = []}).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+childspec(Id, App, EpiKey, Module) ->
+    {
+        Id,
+        {?MODULE, start_link, [
+            App,
+            EpiKey,
+            Module,
+            []
+        ]},
+        permanent,
+        5000,
+        worker,
+        [Module]
+    }.
+
+start_link(ProviderApp, {epi_key, ServiceId}, {modules, Modules}, Options) ->
+    gen_server:start_link(
+        ?MODULE, [ProviderApp, ServiceId, Modules, Options], []).
+
+reload(Server) ->
+    gen_server:call(Server, reload).
+
+wait(Server) ->
+    gen_server:call(Server, wait).
+
+stop(Server) ->
+    catch gen_server:call(Server, stop).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+init([Provider, ServiceId, Modules, Options]) ->
+    gen_server:cast(self(), init),
+    Interval = proplists:get_value(interval, Options, ?MONITOR_INTERVAL),
+    {ok, _Timer} = timer:send_interval(Interval, self(), tick),
+    {ok, #state{
+        provider = Provider,
+        modules = Modules,
+        service_id = ServiceId,
+        handle = couch_epi_functions_gen:get_handle(ServiceId)}}.
+
+handle_call(wait, _From, #state{initialized = true} = State) ->
+    {reply, ok, State};
+handle_call(wait, From, #state{pending = Pending} = State) ->
+    {noreply, State#state{pending = [From | Pending]}};
+handle_call(reload, _From, State) ->
+    {Res, NewState} = reload_if_updated(State),
+    {reply, Res, NewState};
+handle_call(stop, _From, State) ->
+    {stop, normal, State};
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+handle_cast(init, #state{pending = Pending} = State) ->
+    {_, NewState} = reload_if_updated(State),
+    [gen_server:reply(Client, ok) || Client <- Pending],
+    {noreply, NewState#state{initialized = true, pending = []}};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(tick, State) ->
+    {_Res, NewState} = reload_if_updated(State),
+    {noreply, NewState};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    safe_remove(State),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+reload_if_updated(#state{hash = OldHash, modules = Modules} = State) ->
+    case couch_epi_functions_gen:hash(Modules) of
+        OldHash ->
+            {ok, State};
+        Hash ->
+            safe_add(Hash, State)
+    end.
+
+safe_add(Hash, #state{modules = OldModules} = State) ->
+    #state{
+        handle = Handle,
+        provider = Provider,
+        modules = Modules,
+        service_id = ServiceId} = State,
+    try
+        ok = couch_epi_functions_gen:add(Handle, Provider, Modules),
+        couch_epi_server:notify(
+            Provider, ServiceId, {modules, OldModules}, {modules, Modules}),
+        {ok, State#state{hash = Hash}}
+    catch Class:Reason ->
+        {{Class, Reason}, State}
+    end.
+
+safe_remove(#state{} = State) ->
+    #state{
+        handle = Handle,
+        provider = Provider,
+        modules = Modules,
+        service_id = ServiceId} = State,
+    try
+        ok = couch_epi_functions_gen:remove(Handle, Provider, Modules),
+        couch_epi_server:notify(
+            Provider, ServiceId, {modules, Modules}, {modules, []}),
+        {ok, State#state{modules = []}}
+    catch Class:Reason ->
+        {{Class, Reason}, State}
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_functions_gen.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_functions_gen.erl b/src/couch_epi_functions_gen.erl
new file mode 100644
index 0000000..b62fbb5
--- /dev/null
+++ b/src/couch_epi_functions_gen.erl
@@ -0,0 +1,347 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_functions_gen).
+
+-export([add/3, remove/3, get_handle/1, hash/1, apply/4, apply/5]).
+
+-ifdef(TEST).
+
+-export([foo/2, bar/0]).
+
+-endif.
+
+-record(opts, {
+    ignore_errors = false,
+    ignore_providers = false,
+    pipe = false,
+    concurrent = false
+}).
+
+add(Handle, Source, Modules) ->
+    case is_updated(Handle, Source, Modules) of
+        false ->
+            ok;
+        true ->
+            save(Handle, Source, Modules)
+    end.
+
+remove(Handle, Source, Modules) ->
+    CurrentDefs = get_current_definitions(Handle),
+    {SourceDefs, Defs} = remove_from_definitions(CurrentDefs, Source),
+
+    NewSourceDefs = lists:filter(fun({M, _}) ->
+        not lists:member(M, Modules)
+    end, SourceDefs),
+
+    generate(Handle, Defs ++ NewSourceDefs).
+
+get_handle(ServiceId) ->
+    module_name(atom_to_list(ServiceId)).
+
+apply(ServiceId, Function, Args, Opts) when is_atom(ServiceId) ->
+    apply(get_handle(ServiceId), ServiceId, Function, Args, Opts).
+
+-spec apply(Handle :: atom(), ServiceId :: atom(), Function :: atom(),
+    Args :: [term()], Opts :: couch_epi:apply_opts()) -> ok.
+
+apply(Handle, _ServiceId, Function, Args, Opts) ->
+    DispatchOpts = parse_opts(Opts),
+    Modules = providers(Handle, Function, length(Args), DispatchOpts),
+    dispatch(Handle, Modules, Function, Args, DispatchOpts).
+
+
+%% ------------------------------------------------------------------
+%% Codegeneration routines
+%% ------------------------------------------------------------------
+
+preamble() ->
+    "
+    -export([version/0, version/1]).
+    -export([providers/0, providers/2]).
+    -export([definitions/0, definitions/1]).
+    -export([dispatch/3]).
+    -export([callbacks/2]).
+
+    version() ->
+        [{Provider, version(Provider)} || Provider <- providers()].
+
+    definitions() ->
+        [{Provider, definitions(Provider)} || Provider <- providers()].
+
+    callbacks(Provider, Function) ->
+        [].
+
+    "
+    %% In addition to preamble we also generate following methods
+    %% dispatch(Module, Function, [A1, A2]) -> Module:Function(A1, A2);
+
+    %% version(Source1) -> "HASH";
+    %% version(Source) -> {error, {unknown, Source}}.
+
+    %% providers() -> [].
+    %% providers(Function, Arity) -> [].
+    %% definitions(Provider) -> [{Module, [{Fun, Arity}]}].
+    .
+
+generate(Handle, Defs) ->
+    DispatchFunForms = couch_epi_codegen:function(dispatchers(Defs)),
+    VersionFunForms = couch_epi_codegen:function(version_method(Defs)),
+
+    AllProvidersForms = all_providers_method(Defs),
+    ProvidersForms = couch_epi_codegen:function(providers_method(Defs)),
+    DefinitionsForms = couch_epi_codegen:function(definitions_method(Defs)),
+
+    Forms = couch_epi_codegen:scan(preamble())
+        ++ DispatchFunForms ++ VersionFunForms
+        ++ ProvidersForms ++ AllProvidersForms
+        ++ DefinitionsForms,
+
+    couch_epi_codegen:generate(Handle, Forms).
+
+all_providers_method(Defs) ->
+    Providers = couch_epi_codegen:format_term(defined_providers(Defs)),
+    couch_epi_codegen:scan("providers() -> " ++ Providers ++ ".").
+
+providers_method(Defs) ->
+    Providers = providers_by_function(Defs),
+    DefaultClause = "providers(_, _) -> [].",
+    lists:foldl(fun({{Fun, Arity}, Modules}, Clauses) ->
+        providers(Fun, Arity, Modules) ++ Clauses
+    end, [couch_epi_codegen:scan(DefaultClause)], Providers).
+
+providers(Function, Arity, Modules) ->
+    ArityStr = integer_to_list(Arity),
+    Mods = couch_epi_codegen:format_term(Modules),
+    Fun = atom_to_list(Function),
+    %% providers(Function, Arity) -> [Module];
+    couch_epi_codegen:scan(
+        "providers(" ++ Fun ++ "," ++ ArityStr ++ ") ->" ++ Mods ++ ";").
+
+dispatchers(Defs) ->
+    DefaultClause = "dispatch(_Module, _Fun, _Args) -> ok.",
+    fold_defs(Defs, [couch_epi_codegen:scan(DefaultClause)],
+        fun({_Source, Module, Function, Arity}, Acc) ->
+            dispatcher(Module, Function, Arity) ++ Acc
+        end).
+
+version_method(Defs) ->
+    DefaultClause = "version(S) -> {error, {unknown, S}}.",
+    lists:foldl(fun({Source, SrcDefs}, Clauses) ->
+        version(Source, SrcDefs) ++ Clauses
+    end, [couch_epi_codegen:scan(DefaultClause)], Defs).
+
+definitions_method(Defs) ->
+    DefaultClause = "definitions(S) -> {error, {unknown, S}}.",
+    lists:foldl(fun({Source, SrcDefs}, Clauses) ->
+        definition(Source, SrcDefs) ++ Clauses
+    end, [couch_epi_codegen:scan(DefaultClause)], Defs).
+
+definition(Source, Defs) ->
+    Src = atom_to_list(Source),
+    DefsStr = couch_epi_codegen:format_term(Defs),
+    couch_epi_codegen:scan("definitions(" ++ Src ++ ") -> " ++ DefsStr ++ ";").
+
+dispatcher(Module, Function, 0) ->
+    M = atom_to_list(Module),
+    Fun = atom_to_list(Function),
+
+    %% dispatch(Module, Function, []) -> Module:Function();
+    couch_epi_codegen:scan(
+        "dispatch(" ++ M ++ "," ++ Fun ++ ", []) ->"
+            ++ M ++ ":" ++ Fun ++ "();");
+dispatcher(Module, Function, Arity) ->
+    Args = args_string(Arity),
+    M = atom_to_list(Module),
+    Fun = atom_to_list(Function),
+    %% dispatch(Module, Function, [A1, A2]) -> Module:Function(A1, A2);
+    couch_epi_codegen:scan(
+        "dispatch(" ++ M ++ "," ++ Fun ++ ", [" ++ Args ++ "]) ->"
+            ++ M ++ ":" ++ Fun ++ "(" ++ Args ++ ");").
+
+args_string(Arity) ->
+    Vars = ["A" ++ integer_to_list(Seq)  || Seq <- lists:seq(1, Arity)],
+    string:join(Vars, ", ").
+
+version(Source, SrcDefs) ->
+    Modules = [Module || {Module, _Exports} <- SrcDefs],
+    couch_epi_codegen:scan(
+        "version(" ++ atom_to_list(Source) ++ ") ->" ++ hash(Modules) ++ ";").
+
+
+
+%% ------------------------------------------------------------------
+%% Helper functions
+%% ------------------------------------------------------------------
+
+module_name(ServiceId) when is_list(ServiceId) ->
+    list_to_atom(string:join([atom_to_list(?MODULE), ServiceId], "_")).
+
+is_updated(Handle, Source, Modules) ->
+    Sig = hash(Modules),
+    try Handle:version(Source) of
+        {error, {unknown, Source}} -> true;
+        {error, Reason} -> throw(Reason);
+        Sig -> false;
+        _ -> true
+    catch
+        error:undef -> true;
+        Class:Reason ->
+            throw({Class, {Source, Reason}})
+    end.
+
+save(Handle, Source, Modules) ->
+    CurrentDefs = get_current_definitions(Handle),
+    Definitions = definitions(Source, Modules),
+    NewDefs = lists:keystore(Source, 1, CurrentDefs, Definitions),
+    generate(Handle, NewDefs).
+
+definitions(Source, Modules) ->
+    Blacklist = [{module_info, 0}, {module_info, 1}],
+    SrcDefs = [{M, M:module_info(exports) -- Blacklist} || M <- Modules],
+    {Source, SrcDefs}.
+
+get_current_definitions(Handle) ->
+    try Handle:definitions()
+    catch error:undef -> []
+    end.
+
+defined_providers(Defs) ->
+    [Source || {Source, _} <- Defs].
+
+%% Defs = [{Source, [{Module, [{Fun, Arity}]}]}]
+fold_defs(Defs, Acc, Fun) ->
+    lists:foldl(fun({Source, SourceData}, Clauses) ->
+        lists:foldl(fun({Module, Exports}, ExportsAcc) ->
+            lists:foldl(fun({Function, Arity}, InAcc) ->
+                Fun({Source, Module, Function, Arity}, InAcc)
+            end, [], Exports) ++ ExportsAcc
+        end, [], SourceData) ++ Clauses
+    end, Acc, Defs).
+
+providers_by_function(Defs) ->
+    Providers = fold_defs(Defs, [],
+        fun({_Source, Module, Function, Arity}, Acc) ->
+            [{{Function, Arity}, Module} | Acc]
+        end
+    ),
+    Dict = lists:foldl(fun({K, V}, Acc) ->
+        dict:append(K, V, Acc)
+    end, dict:new(), Providers),
+    dict:to_list(Dict).
+
+
+hash(Modules) ->
+    VSNs = [couch_epi_util:module_version(M) || M <- lists:usort(Modules)],
+    couch_epi_util:hash(VSNs).
+
+dispatch(_Handle, _Modules, _Func, _Args, #opts{concurrent = true, pipe = true}) ->
+    throw({error, {incompatible_options, [concurrent, pipe]}});
+dispatch(Handle, Modules, Function, Args,
+        #opts{pipe = true, ignore_errors = true}) ->
+    lists:foldl(fun(Module, Acc) ->
+        try
+            Handle:dispatch(Module, Function, Acc)
+        catch _:_ ->
+            Acc
+        end
+    end, Args, Modules);
+dispatch(Handle, Modules, Function, Args,
+        #opts{pipe = true}) ->
+    lists:foldl(fun(Module, Acc) ->
+        Handle:dispatch(Module, Function, Acc)
+    end, Args, Modules);
+dispatch(Handle, Modules, Function, Args, #opts{} = Opts) ->
+    [do_dispatch(Handle, Module, Function, Args, Opts) || Module <- Modules].
+
+do_dispatch(Handle, Module, Function, Args,
+        #opts{concurrent = true, ignore_errors = true}) ->
+    spawn(fun() ->
+        (catch Handle:dispatch(Module, Function, Args))
+    end);
+do_dispatch(Handle, Module, Function, Args,
+        #opts{ignore_errors = true}) ->
+    (catch Handle:dispatch(Module, Function, Args));
+do_dispatch(Handle, Module, Function, Args,
+        #opts{concurrent = true}) ->
+    spawn(fun() -> Handle:dispatch(Module, Function, Args) end);
+do_dispatch(Handle, Module, Function, Args, #opts{}) ->
+    Handle:dispatch(Module, Function, Args).
+
+
+parse_opts(Opts) ->
+    parse_opts(Opts, #opts{}).
+
+parse_opts([ignore_errors|Rest], #opts{} = Acc) ->
+    parse_opts(Rest, Acc#opts{ignore_errors = true});
+parse_opts([pipe|Rest], #opts{} = Acc) ->
+    parse_opts(Rest, Acc#opts{pipe = true});
+parse_opts([concurrent|Rest], #opts{} = Acc) ->
+    parse_opts(Rest, Acc#opts{concurrent = true});
+parse_opts([ignore_providers|Rest], #opts{} = Acc) ->
+    parse_opts(Rest, Acc#opts{ignore_providers = true});
+parse_opts([], Acc) ->
+    Acc.
+
+providers(Handle, Function, Arity, #opts{ignore_providers = true}) ->
+    try
+        Handle:providers(Function, Arity)
+    catch
+        error:undef -> []
+    end;
+providers(Handle, Function, Arity, #opts{}) ->
+    Handle:providers(Function, Arity).
+
+remove_from_definitions(Defs, Source) ->
+    case lists:keytake(Source, 1, Defs) of
+        {value, {Source, Value}, Rest} ->
+            {Value, Rest};
+        false ->
+            {[], Defs}
+    end.
+
+%% ------------------------------------------------------------------
+%% Tests
+%% ------------------------------------------------------------------
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+foo(A1, A2) ->
+    {A1, A2}.
+
+bar() ->
+    [].
+
+basic_test() ->
+    Module = foo_bar_dispatcher,
+    add(Module, app1, [?MODULE]),
+
+    ?assert(is_list(Module:version(app1))),
+
+    Defs1 = lists:usort(Module:definitions()),
+    ?assertMatch([{app1, [{?MODULE, _}]}], Defs1),
+    [{app1, [{?MODULE, Exports}]}] = Defs1,
+    ?assert(lists:member({bar, 0}, Exports)),
+
+    add(Module, app2, [?MODULE]),
+    Defs2 = lists:usort(Module:definitions()),
+    ?assertMatch([{app1, [{?MODULE, _}]}, {app2, [{?MODULE, _}]}], Defs2),
+
+    ?assertMatch([{app1, Hash}, {app2, Hash}], Module:version()),
+
+    ?assertMatch([], Module:dispatch(?MODULE, bar, [])),
+    ?assertMatch({1, 2}, Module:dispatch(?MODULE, foo, [1, 2])),
+
+    ok.
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_server.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_server.erl b/src/couch_epi_server.erl
new file mode 100644
index 0000000..e325db7
--- /dev/null
+++ b/src/couch_epi_server.erl
@@ -0,0 +1,144 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_server).
+-behaviour(gen_server).
+-define(SERVER, ?MODULE).
+
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/0]).
+-export([subscribe/3, subscribe/4, unsubscribe/1, unsubscribe/2]).
+-export([notify/4, notify/5]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(epi_server_state, {subscriptions}).
+
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+subscribe(App, Key, MFA) ->
+    subscribe(?SERVER, App, Key, MFA).
+
+subscribe(Server, App, Key, {_M, _F, _A} = MFA) ->
+    gen_server:call(Server, {subscribe, App, Key, MFA}).
+
+unsubscribe(Subscription) ->
+    unsubscribe(?SERVER, Subscription).
+
+unsubscribe(Server, Subscription) ->
+    gen_server:call(Server, {unsubscribe, Subscription}).
+
+notify(App, Key, OldData, Data) ->
+    notify(?SERVER, App, Key, OldData, Data).
+
+notify(Server, App, Key, OldData, Data) ->
+    gen_server:cast(Server, {notify, App, Key, OldData, Data}).
+
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
+
+init(_Args) ->
+    State = #epi_server_state{subscriptions = dict:new()},
+    {ok, State}.
+
+handle_call({subscribe, App, Key, MFA}, {Pid, _Tag},
+        #epi_server_state{subscriptions = Subscriptions0} = State0) ->
+    {Subscription, Subscriptions1} = add(Pid, Subscriptions0, App, Key, MFA),
+    State1 = State0#epi_server_state{subscriptions = Subscriptions1},
+    {reply, {ok, Subscription}, State1};
+handle_call({unsubscribe, Subscription}, _From,
+        #epi_server_state{subscriptions = Subscriptions0} = State0) ->
+    Subscriptions1 = remove(Subscriptions0, Subscription),
+    State1 = State0#epi_server_state{subscriptions = Subscriptions1},
+    {reply, ok, State1};
+handle_call(_Request, _From, State) ->
+    {stop, normal, State}.
+
+handle_cast({notify, App, Key, OldData, Data},
+        #epi_server_state{subscriptions = Subscriptions} = State) ->
+    Subscribers = subscribers(Subscriptions, App, Key),
+    notify_subscribers(Subscribers, App, Key, OldData, Data),
+    {noreply, State};
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, Type, Object, Info},
+        #epi_server_state{subscriptions = Subscriptions0} = State0) ->
+    Subscriptions1 = remove(Subscriptions0, MonitorRef),
+    State1 = State0#epi_server_state{subscriptions = Subscriptions1},
+    {noreply, State1};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+subscribers(Subscriptions, App, Key) ->
+    case dict:find({App, Key}, Subscriptions) of
+        error ->
+            [];
+        {ok, Subscribers} ->
+            Subscribers
+    end.
+
+add(Pid, Subscriptions, App, Key, MFA) ->
+    Subscription = erlang:monitor(process, Pid),
+    {Subscription, dict:append({App, Key}, {Subscription, MFA}, Subscriptions)}.
+
+remove(Subscriptions, SubscriptionId) ->
+    case find(Subscriptions, SubscriptionId) of
+        {App, Key} ->
+            demonitor(SubscriptionId, [flush]),
+            delete_subscriber(Subscriptions, App, Key, SubscriptionId);
+        _ ->
+            Subscriptions
+    end.
+
+find(Subscriptions, SubscriptionId) ->
+    dict:fold(fun(Key, Subscribers, Acc) ->
+        case [ok || {Id, _MFA} <- Subscribers, Id =:= SubscriptionId] of
+            [_] ->
+                Key;
+            [] ->
+                Acc
+        end
+    end, not_found, Subscriptions).
+
+delete_subscriber(Subscriptions, App, Key, SubscriptionId) ->
+    dict:update({App, Key}, fun(Subscribers) ->
+        [{Id, MFA} || {Id, MFA} <- Subscribers, Id =/= SubscriptionId]
+    end, Subscriptions).
+
+notify_subscribers(Subscribers, App, Key, OldData, Data) ->
+    [M:F(App, Key, OldData, Data, A) || {_Id, {M, F, A}} <- Subscribers].

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_sup.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_sup.erl b/src/couch_epi_sup.erl
new file mode 100644
index 0000000..5e44d1b
--- /dev/null
+++ b/src/couch_epi_sup.erl
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, [?CHILD(couch_epi_server, worker)]} }.

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/src/couch_epi_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_epi_util.erl b/src/couch_epi_util.erl
new file mode 100644
index 0000000..1c39aa5
--- /dev/null
+++ b/src/couch_epi_util.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_util).
+
+-export([module_version/1, hash/1]).
+
+module_version(Module) ->
+    Attributes = Module:module_info(attributes),
+    {vsn, VSNs} = lists:keyfind(vsn, 1, Attributes),
+    VSNs.
+
+hash(Term) ->
+    <<SigInt:128/integer>> = crypto:hash(md5, term_to_binary(Term)),
+    io_lib:format("\"~.36B\"",[SigInt]).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/test/couch_epi_data_source_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_epi_data_source_tests.erl b/test/couch_epi_data_source_tests.erl
new file mode 100644
index 0000000..f5d701f
--- /dev/null
+++ b/test/couch_epi_data_source_tests.erl
@@ -0,0 +1,90 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_data_source_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(DATA_FILE1, ?ABS_PATH("test/fixtures/app_data1.cfg")).
+-define(DATA_FILE2, ?ABS_PATH("test/fixtures/app_data2.cfg")).
+
+-record(ctx, {file, handle, pid}).
+
+setup() ->
+    Key = {test_app, descriptions},
+    File = ?tempfile(),
+    {ok, _} = file:copy(?DATA_FILE1, File),
+    {ok, Pid} = couch_epi_data_source:start_link(
+        test_app, {epi_key, Key}, {file, File}, [{interval, 100}]),
+    ok = couch_epi_data_source:wait(Pid),
+    #ctx{
+        pid = Pid,
+        file = File,
+        handle = couch_epi_data_gen:get_handle(Key)}.
+
+
+teardown(#ctx{pid = Pid, file = File}) ->
+    file:delete(File),
+    couch_epi_data_source:stop(Pid),
+    catch meck:unload(compile),
+    ok.
+
+
+epi_data_source_reload_test_() ->
+    {
+        "data_source reload tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun ensure_reload_if_manually_triggered/1,
+                fun ensure_reload_if_changed/1,
+                fun ensure_no_reload_when_no_change/1
+            ]
+        }
+    }.
+
+ensure_reload_if_manually_triggered(#ctx{pid = Pid, file = File}) ->
+    ?_test(begin
+        ok = meck:new(compile, [passthrough, unstick]),
+        ok = meck:expect(compile, forms, fun(_, _) -> {error, reload} end),
+        {ok, _} = file:copy(?DATA_FILE2, File),
+        Result = couch_epi_data_source:reload(Pid),
+        ?assertMatch({error,{badmatch,{error,reload}}}, Result)
+    end).
+
+ensure_reload_if_changed(#ctx{file = File, handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [[{type,counter},{desc,foo}]],
+            couch_epi_data_gen:get(Handle, [complex, key, 1])),
+        {ok, _} = file:copy(?DATA_FILE2, File),
+        timer:sleep(150),
+        ?assertMatch(
+            [[{type,counter},{desc,bar}]],
+            couch_epi_data_gen:get(Handle, [complex, key, 2]))
+    end).
+
+ensure_no_reload_when_no_change(#ctx{handle = Handle}) ->
+    ok = meck:new(compile, [passthrough, unstick]),
+    ok = meck:expect(compile, forms, fun(_, _) ->
+        {error, compile_should_not_be_called} end),
+    ?_test(begin
+        ?assertMatch(
+            [[{type,counter},{desc,foo}]],
+            couch_epi_data_gen:get(Handle, [complex, key, 1])),
+        timer:sleep(200),
+        ?assertMatch(
+            [],
+            couch_epi_data_gen:get(Handle, [complex, key, 2]))
+    end).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/test/couch_epi_functions_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_epi_functions_tests.erl b/test/couch_epi_functions_tests.erl
new file mode 100644
index 0000000..c12b326
--- /dev/null
+++ b/test/couch_epi_functions_tests.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_functions_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(MODULE1(Name), "
+    -export([foo/2, bar/0, inc/1]).
+    foo(A1, A2) ->
+        {A1, A2}.
+
+    bar() ->
+        [].
+
+    inc(A) ->
+        A + 1.
+").
+
+-define(MODULE2(Name), "
+    -export([baz/1, inc/1]).
+    baz(A1) ->
+        A1.
+
+    inc(A) ->
+        A + 1.
+").
+
+setup() ->
+    setup([{interval, 100}]).
+
+setup(Opts) ->
+    ServiceId = my_service,
+    Module = my_test_module,
+    ok = generate_module(Module, ?MODULE1(Module)),
+    {ok, Pid} = couch_epi_functions:start_link(
+        test_app, {epi_key, ServiceId}, {modules, [Module]}, Opts),
+    ok = couch_epi_functions:wait(Pid),
+    {Pid, Module, ServiceId, couch_epi_functions_gen:get_handle(ServiceId)}.
+
+teardown({Pid, Module, _, Handle}) ->
+    code:purge(Module),
+    %%code:purge(Handle), %% FIXME temporary hack
+    couch_epi_functions:stop(Pid),
+    catch meck:unload(compile),
+    ok.
+
+generate_module(Name, Body) ->
+    Tokens = couch_epi_codegen:scan(Body),
+    couch_epi_codegen:generate(Name, Tokens).
+
+temp_atom() ->
+    {A, B, C} = erlang:now(),
+    list_to_atom(lists:flatten(io_lib:format("module~p~p~p", [A, B, C]))).
+
+
+epi_functions_test_() ->
+    {
+        "functions reload tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun ensure_reload_if_changed/1,
+                fun ensure_no_reload_when_no_change/1
+            ]
+        }
+    }.
+
+epi_functions_manual_reload_test_() ->
+    {
+        "functions manual reload tests",
+        {
+            foreach,
+            fun() -> setup([{interval, 10000}]) end,
+            fun teardown/1,
+            [
+                fun ensure_reload_if_manually_triggered/1
+            ]
+        }
+    }.
+
+ensure_reload_if_manually_triggered({Pid, Module, _ServiceId, _Handle}) ->
+    ?_test(begin
+        ok = generate_module(Module, ?MODULE2(Module)),
+        ok = meck:new(compile, [passthrough, unstick]),
+        ok = meck:expect(compile, forms, fun(_, _) -> {error, reload} end),
+        Result = couch_epi_functions:reload(Pid),
+        ?assertMatch({error,{badmatch,{error,reload}}}, Result)
+    end).
+
+ensure_reload_if_changed({_Pid, Module, ServiceId, Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [{1, 2}],
+            couch_epi_functions_gen:apply(ServiceId, foo, [1, 2], [])),
+        ok = generate_module(Module, ?MODULE2(Module)),
+        timer:sleep(150),
+        ?assertMatch(
+            [3],
+            couch_epi_functions_gen:apply(ServiceId, baz, [3], []))
+    end).
+
+ensure_no_reload_when_no_change({_Pid, Module, ServiceId, Handle}) ->
+    ok = meck:new(compile, [passthrough, unstick]),
+    ok = meck:expect(compile, forms, fun(_, _) ->
+        {error, compile_should_not_be_called} end),
+    ?_test(begin
+        ?assertMatch(
+            [{1, 2}],
+            couch_epi_functions_gen:apply(ServiceId, foo, [1, 2], [])),
+        timer:sleep(200),
+        ?assertMatch(
+            [],
+            couch_epi_functions_gen:apply(ServiceId, baz, [3], []))
+    end).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-epi/blob/32ba6e79/test/couch_epi_tests.erl
----------------------------------------------------------------------
diff --git a/test/couch_epi_tests.erl b/test/couch_epi_tests.erl
new file mode 100644
index 0000000..bba2ae6
--- /dev/null
+++ b/test/couch_epi_tests.erl
@@ -0,0 +1,413 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_epi_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(DATA_FILE1, ?ABS_PATH("test/fixtures/app_data1.cfg")).
+-define(DATA_FILE2, ?ABS_PATH("test/fixtures/app_data2.cfg")).
+
+-export([notify_cb/5, save/3]).
+
+-record(ctx, {
+    file, data_handle, data_source,
+    functions_handle, functions_source, kv}).
+
+-define(TIMEOUT, 5000).
+
+-define(MODULE1(Name), "
+    -export([inc/2, fail/2]).
+
+    inc(KV, A) ->
+        Reply = A + 1,
+        couch_epi_tests:save(KV, inc1, Reply),
+        [KV, Reply].
+
+    fail(KV, A) ->
+        inc(KV, A).
+").
+
+-define(MODULE2(Name), "
+    -export([inc/2, fail/2]).
+
+    inc(KV, A) ->
+        Reply = A + 1,
+        couch_epi_tests:save(KV, inc2, Reply),
+        [KV, Reply].
+
+    fail(KV, _A) ->
+        couch_epi_tests:save(KV, inc2, check_error),
+        throw(check_error).
+").
+
+
+notify_cb(App, Key, OldData, Data, KV) ->
+    save(KV, is_called, {App, Key, OldData, Data}).
+
+setup() ->
+    error_logger:tty(false),
+
+    Key = {test_app, descriptions},
+    File = ?tempfile(),
+    {ok, _} = file:copy(?DATA_FILE1, File),
+    application:start(couch_epi),
+    {ok, DataPid} = couch_epi_data_source:start_link(
+        test_app, {epi_key, Key}, {file, File}, [{interval, 100}]),
+    ok = couch_epi_data_source:wait(DataPid),
+
+    ok = generate_module(provider1, ?MODULE1(provider1)),
+    ok = generate_module(provider2, ?MODULE2(provider2)),
+
+    {ok, FunctionsPid} = couch_epi_functions:start_link(
+        test_app, {epi_key, my_service}, {modules, [provider1, provider2]},
+        [{interval, 100}]),
+    ok = couch_epi_functions:wait(FunctionsPid),
+    KV = state_storage(),
+    #ctx{
+        file = File,
+        data_handle = couch_epi:get_handle(Key),
+        functions_handle = couch_epi:get_handle(my_service),
+        kv = KV,
+        data_source = DataPid,
+        functions_source = FunctionsPid}.
+
+setup(_Opts) ->
+    setup().
+
+teardown(_, #ctx{} = Ctx) ->
+    teardown(Ctx).
+
+teardown(#ctx{data_source = DataPid,
+              functions_source = FunctionsPid,
+              kv = KV, file = File}) ->
+    file:delete(File),
+    couch_epi_data_source:stop(DataPid),
+    couch_epi_functions:stop(FunctionsPid),
+    catch meck:unload(compile),
+    call(KV, stop),
+    application:stop(couch_epi),
+    ok.
+
+epi_config_update_test_() ->
+    Funs = [
+        fun ensure_notified_when_changed/2,
+        fun ensure_not_notified_when_no_change/2,
+        fun ensure_not_notified_when_unsubscribed/2
+    ],
+    {
+        "config update tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [make_case("Check notifications for: ", [module, file], Funs)]
+        }
+    }.
+
+epi_data_source_test_() ->
+    {
+        "epi data_source tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun check_dump/1,
+                fun check_get/1,
+                fun check_get_value/1,
+                fun check_by_key/1,
+                fun check_by_source/1,
+                fun check_keys/1,
+                fun check_subscribers/1
+            ]
+        }
+    }.
+
+
+epi_apply_test_() ->
+    {
+        "epi dispatch tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun check_pipe/1,
+                fun check_broken_pipe/1,
+                fun ensure_fail/1,
+                fun ensure_fail_pipe/1
+            ]
+        }
+    }.
+
+epi_subscription_test_() ->
+    {
+        "epi subscription tests",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun ensure_unsubscribe_when_caller_die/1
+            ]
+        }
+    }.
+
+apply_options_test_() ->
+    Funs = [fun ensure_apply_is_called/2],
+    make_case("Apply with options: ", valid_options_permutations(), Funs).
+
+
+make_case(Msg, P, Funs) ->
+    [{format_case_name(Msg, Case), [
+        {
+            foreachx, fun setup/1, fun teardown/2,
+            [
+                 {Case, Fun} || Fun <- Funs
+            ]
+        }
+    ]} || Case <- P].
+
+format_case_name(Msg, Case) ->
+    lists:flatten(Msg ++ io_lib:format("~p", [Case])).
+
+valid_options_permutations() ->
+    [
+        [],
+        [ignore_errors],
+        [pipe],
+        [pipe, ignore_errors],
+        [concurrent],
+        [concurrent, ignore_errors]
+    ].
+
+ensure_notified_when_changed(file, #ctx{file = File} = Ctx) ->
+    ?_test(begin
+        Key = {test_app, descriptions},
+        subscribe(Ctx, test_app, Key),
+        update(file, Ctx),
+        timer:sleep(200),
+        ExpectedData = lists:usort([
+            {[complex, key, 1], [{type, counter}, {desc, updated_foo}]},
+            {[complex, key, 2], [{type, counter}, {desc, bar}]}
+        ]),
+        Result = get(Ctx, is_called),
+        ?assertMatch({ok, {test_app, Key, {data, _}, {data, _}}}, Result),
+        {ok, {test_app, Key, {data, OldData}, {data, Data}}} = Result,
+        ?assertMatch(ExpectedData, lists:usort(Data)),
+        ?assertMatch(
+            [{[complex, key, 1], [{type, counter}, {desc, foo}]}],
+            lists:usort(OldData))
+    end);
+ensure_notified_when_changed(module, #ctx{file = File} = Ctx) ->
+    ?_test(begin
+        subscribe(Ctx, test_app, my_service),
+        update(module, Ctx),
+        timer:sleep(200),
+        Result = get(Ctx, is_called),
+        Expected = {test_app, my_service,
+            {modules, [provider1, provider2]},
+            {modules, [provider1, provider2]}},
+        ?assertMatch({ok, Expected}, Result),
+        ok
+    end).
+
+ensure_not_notified_when_no_change(Case, #ctx{} = Ctx) ->
+    ?_test(begin
+        Key = {test_app, descriptions},
+        subscribe(Ctx, test_app, Key),
+        timer:sleep(200),
+        ?assertMatch(error, get(Ctx, is_called))
+    end).
+
+ensure_not_notified_when_unsubscribed(Case, #ctx{file = File} = Ctx) ->
+    ?_test(begin
+        Key = {test_app, descriptions},
+        SubscriptionId = subscribe(Ctx, test_app, Key),
+        couch_epi:unsubscribe(SubscriptionId),
+        timer:sleep(100),
+        update(Case, Ctx),
+        timer:sleep(200),
+        ?assertMatch(error, get(Ctx, is_called))
+    end).
+
+ensure_apply_is_called(Opts, #ctx{functions_handle = Handle, kv = KV} = Ctx) ->
+    ?_test(begin
+        couch_epi:apply(Handle, my_service, inc, [KV, 2], Opts),
+        maybe_wait(Opts),
+        ?assertMatch({ok, _}, get(Ctx, inc1)),
+        ?assertMatch({ok, _}, get(Ctx, inc2)),
+        ok
+    end).
+
+check_pipe(#ctx{functions_handle = Handle, kv = KV}) ->
+    ?_test(begin
+        Result = couch_epi:apply(Handle, my_service, inc, [KV, 2], [pipe]),
+        ?assertMatch([KV, 4], Result),
+        ok
+    end).
+
+check_broken_pipe(#ctx{functions_handle = Handle, kv = KV} = Ctx) ->
+    ?_test(begin
+        Result = couch_epi:apply(Handle, my_service, fail, [KV, 2], [pipe, ignore_errors]),
+        ?assertMatch([KV, 3], Result),
+        ?assertMatch([3, check_error], pipe_state(Ctx)),
+        ok
+    end).
+
+ensure_fail_pipe(#ctx{functions_handle = Handle, kv = KV}) ->
+    ?_test(begin
+        ?assertThrow(check_error,
+            couch_epi:apply(Handle, my_service, fail, [KV, 2], [pipe])),
+        ok
+    end).
+
+ensure_fail(#ctx{functions_handle = Handle, kv = KV}) ->
+    ?_test(begin
+        ?assertThrow(check_error,
+            couch_epi:apply(Handle, my_service, fail, [KV, 2], [])),
+        ok
+    end).
+
+ensure_unsubscribe_when_caller_die(#ctx{} = Ctx) ->
+    ?_test(begin
+        Key = {test_app, descriptions},
+        spawn(fun() ->
+            subscribe(Ctx, test_app, Key)
+        end),
+        %%update(file, Ctx),
+        timer:sleep(200),
+        ?assertMatch(error, get(Ctx, is_called))
+    end).
+
+
+pipe_state(Ctx) ->
+    Trace = [get(Ctx, inc1), get(Ctx, inc2)],
+    lists:usort([State || {ok, State} <- Trace]).
+
+check_dump(#ctx{data_handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [[{type, counter}, {desc, foo}]],
+            couch_epi:dump(Handle))
+    end).
+
+check_get(#ctx{data_handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [[{type, counter}, {desc, foo}]],
+            couch_epi:get(Handle, [complex,key, 1]))
+    end).
+
+check_get_value(#ctx{data_handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [{type, counter}, {desc, foo}],
+            couch_epi:get_value(Handle, test_app, [complex,key, 1]))
+    end).
+
+check_by_key(#ctx{data_handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [{[complex, key, 1],
+                [{test_app, [{type, counter}, {desc, foo}]}]}],
+            couch_epi:by_key(Handle)),
+        ?assertMatch(
+            [{test_app, [{type, counter}, {desc, foo}]}],
+            couch_epi:by_key(Handle, [complex, key, 1]))
+    end).
+
+check_by_source(#ctx{data_handle = Handle}) ->
+    ?_test(begin
+        ?assertMatch(
+            [{test_app,
+                [{[complex,key, 1], [{type, counter}, {desc, foo}]}]}],
+            couch_epi:by_source(Handle)),
+        ?assertMatch(
+            [{[complex,key, 1], [{type, counter}, {desc, foo}]}],
+            couch_epi:by_source(Handle, test_app))
+    end).
+
+check_keys(#ctx{data_handle = Handle}) ->
+    ?_assertMatch([[complex,key,1]], couch_epi:keys(Handle)).
+
+check_subscribers(#ctx{data_handle = Handle}) ->
+    ?_assertMatch([test_app], couch_epi:subscribers(Handle)).
+
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+generate_module(Name, Body) ->
+    Tokens = couch_epi_codegen:scan(Body),
+    couch_epi_codegen:generate(Name, Tokens).
+
+update(module, #ctx{}) ->
+    ok = generate_module(provider1, ?MODULE2(provider1));
+update(file, #ctx{file = File}) ->
+    {ok, _} = file:copy(?DATA_FILE2, File),
+    ok.
+
+subscribe(#ctx{kv = Kv}, App, Key) ->
+    {ok, Pid} = couch_epi:subscribe(App, Key, ?MODULE, notify_cb, Kv),
+    call(Kv, empty),
+    Pid.
+
+maybe_wait(Opts) ->
+    case lists:member(concurrent, Opts) of
+        true ->
+            timer:sleep(100);
+        false ->
+            ok
+    end.
+
+%% ------------
+%% State tracer
+
+save(Kv, Key, Value) ->
+    call(Kv, {set, Key, Value}).
+
+get(#ctx{kv = Kv}, Key) ->
+    call(Kv, {get, Key}).
+
+call(Server, Msg) ->
+    Ref = make_ref(),
+    Server ! {{Ref, self()}, Msg},
+    receive
+        {reply, Ref, Reply} ->
+            Reply
+    after ?TIMEOUT ->
+        {error, {timeout, Msg}}
+    end.
+
+reply({Ref, From}, Msg) ->
+    From ! {reply, Ref, Msg}.
+
+state_storage() ->
+    spawn_link(fun() -> state_storage(dict:new()) end).
+
+state_storage(Dict) ->
+    receive
+        {From, {set, Key, Value}} ->
+            reply(From, ok),
+            state_storage(dict:store(Key, Value, Dict));
+        {From, {get, Key}} ->
+            reply(From, dict:find(Key, Dict)),
+            state_storage(Dict);
+        {From, empty} ->
+            reply(From, ok),
+            state_storage(dict:new());
+        {From, stop} ->
+            reply(From, ok)
+    end.