You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2019/11/22 19:13:03 UTC

[couchdb] 01/15: Import ken

This is an automated email from the ASF dual-hosted git repository.

kocolosk pushed a commit to branch ken-in-tree
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 390b91949220366a0c64aa74bffb46fa0b22c4b6
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Feb 6 20:07:46 2019 +0000

    Import ken
---
 README.md                 |  12 ++
 rebar.config.script       |  26 +++
 src/ken.app.src.script    |  38 ++++
 src/ken.erl               |  29 +++
 src/ken_app.erl           |  28 +++
 src/ken_event_handler.erl |  56 +++++
 src/ken_server.erl        | 524 ++++++++++++++++++++++++++++++++++++++++++++++
 src/ken_sup.erl           |  33 +++
 test/config.ini           |   2 +
 test/ken_server_test.erl  |  99 +++++++++
 10 files changed, 847 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a5a6576
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+ken
+===
+
+Ken builds views and search indexes automatically.
+
+#### Overview
+
+When a database shard emits an `updated` event (delivered through `couch_event`), ken spawns indexing jobs for its view groups and search indexes (one job per view group shard or search index shard). A `deleted` event removes all pending jobs for the corresponding database shard, and a `ddoc_updated` event queues every shard of the database for indexing.
+
+#### Testing
+
+Ken's tests are expected to be run from the top-level `couchdb` repository as part of `make check`. To run only the ken tests, execute `rebar eunit apps=ken verbose=1` from the `couchdb` root directory.
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..4570bfc
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,26 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}.
+HaveHastings = code:lib_dir(hastings) /= {error, bad_name}.
+
+% erl_opts already present in the incoming rebar CONFIG, if any.
+CurrOpts = case lists:keyfind(erl_opts, 1, CONFIG) of
+    {erl_opts, Opts} -> Opts;
+    false -> []
+end.
+
+% Compile-time defines for the optional search (dreyfus) and spatial
+% (hastings) integrations, kept in that order.
+OptionalDefines = [Define || {true, Define} <- [
+    {HaveDreyfus, {d, 'HAVE_DREYFUS'}},
+    {HaveHastings, {d, 'HAVE_HASTINGS'}}
+]].
+
+% Prepend the defines and the parent include path to the existing options.
+NewOpts = OptionalDefines ++ [{i, "../"} | CurrOpts].
+
+lists:keystore(erl_opts, 1, CONFIG, {erl_opts, NewOpts}).
diff --git a/src/ken.app.src.script b/src/ken.app.src.script
new file mode 100644
index 0000000..dcf4a23
--- /dev/null
+++ b/src/ken.app.src.script
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}.
+HaveHastings = code:lib_dir(hastings) /= {error, bad_name}.
+
+% Applications ken always depends on.
+BaseApplications = [
+    kernel,
+    stdlib,
+    couch_log,
+    couch_event,
+    couch,
+    config
+].
+
+% Optional index engines come first, and only when present on the code path.
+OptionalApplications = [App || {true, App} <- [
+    {HaveDreyfus, dreyfus},
+    {HaveHastings, hastings}
+]].
+
+Applications = OptionalApplications ++ BaseApplications.
+
+{application, ken, [
+    {description, ""},
+    {vsn, git},
+    {registered, []},
+    {applications, Applications},
+    {mod, {ken_app, []}},
+    {env, []}
+]}.
diff --git a/src/ken.erl b/src/ken.erl
new file mode 100644
index 0000000..87a724b
--- /dev/null
+++ b/src/ken.erl
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken).
+
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+
+%% @doc Queues a single database shard to be indexed.
+-spec add(binary()) -> ok.
+add(ShardName) ->
+    ken_server:add(ShardName).
+
+%% @doc Drops all pending jobs for a database shard.
+-spec remove(binary()) -> ok.
+remove(ShardName) ->
+    ken_server:remove(ShardName).
+
+%% @doc Queues every shard of a database for indexing, each one on the
+%% node that hosts it.
+add_all_shards(DbName) ->
+    ken_server:add_all_shards(DbName).
diff --git a/src/ken_app.erl b/src/ken_app.erl
new file mode 100644
index 0000000..15f235d
--- /dev/null
+++ b/src/ken_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+%% Starts the application by starting its top-level supervisor, ken_sup.
+start(_Type, _Args) ->
+    ken_sup:start_link().
+
+%% Nothing to clean up when the application stops.
+stop(_AppState) ->
+    ok.
diff --git a/src/ken_event_handler.erl b/src/ken_event_handler.erl
new file mode 100644
index 0000000..8f158f4
--- /dev/null
+++ b/src/ken_event_handler.erl
@@ -0,0 +1,56 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_event_handler).
+-behaviour(couch_event_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_event/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+
+%% @doc Starts a couch_event listener for this module with the `all_dbs'
+%% option and no initial state.
+start_link() ->
+    couch_event_listener:start_link(?MODULE, nil, [all_dbs]).
+
+%% couch_event_listener callbacks
+
+%% The listener carries no state of its own.
+init(_Args) ->
+    {ok, nil}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%% Every event is handed to forward_event/2; the state is left untouched.
+handle_event(DbName, Event, State) ->
+    forward_event(Event, DbName),
+    {ok, State}.
+
+handle_cast(_Msg, State) ->
+    {ok, State}.
+
+handle_info(_Msg, State) ->
+    {ok, State}.
+
+%% Maps a database event onto the matching ken call; other events are ignored.
+forward_event(updated, DbName) -> ken:add(DbName);
+forward_event(deleted, DbName) -> ken:remove(DbName);
+forward_event(ddoc_updated, DbName) -> ken:add_all_shards(DbName);
+forward_event(_Other, _DbName) -> ok.
diff --git a/src/ken_server.erl b/src/ken_server.erl
new file mode 100644
index 0000000..25da5ac
--- /dev/null
+++ b/src/ken_server.erl
@@ -0,0 +1,524 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_server).
+
+% gen_server boilerplate
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, terminate/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+% Public interface
+-export([start_link/0]).
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+-export([set_batch_size/1]).
+-export([set_delay/1]).
+-export([set_limit/1]).
+-export([set_prune_interval/1]).
+
+% exports for spawn
+-export([update_db_indexes/2]).
+
+% An indexing job. Jobs are stored in the ken_workers ETS table, keyed on name.
+-record(job, {
+    name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search. {DbName, DDocId, hastings} for spatial.
+    server, % Pid of either view group or search index
+    worker_pid = nil, % pid of the running update worker, or nil when the job is idle
+    seq = 0, % shard update seq the index was last asked to catch up to
+    lru = now() % now() timestamp of the last triggered update; used for throttling and pruning
+}).
+
+% ken_server gen_server state.
+-record(state, {
+    q = queue:new(), % FIFO of shard names waiting for update_db_indexes
+    dbworker = nil, % {DbName, Pid} of the running update_db_indexes process, or nil
+    limit = 20, % from [ken] limit or set_limit/1; NOTE(review): not otherwise read in this module
+    delay = 5000, % ms; throttles re-triggering of idle jobs and ages out idle ken_workers entries
+    batch_size = 1, % seq delta that re-triggers an idle job while batch channels are free
+    prune_interval = 60000, % ms between prunes of idle ken_workers entries; also the idle timeout
+    pruned_last % now() timestamp of the last prune
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-ifdef(HAVE_DREYFUS).
+-include_lib("dreyfus/include/dreyfus.hrl").
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+-include_lib("hastings/src/hastings.hrl").
+-endif.
+
+%% @doc Starts ken_server, registered locally under its module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% @doc Adds a database shard to be indexed
+-spec add(binary()) -> ok.
+add(DbName) ->
+    Request = {add, DbName},
+    gen_server:cast(?MODULE, Request).
+
+%% @doc Removes a database shard from the queue and forgets its tracked jobs.
+%% Workers that are already running are not stopped.
+-spec remove(binary()) -> ok.
+remove(DbName) ->
+    Request = {remove, DbName},
+    gen_server:cast(?MODULE, Request).
+
+%% @doc Adds all the shards for a database to be indexed.
+%% Each shard is sent to the node that hosts it as an asynchronous
+%% ken_server:add/1 call via rexi. A database that does not exist is
+%% ignored. Always returns ok, as the spec states.
+-spec add_all_shards(binary()) -> ok.
+add_all_shards(DbName) ->
+    % Only the shard lookup can raise database_does_not_exist; the casts
+    % are kept outside the protected section.
+    try mem3:shards(mem3:dbname(DbName)) of
+        Shards ->
+            lists:foreach(fun(#shard{node = Node, name = ShardName}) ->
+                rexi:cast(Node, {ken_server, add, [ShardName]})
+            end, Shards)
+    catch error:database_does_not_exist ->
+        ok
+    end.
+
+%% @doc Changes the configured value for a batch size.
+%% Returns previous value.
+-spec set_batch_size(pos_integer()) -> pos_integer().
+set_batch_size(BS) when is_integer(BS) andalso BS > 0 ->
+    swap_setting(set_batch_size, BS).
+
+%% @doc Changes the configured value for a delay between batches.
+%% Returns previous value.
+-spec set_delay(non_neg_integer()) -> non_neg_integer().
+set_delay(Delay) when is_integer(Delay) andalso Delay >= 0 ->
+    swap_setting(set_delay, Delay).
+
+%% @doc Changes the configured value for a limit.
+%% Returns previous value.
+-spec set_limit(pos_integer()) -> pos_integer().
+set_limit(Limit) when is_integer(Limit) andalso Limit > 0 ->
+    swap_setting(set_limit, Limit).
+
+%% @doc Changes the configured value for a prune interval, in milliseconds.
+%% Values of 1000 or less are rejected with a function_clause error.
+%% Returns previous value.
+-spec set_prune_interval(pos_integer()) -> pos_integer().
+set_prune_interval(Interval) when is_integer(Interval) andalso Interval > 1000 ->
+    swap_setting(set_prune_interval, Interval).
+
+%% Sends a synchronous {Setting, Value} request to the server, which stores
+%% the new value and replies with the previous one.
+swap_setting(Setting, Value) ->
+    gen_server:call(?MODULE, {Setting, Value}).
+
+%% gen_server callbacks
+
+init(_Args) ->
+    % The event handler is started later, from handle_info/2.
+    self() ! start_event_handler,
+    % Sets of shard names: queued, and waiting for a delayed resubmit.
+    ets:new(ken_pending, [named_table]),
+    ets:new(ken_resubmit, [named_table]),
+    % Job records, keyed on #job.name.
+    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
+    InitialLimit = list_to_integer(config("limit", "20")),
+    {ok, #state{pruned_last = now(), limit = InitialLimit}}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%% Each set_* request stores the new value and replies with the previous one.
+%% The zero timeout makes the server service its queue right away.
+handle_call({set_batch_size, NewBS}, _From, State) ->
+    {reply, State#state.batch_size, State#state{batch_size = NewBS}, 0};
+handle_call({set_delay, NewDelay}, _From, State) ->
+    {reply, State#state.delay, State#state{delay = NewDelay}, 0};
+handle_call({set_limit, NewLimit}, _From, State) ->
+    {reply, State#state.limit, State#state{limit = NewLimit}, 0};
+handle_call({set_prune_interval, NewInterval}, _From, State) ->
+    {reply, State#state.prune_interval, State#state{prune_interval = NewInterval}, 0};
+handle_call(Msg, From, State) ->
+    {stop, {unknown_call, Msg, From}, State}.
+
+% Queues a DB to (maybe) have indexing jobs spawned. ken_pending ensures a
+% shard sits in the queue at most once.
+handle_cast({add, DbName}, #state{q = Q} = State) ->
+    NewState = case ets:insert_new(ken_pending, {DbName}) of
+        true -> State#state{q = queue:in(DbName, Q)};
+        false -> State
+    end,
+    {noreply, NewState, 0};
+
+handle_cast({remove, DbName}, #state{q = Q} = State) ->
+    Remaining = queue:filter(fun(Queued) -> Queued =/= DbName end, Q),
+    ets:delete(ken_pending, DbName),
+    % Forget search/spatial jobs (3-element names), then view jobs (2-element names).
+    ets:match_delete(ken_workers, #job{name = {DbName, '_', '_'}, _ = '_'}),
+    ets:match_delete(ken_workers, #job{name = {DbName, '_'}, _ = '_'}),
+    % TODO kill off active jobs for this DB as well
+    {noreply, State#state{q = Remaining}, 0};
+
+handle_cast({resubmit, DbName}, State) ->
+    % The delayed resubmit fired: clear its marker and queue the shard again.
+    ets:delete(ken_resubmit, DbName),
+    handle_cast({add, DbName}, State);
+
+% Spatial job names end in the atom 'hastings'; this clause must precede the
+% generic 3-element search clause.
+handle_cast({trigger_update, #job{name = {_, _, hastings}, server = GPid, seq = Seq} = Job}, State) ->
+    % hastings_index:await will trigger a hastings index update
+    spawn_update_worker(Job, hastings_index, await, [GPid, Seq]),
+    {noreply, State, 0};
+handle_cast({trigger_update, #job{name = {_, _, _}, server = GPid, seq = Seq} = Job}, State) ->
+    % dreyfus_index:await will trigger a search index update.
+    spawn_update_worker(Job, dreyfus_index, await, [GPid, Seq]),
+    {noreply, State, 0};
+handle_cast({trigger_update, #job{name = {_, _}, server = SrvPid, seq = Seq} = Job}, State) ->
+    % couch_index:get_state/2 will trigger a view group index update.
+    spawn_update_worker(Job, couch_index, get_state, [SrvPid, Seq]),
+    {noreply, State, 0};
+
+handle_cast(Msg, State) ->
+    {stop, {unknown_cast, Msg}, State}.
+
+% Runs M:F(A) in a monitored process and records it as the job's worker.
+spawn_update_worker(Job, M, F, A) ->
+    {Pid, _MRef} = erlang:spawn_monitor(M, F, A),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}).
+
+handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
+    couch_log:error("ken_event_handler terminated: ~w", [Reason]),
+    erlang:send_after(5000, self(), start_event_handler),
+    % handle_info/2 must return {noreply, ...}; an {ok, ...} tuple is not a
+    % valid gen_server return and would crash the server.
+    {noreply, State, 0};
+
+handle_info(start_event_handler, State) ->
+    case ken_event_handler:start_link() of
+    {ok, _Pid} ->
+        ok;
+    Error ->
+        couch_log:error("ken_event_handler init: ~w", [Error]),
+        erlang:send_after(5000, self(), start_event_handler)
+    end,
+    {noreply, State, 0};
+
+% Idle tick: prune idle ken_workers entries at most once per prune interval,
+% then try to start the next queued shard. prune_interval is in ms, while
+% timer:now_diff/2 returns microseconds.
+handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
+    NewState = case timer:now_diff(now(), Last) > 1000 * I of
+        true -> prune_worker_table(State);
+        false -> State
+    end,
+    {noreply, maybe_start_next_queued_job(NewState), I};
+
+% The update_db_indexes process for the current shard exited.
+handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
+    maybe_resubmit(Name, Reason),
+    {noreply, St#state{dbworker=nil}, 0};
+
+% Any other monitored process is an index update worker.
+handle_info({'DOWN', _, _, Pid, Reason}, State) ->
+    debrief_worker(Pid, Reason, State),
+    {noreply, State, 0};
+
+handle_info(Msg, State) ->
+    {stop, {unknown_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% private functions
+
+%% Starts an update_db_indexes process for the next queued shard, unless one
+%% is already running. Skipped shards are dropped from the queue. If all
+%% indexing channels are busy, the shard goes back to the front of the queue.
+maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
+    State;
+maybe_start_next_queued_job(#state{q=Q} = State) ->
+    Incremental = list_to_integer(config("incremental_channels", "80")),
+    Batch = list_to_integer(config("batch_channels", "20")),
+    case queue:out(Q) of
+        {empty, Q} ->
+            State;
+        {{value, DbName}, Rest} ->
+            dispatch_queued_shard(DbName, Rest, Incremental + Batch, State)
+    end.
+
+%% Handles one dequeued shard. Rest is the queue without it.
+dispatch_queued_shard(DbName, Rest, TotalChannels, State) ->
+    case skip_job(DbName) of
+        true ->
+            % job is either being resubmitted or ignored, skip it
+            ets:delete(ken_pending, DbName),
+            maybe_start_next_queued_job(State#state{q = Rest});
+        false ->
+            case get_active_count() < TotalChannels of
+                true ->
+                    {Pid, _} = spawn_monitor(?MODULE, update_db_indexes, [DbName, State]),
+                    ets:delete(ken_pending, DbName),
+                    State#state{dbworker = {DbName, Pid}, q = Rest};
+                false ->
+                    State#state{q = queue:in_r(DbName, Rest)}
+            end
+    end.
+
+%% A queued shard is skipped while a resubmit for it is pending, or when the
+%% [ken.ignore] config section marks it as ignored.
+skip_job(DbName) ->
+    case ets:member(ken_resubmit, DbName) of
+        true -> true;
+        false -> ignore_db(DbName)
+    end.
+
+%% True only when [ken.ignore] <DbName> is set to "true".
+ignore_db(DbName) ->
+    config:get("ken.ignore", ?b2l(DbName), false) =:= "true".
+
+%% Number of ken_workers entries whose worker_pid is a pid (nil marks an idle job).
+get_active_count() ->
+    BusySpec = [{#job{worker_pid = '$1', _ = '_'}, [{is_pid, '$1'}], [true]}],
+    ets:select_count(ken_workers, BusySpec).
+
+% Triggers index updates for every design doc of shard Name, visiting the
+% design docs in random order. If any of them could not be started, the
+% process exits with reason `resubmit' so that the shard is retried later.
+update_db_indexes(Name, State) ->
+    {ok, DDocs} = design_docs(Name),
+    % rand seeds itself per process; the deprecated random module needed an
+    % explicit seed.
+    Shuffled = [Doc || {_Key, Doc} <- lists:sort([{rand:uniform(), D} || D <- DDocs])],
+    % Every design doc is processed even after one has asked for a resubmit.
+    Resubmit = lists:foldl(fun(DDoc, Acc) ->
+        JsonDDoc = couch_doc:from_json_obj(DDoc),
+        case update_ddoc_indexes(Name, JsonDDoc, State) of
+            ok -> Acc;
+            _ -> true
+        end
+    end, false, Shuffled),
+    if Resubmit -> exit(resubmit); true -> ok end.
+
+% Fetches the design docs of the clustered database that shard Name belongs to.
+% A maintenance-mode error or a missing database yields {ok, []}.
+design_docs(Name) ->
+    try fabric:design_docs(mem3:dbname(Name)) of
+        {error, {maintenance_mode, _, _Node}} -> {ok, []};
+        Result -> Result
+    catch error:database_does_not_exist ->
+        {ok, []}
+    end.
+
+% Triggers updates for the view, search and spatial indexes defined in Doc
+% for shard Name. Returns ok, or resubmit if any index job could not be
+% started now.
+update_ddoc_indexes(Name, #doc{}=Doc, State) ->
+    {ok, Db} = case couch_db:open_int(Name, []) of
+        {ok, _} = Resp -> Resp;
+        Else -> exit(Else)
+    end,
+    Seq = couch_db:get_update_seq(Db),
+    couch_db:close(Db),
+    ViewUpdated = case should_update(Doc, <<"views">>) of true ->
+        try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
+            {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
+        catch _:_ ->
+            % Any failure converting the design doc means nothing to do.
+            ok
+        end;
+    false ->
+        ok
+    end,
+    SearchUpdated = search_updated(Name, Doc, Seq, State),
+    STUpdated = st_updated(Name, Doc, Seq, State),
+    case {ViewUpdated, SearchUpdated, STUpdated} of
+        {ok, ok, ok} -> ok;
+        _ -> resubmit
+    end.
+
+% Compiled in only when rebar.config.script defines HAVE_DREYFUS.
+-ifdef(HAVE_DREYFUS).
+search_updated(Name, Doc, Seq, State) ->
+    case should_update(Doc, <<"indexes">>) of true ->
+        try dreyfus_index:design_doc_to_indexes(Doc) of
+            SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end.
+-else.
+search_updated(_Name, _Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+% Compiled in only when rebar.config.script defines HAVE_HASTINGS.
+-ifdef(HAVE_HASTINGS).
+st_updated(Name, Doc, Seq, State) ->
+    case should_update(Doc, <<"st_indexes">>) of true ->
+        try hastings_index:design_doc_to_indexes(Doc) of
+            STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end.
+-else.
+st_updated(_Name, _Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+% An index type is auto-updated unless the design doc disables it, either
+% entirely ("autoupdate": false) or for this type ("autoupdate": {Type: false}).
+should_update(#doc{body = {Props}}, IndexType) ->
+    case couch_util:get_value(<<"autoupdate">>, Props) of
+        false -> false;
+        {AutoUpdateProps} -> couch_util:get_value(IndexType, AutoUpdateProps) =/= false;
+        _ -> true
+    end.
+
+% Asks for a view group update when the design doc's language appears in the
+% query_servers or native_query_servers config and it defines any views.
+% Otherwise returns ok.
+update_ddoc_views(Name, MRSt, Seq, State) ->
+    Language = couch_mrview_index:get(language, MRSt),
+    Views = couch_mrview_index:get(views, MRSt),
+    case lists:member(Language, allowed_languages()) andalso Views =/= [] of
+        true ->
+            {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
+            GroupName = couch_mrview_index:get(idx_name, MRSt),
+            maybe_start_job({Name, GroupName}, Pid, Seq, State);
+        false ->
+            ok
+    end.
+
+-ifdef(HAVE_DREYFUS).
+% Asks for one job per search index in the design doc. Returns resubmit if
+% any index could not be opened or its job could not start now, else ok.
+% An empty index list yields ok.
+update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
+    StartOne = fun(#index{name = IName, ddoc_id = DDocName} = Index, Acc) ->
+        case dreyfus_index_manager:get_index(DbName, Index) of
+            {ok, Pid} ->
+                case maybe_start_job({DbName, DDocName, IName}, Pid, Seq, State) of
+                    resubmit -> resubmit;
+                    _ -> Acc
+                end;
+            _ ->
+                % If any job fails, retry the db.
+                resubmit
+        end
+    end,
+    lists:foldl(StartOne, ok, Indexes).
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+% Asks for one job per spatial index in the design doc. The record name in
+% hastings is #h_idx rather than #index as it is for dreyfus. All spatial jobs
+% of a design doc share the name {DbName, DDocId, hastings}. An empty index
+% list yields ok.
+update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
+    StartOne = fun(#h_idx{ddoc_id = DDocName} = Index, Acc) ->
+        case hastings_index_manager:get_index(DbName, Index) of
+            {ok, Pid} ->
+                case maybe_start_job({DbName, DDocName, hastings}, Pid, Seq, State) of
+                    resubmit -> resubmit;
+                    _ -> Acc
+                end;
+            _ ->
+                % If any job fails, retry the db.
+                resubmit
+        end
+    end,
+    lists:foldl(StartOne, ok, Indexes).
+-endif.
+
+% Decides whether the update described by Job may be triggered now.
+% While fewer than batch_channels workers are active, any job may start.
+% Above that, and up to batch_channels + incremental_channels, only jobs
+% whose index is fewer than max_incremental_updates seqs behind may start.
+% An index that already has a running worker is never started again.
+should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
+    Threshold = list_to_integer(config("max_incremental_updates", "1000")),
+    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
+    BatchChannels = list_to_integer(config("batch_channels", "20")),
+    TotalChannels = IncrementalChannels + BatchChannels,
+    A = get_active_count(),
+    #state{delay = Delay, batch_size = BS} = State,
+    case ets:lookup(ken_workers, Name) of
+    % No entry for this index yet.
+    [] ->
+        if
+            A < BatchChannels ->
+                true;
+            A < TotalChannels ->
+                % Query the index's current seq to measure how far behind it is.
+                case Name of
+                    % st_index name has three elements
+                    {_, _, hastings} ->
+                        {ok, CurrentSeq} = hastings_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    % View name has two elements.
+                    {_,_} ->
+                        % Since seq is 0, couch_index:get_state/2 won't
+                        % spawn an index update.
+                        {ok, MRSt} = couch_index:get_state(Pid, 0),
+                        CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
+                        (Seq - CurrentSeq) < Threshold;
+                    % Search name has three elements.
+                    {_,_,_} ->
+                        {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    _ -> % Should never happen, but if it does, ignore.
+                        false
+                    end;
+            true ->
+                false
+        end;
+    % Known, idle index: throttle by batch_size (seq delta) and delay (ms
+    % since the last trigger, DeltaT).
+    [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
+        DeltaT = timer:now_diff(now(), LRU) / 1000,
+        if
+            A < BatchChannels, (Seq - OldSeq) >= BS ->
+                true;
+            A < BatchChannels, DeltaT > Delay ->
+                true;
+            A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
+                true;
+            true ->
+                false
+        end;
+    % A worker is already running for this index.
+    _ ->
+        false
+    end.
+
+% Builds the job for an index. If should_start_job/2 allows it, the job is
+% cast to ken_server to be triggered (returning ok). Otherwise returns
+% resubmit so the shard is retried later.
+maybe_start_job(JobName, IndexPid, Seq, State) ->
+    Job = #job{name = JobName, server = IndexPid, seq = Seq},
+    case should_start_job(Job, State) of
+        false -> resubmit;
+        true -> gen_server:cast(?MODULE, {trigger_update, Job})
+    end.
+
+% Called when a monitored index worker exits. The worker's shard may be
+% resubmitted, and its job is marked idle again. Unknown pids are ignored.
+debrief_worker(Pid, Reason, _State) ->
+    case ets:match_object(ken_workers, #job{worker_pid = Pid, _ = '_'}) of
+        [] -> % should never happen, but if it does, ignore
+            ok;
+        [#job{name = Name} = Job] ->
+            % View names are {Db, Group}; search/spatial names are {Db, DDoc, Index}.
+            DbName = case Name of
+                {Db, _} -> Db;
+                {Db, _, _} -> Db
+            end,
+            maybe_resubmit(DbName, Reason),
+            ets:insert(ken_workers, Job#job{worker_pid = nil})
+    end.
+
+% Exits that leave nothing to index are not retried. A worker that asked for
+% a resubmit is retried after 60 s; any other failure is retried after 5 s.
+maybe_resubmit(_DbName, normal) -> ok;
+maybe_resubmit(_DbName, {database_does_not_exist, _}) -> ok;
+maybe_resubmit(_DbName, {not_found, no_db_file}) -> ok;
+maybe_resubmit(DbName, resubmit) -> resubmit(60000, DbName);
+maybe_resubmit(DbName, _Other) -> resubmit(5000, DbName).
+
+% Schedules one delayed {resubmit, DbName} cast to ken_server. Further
+% requests for the same shard are dropped while one is pending.
+resubmit(Delay, DbName) ->
+    case ets:insert_new(ken_resubmit, {DbName}) of
+        false ->
+            ok;
+        true ->
+            Msg = {'$gen_cast', {resubmit, DbName}},
+            erlang:send_after(Delay, ?MODULE, Msg)
+    end.
+
+% Deletes idle jobs (worker_pid = nil) last triggered more than `delay' ms
+% ago, comparing at whole-second resolution, then records the prune time.
+prune_worker_table(#state{delay = Delay} = State) ->
+    {MegaSecs, Secs, _} = now(),
+    Cutoff = (1000000 * MegaSecs) + Secs - 0.001 * Delay,
+    IdleJob = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'},
+    OlderThanCutoff = {'<', {'+', {'*', '$1', 1000000}, '$2'}, Cutoff},
+    ets:select_delete(ken_workers, [{IdleJob, [OlderThanCutoff], [true]}]),
+    State#state{pruned_last = now()}.
+
+% Names, as binaries, of every language listed in the query_servers and
+% native_query_servers config sections.
+allowed_languages() ->
+    Servers = config:get("query_servers") ++ config:get("native_query_servers"),
+    [list_to_binary(LangName) || {LangName, _Command} <- Servers].
+
+% Reads Key from the [ken] config section, falling back to Default.
+config(Key, Default) ->
+    Section = "ken",
+    config:get(Section, Key, Default).
diff --git a/src/ken_sup.erl b/src/ken_sup.erl
new file mode 100644
index 0000000..fd08cfd
--- /dev/null
+++ b/src/ken_sup.erl
@@ -0,0 +1,33 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor callbacks
+
+%% Supervises a single permanent ken_server worker with one_for_one. More
+%% than 5 restarts within 10 seconds make the supervisor itself give up.
+init([]) ->
+    RestartStrategy = {one_for_one, 5, 10},
+    KenServer = {ken_server, {ken_server, start_link, []},
+                 permanent, 5000, worker, [ken_server]},
+    {ok, {RestartStrategy, [KenServer]}}.
+
diff --git a/test/config.ini b/test/config.ini
new file mode 100644
index 0000000..a28eae4
--- /dev/null
+++ b/test/config.ini
@@ -0,0 +1,2 @@
+[ken]
+limit = 42
diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl
new file mode 100644
index 0000000..1d2af7a
--- /dev/null
+++ b/test/ken_server_test.erl
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_server_test).
+
+-compile([export_all]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000
+%% Each setter returns the previous value, so every "keeps set" case checks
+%% the value stored by the case just before it.
+default_test_() ->
+    Tests = [
+        set_builder("returns default", set_limit, 12, 20),
+        set_builder("keeps set", set_limit, 6, 12),
+        set_builder("returns default", set_batch_size, 3, 1),
+        set_builder("keeps set", set_batch_size, 6, 3),
+        set_builder("returns default", set_delay, 7000, 5000),
+        set_builder("keeps set", set_delay, 10000, 7000),
+        set_builder("returns default", set_prune_interval, 70000, 60000),
+        set_builder("keeps set", set_prune_interval, 80000, 70000)
+    ],
+    {inorder, {setup, fun setup_default/0, fun teardown/1, Tests}}.
+
+%% Out-of-range arguments are rejected by the setters' guards. Each case runs
+%% against freshly started servers.
+exception_test_() ->
+    Tests = [
+        exception_builder("exception on zero", set_limit, 0),
+        exception_builder("exception on negative", set_limit, -12),
+        exception_builder("exception on zero", set_batch_size, 0),
+        exception_builder("exception on negative", set_batch_size, -12),
+        set_builder("no exception on zero", set_delay, 0, 5000),
+        exception_builder("exception on negative", set_delay, -12),
+        exception_builder("exception on zero", set_prune_interval, 0),
+        exception_builder("exception on negative", set_prune_interval, -12)
+    ],
+    {inorder, {foreach, fun setup_default/0, fun teardown/1, Tests}}.
+
+%% The initial limit comes from test/config.ini ([ken] limit = 42).
+config_test_() ->
+    Tests = [
+        set_builder("reads config", set_limit, 24, 42),
+        set_builder("keeps set", set_limit, 6, 24)
+    ],
+    {inorder, {setup, fun setup_config/0, fun teardown/1, Tests}}.
+
+setup_default() ->
+    start_servers([]).
+
+%% NOTE: resolves test/config.ini relative to the parent of the current
+%% working directory.
+setup_config() ->
+    {ok, Pwd} = file:get_cwd(),
+    IniFile = filename:join([Pwd, "..", "test", "config.ini"]),
+    start_servers([[IniFile]]).
+
+%% Starts couch_event_server, config (with ConfigArgs as its init argument)
+%% and ken_server, and returns their pids keyed for teardown/1.
+start_servers(ConfigArgs) ->
+    {ok, EventPid} = start_server(couch_event_server),
+    {ok, CfgPid} = start_server(config, ConfigArgs),
+    {ok, KenPid} = start_server(ken_server),
+    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].
+
+%% Kills the servers in the order they were started.
+teardown(Cfg) ->
+    lists:foreach(fun(Key) -> ok = stop_server(Key, Cfg) end,
+                  [event_pid, cfg_pid, ken_pid]).
+
+exception_builder(Desc, F, Val) ->
+    Label = atom_to_list(F) ++ " " ++ Desc,
+    {Label, ?_assertException(error, function_clause, ken_server:F(Val))}.
+
+set_builder(Desc, F, In, Out) ->
+    Label = atom_to_list(F) ++ " " ++ Desc,
+    {Label, ?_assertEqual(Out, ken_server:F(In))}.
+
+start_server(Module) ->
+    start_server(Module, []).
+
+%% Starts Module as an unlinked gen_server registered under its own name.
+start_server(Module, InitArg) ->
+    gen_server:start({local, Module}, Module, InitArg, []).
+
+%% Kills the process stored under Key and waits until it is gone.
+stop_server(Key, Cfg) ->
+    {Key, Pid} = lists:keyfind(Key, 1, Cfg),
+    MRef = erlang:monitor(process, Pid),
+    true = exit(Pid, kill),
+    receive
+        {'DOWN', MRef, _, _, _} -> ok
+    end.