You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2019/02/06 21:45:55 UTC

[couchdb-ken] branch tmp updated (224592e -> b152c56)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch tmp
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git.


 discard 224592e  Import ken
     new b152c56  Import ken

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (224592e)
            \
             N -- N -- N   refs/heads/tmp (b152c56)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 rebar.config.script | 9 ++++-----
 src/ken_server.erl  | 8 ++++----
 2 files changed, 8 insertions(+), 9 deletions(-)


[couchdb-ken] 01/01: Import ken

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch tmp
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit b152c56078bc223cba966ddd90546cbf5f91143c
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Feb 6 20:07:46 2019 +0000

    Import ken
---
 README.md                 |  12 ++
 rebar.config.script       |  26 +++
 src/ken.app.src.script    |  38 ++++
 src/ken.erl               |  29 +++
 src/ken_app.erl           |  28 +++
 src/ken_event_handler.erl |  56 +++++
 src/ken_server.erl        | 524 ++++++++++++++++++++++++++++++++++++++++++++++
 src/ken_sup.erl           |  33 +++
 test/config.ini           |   2 +
 test/ken_server_test.erl  |  99 +++++++++
 10 files changed, 847 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a5a6576
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+ken
+===
+
+Ken builds views and search indexes. Automatically.
+
+#### Overview
+
+When the couch\_db\_update event is triggered with an `updated` event, ken will spawn indexing jobs for view groups and search indexes (one job per view group shard or search index shard). If a `deleted` event is triggered, all jobs associated with the corresponding database shard will be removed.
+
+#### Testing
+
+Tests for ken are expected to be run from the top-level `couchdb` repo as part of a `make check` run. The ken tests can also be run in isolation with `rebar eunit apps=ken verbose=1` from the `couchdb` root directory.
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..9234a0b
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,26 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Build a {d, Macro} compiler define for every optional application that is
+% present on the code path, so the source can be compiled conditionally.
+OptionalDefines = [
+    {d, Macro}
+    || {App, Macro} <- [{dreyfus, 'HAVE_DREYFUS'}, {hastings, 'HAVE_HASTINGS'}],
+       code:lib_dir(App) /= {error, bad_name}
+].
+
+% erl_opts already present in the incoming rebar CONFIG (if any).
+ExistingOpts = case lists:keyfind(erl_opts, 1, CONFIG) of
+    false -> [];
+    {erl_opts, Opts} -> Opts
+end.
+
+% The script result is CONFIG with the defines prepended to its erl_opts.
+lists:keystore(erl_opts, 1, CONFIG, {erl_opts, OptionalDefines ++ ExistingOpts}).
diff --git a/src/ken.app.src.script b/src/ken.app.src.script
new file mode 100644
index 0000000..dcf4a23
--- /dev/null
+++ b/src/ken.app.src.script
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Detect whether the optional dreyfus / hastings applications are on the
+% code path when this script is evaluated.
+HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}.
+HaveHastings = code:lib_dir(hastings) /= {error, bad_name}.
+
+% Applications that ken always depends on.
+BaseApplications = [
+    kernel,
+    stdlib,
+    couch_log,
+    couch_event,
+    couch,
+    config
+].
+
+% Optional applications are added to the dependency list only when present.
+Applications =
+    if HaveDreyfus -> [dreyfus]; true -> [] end ++
+    if HaveHastings -> [hastings]; true -> [] end ++
+    BaseApplications.
+
+% The script evaluates to the application resource term for ken.
+{application, ken,
+ [
+  {description, ""},
+  {vsn, git},
+  {registered, []},
+  {applications, Applications},
+  {mod, { ken_app, []}},
+  {env, []}
+ ]}.
diff --git a/src/ken.erl b/src/ken.erl
new file mode 100644
index 0000000..87a724b
--- /dev/null
+++ b/src/ken.erl
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Public facade for ken. Every function simply delegates to ken_server.
+-module(ken).
+
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+
+% Add a database shard to be indexed.
+% DbName is the shard name; the request is forwarded to ken_server:add/1.
+add(DbName) ->
+    ken_server:add(DbName).
+
+% Remove all pending jobs for a database shard.
+% Forwards to ken_server:remove/1.
+remove(DbName) ->
+    ken_server:remove(DbName).
+
+% Add all shards for a database to be indexed.
+% Forwards to ken_server:add_all_shards/1.
+add_all_shards(DbName) ->
+    ken_server:add_all_shards(DbName).
diff --git a/src/ken_app.erl b/src/ken_app.erl
new file mode 100644
index 0000000..15f235d
--- /dev/null
+++ b/src/ken_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% OTP application callback module for ken.
+-module(ken_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+%% Starts the top-level supervisor; both arguments are ignored.
+start(_StartType, _StartArgs) ->
+    ken_sup:start_link().
+
+%% Nothing to clean up on application stop.
+stop(_State) ->
+    ok.
diff --git a/src/ken_event_handler.erl b/src/ken_event_handler.erl
new file mode 100644
index 0000000..8f158f4
--- /dev/null
+++ b/src/ken_event_handler.erl
@@ -0,0 +1,56 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% couch_event listener that translates database events into ken requests.
+-module(ken_event_handler).
+-behaviour(couch_event_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_event/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+
+% Starts the listener, registered with the [all_dbs] option and `nil` as the
+% init argument.
+start_link() ->
+    couch_event_listener:start_link(?MODULE, nil, [all_dbs]).
+
+%% couch_event_listener callbacks
+
+% The listener keeps no state of its own.
+init(_) ->
+    {ok, nil}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+% Event dispatch:
+%   updated      -> queue the shard for indexing
+%   deleted      -> drop everything queued for the shard
+%   ddoc_updated -> queue every shard of the database
+% Any other event is ignored.
+handle_event(DbName, updated, State) ->
+    ken:add(DbName),
+    {ok, State};
+handle_event(DbName, deleted, State) ->
+    ken:remove(DbName),
+    {ok, State};
+handle_event(DbName, ddoc_updated, State) ->
+    ken:add_all_shards(DbName),
+    {ok, State};
+handle_event(_DbName, _Event, State) ->
+    {ok, State}.
+
+% Casts and other messages are ignored.
+handle_cast(_Msg, State) ->
+    {ok, State}.
+
+handle_info(_Msg, State) ->
+    {ok, State}.
diff --git a/src/ken_server.erl b/src/ken_server.erl
new file mode 100644
index 0000000..379c1e7
--- /dev/null
+++ b/src/ken_server.erl
@@ -0,0 +1,524 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_server).
+
+% gen_server boilerplate
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, terminate/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+% Public interface
+-export([start_link/0]).
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+-export([set_batch_size/1]).
+-export([set_delay/1]).
+-export([set_limit/1]).
+-export([set_prune_interval/1]).
+
+% exports for spawn
+-export([update_db_indexes/2]).
+
+% One index update job. Jobs are stored in the ken_workers ETS table, which
+% is keyed on the `name` field.
+-record(job, {
+    name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search.
+          % {DbName, DDocId, hastings} for spatial (hastings) indexes.
+    server, % Pid of either view group or search index
+    worker_pid = nil, % pid of the spawned update worker; nil while idle
+    seq = 0, % database update seq captured when the job was created
+    lru = now() % timestamp; refreshed whenever an update is triggered
+}).
+
+% gen_server state.
+-record(state, {
+    q = queue:new(), % database shard names waiting to be examined
+    dbworker = nil, % {DbName, Pid} of the running update_db_indexes worker, or nil
+    limit = 20, % set from the "limit" config key in init/1 and via set_limit/1
+    delay = 5000, % milliseconds an idle job waits before it may be re-triggered
+    batch_size = 1, % seq delta that lets an idle job restart before `delay` elapses
+    prune_interval = 60000, % milliseconds between prunes of the worker table
+    pruned_last % now() timestamp of the most recent prune
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-ifdef(HAVE_DREYFUS).
+-include_lib("dreyfus/include/dreyfus.hrl").
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+-include_lib("hastings/src/hastings.hrl").
+-endif.
+
+%% @doc Starts the server and registers it locally under the module name.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc Adds a database shard to be indexed.
+%% Asynchronous: the request is cast to the locally registered server.
+-spec add(binary()) -> ok.
+add(DbName) ->
+    gen_server:cast(?MODULE, {add, DbName}).
+
+%% @doc Removes all the pending jobs for a database shard.
+%% Asynchronous: the request is cast to the locally registered server.
+-spec remove(binary()) -> ok.
+remove(DbName) ->
+    gen_server:cast(?MODULE, {remove, DbName}).
+
+%% @doc Adds all the shards for a database to be indexed.
+%% Looks up every shard of the database and asks the node hosting each one
+%% (via rexi:cast) to run ken_server:add/1 for that shard. Returns `ok`,
+%% including when the database does not exist.
+-spec add_all_shards(binary()) -> ok.
+add_all_shards(DbName) ->
+    try
+        Shards = mem3:shards(mem3:dbname(DbName)),
+        % foreach rather than map: the casts are side effects only and the
+        % function is specified to return `ok`, not a list of results.
+        lists:foreach(fun(Shard) ->
+            rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
+        end, Shards)
+    catch error:database_does_not_exist ->
+        ok
+    end.
+
+%% @doc Changes the configured value for a batch size.
+%% Returns previous value. Fails with function_clause unless BS is a
+%% positive integer.
+-spec set_batch_size(pos_integer()) -> pos_integer().
+set_batch_size(BS) when is_integer(BS), BS > 0 ->
+    gen_server:call(?MODULE, {set_batch_size, BS}).
+
+%% @doc Changes the configured value for a delay between batches.
+%% Returns previous value. Fails with function_clause unless Delay is a
+%% non-negative integer (zero is allowed).
+-spec set_delay(non_neg_integer()) -> non_neg_integer().
+set_delay(Delay) when is_integer(Delay), Delay >= 0 ->
+    gen_server:call(?MODULE, {set_delay, Delay}).
+
+%% @doc Changes the configured value for a limit.
+%% Returns previous value. Fails with function_clause unless Limit is a
+%% positive integer.
+-spec set_limit(pos_integer()) -> pos_integer().
+set_limit(Limit) when is_integer(Limit), Limit > 0 ->
+    gen_server:call(?MODULE, {set_limit, Limit}).
+
+%% @doc Changes the configured value for a prune interval.
+%% Returns previous value.
+%% NOTE: the guard only accepts integers strictly greater than 1000, which is
+%% narrower than the pos_integer() in the spec; smaller values fail with
+%% function_clause.
+-spec set_prune_interval(pos_integer()) -> pos_integer().
+set_prune_interval(Interval) when is_integer(Interval), Interval > 1000 ->
+    gen_server:call(?MODULE, {set_prune_interval, Interval}).
+
+%% gen_server callbacks
+
+% Initialises the server: schedules the event handler start, creates the
+% ETS tables and reads the initial limit from configuration.
+init(_) ->
+    % Start the event handler from handle_info/2 rather than inside init/1.
+    erlang:send(self(), start_event_handler),
+    % ken_pending: shard names currently sitting in the queue.
+    ets:new(ken_pending, [named_table]),
+    % ken_resubmit: shard names waiting for a delayed resubmit.
+    ets:new(ken_resubmit, [named_table]),
+    % ken_workers: #job{} records keyed on name; public so that the spawned
+    % update_db_indexes process can read it as well.
+    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
+    Limit = list_to_integer(config("limit", "20")),
+    {ok, #state{pruned_last = now(), limit = Limit}}.
+
+% No cleanup is performed on shutdown.
+terminate(_Reason, _State) ->
+    ok.
+
+% Runtime tuning calls. Each setter stores the new value and replies with the
+% value it replaced. The trailing 0 is a gen_server timeout, so a `timeout`
+% message is handled next when nothing else is waiting in the mailbox.
+handle_call({set_batch_size, BS}, _From, #state{} = State) ->
+    Previous = State#state.batch_size,
+    {reply, Previous, State#state{batch_size = BS}, 0};
+
+handle_call({set_delay, Delay}, _From, #state{} = State) ->
+    Previous = State#state.delay,
+    {reply, Previous, State#state{delay = Delay}, 0};
+
+handle_call({set_limit, Limit}, _From, #state{} = State) ->
+    Previous = State#state.limit,
+    {reply, Previous, State#state{limit = Limit}, 0};
+
+handle_call({set_prune_interval, Interval}, _From, State) ->
+    Previous = State#state.prune_interval,
+    NewState = State#state{prune_interval = Interval},
+    {reply, Previous, NewState, 0};
+
+% Any other request is treated as a programming error and stops the server.
+handle_call(Msg, From, State) ->
+    {stop, {unknown_call, Msg, From}, State}.
+
+% Queues a DB to (maybe) have indexing jobs spawned.
+% ken_pending de-duplicates: a shard that is already queued is not queued
+% a second time.
+handle_cast({add, DbName}, State) ->
+    case ets:insert_new(ken_pending, {DbName}) of
+    true ->
+        {noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
+    false ->
+        {noreply, State, 0}
+    end;
+
+% Drops a shard from the queue and removes all of its job records.
+handle_cast({remove, DbName}, State) ->
+    Q2 = queue:filter(fun(X) -> X =/= DbName end, State#state.q),
+    ets:delete(ken_pending, DbName),
+    % Delete search index workers
+    ets:match_delete(ken_workers, #job{name={DbName,'_','_'}, _='_'}),
+    % Delete view index workers
+    ets:match_delete(ken_workers, #job{name={DbName,'_'}, _='_'}),
+    % TODO kill off active jobs for this DB as well
+    {noreply, State#state{q = Q2}, 0};
+
+% Delayed re-add scheduled by resubmit/2: clear the resubmit marker, then
+% treat it as a regular add.
+handle_cast({resubmit, DbName}, State) ->
+    ets:delete(ken_resubmit, DbName),
+    handle_cast({add, DbName}, State);
+
+% The trigger_update clauses spawn a monitored worker and record its pid in
+% the job. Clause order matters: the hastings pattern must come before the
+% generic three-element pattern.
+% st index job names have 3 elements, 3rd being 'hastings'. See job record definition.
+handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} = Job}, State) ->
+    % hastings_index:await will trigger a hastings index update
+    {Pid, _} = erlang:spawn_monitor(hastings_index, await,
+        [GPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+% search index job names have 3 elements. See job record definition.
+handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
+    % dreyfus_index:await will trigger a search index update.
+    {Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
+        [GPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+% view index job names have 2 elements. See job record definition.
+handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
+    % couch_index:get_state/2 will trigger a view group index update.
+    {Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+
+% Any other cast is treated as a programming error and stops the server.
+handle_cast(Msg, State) ->
+    {stop, {unknown_cast, Msg}, State}.
+
+% The event handler terminated: log it and retry starting it in 5 seconds.
+% The return value must be a `noreply` tuple; `{ok, State, 0}` is not a valid
+% gen_server handle_info/2 result and would crash the server.
+handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
+    couch_log:error("ken_event_handler terminated: ~w", [Reason]),
+    erlang:send_after(5000, self(), start_event_handler),
+    {noreply, State, 0};
+
+% (Re)starts the event listener; on failure, logs and retries in 5 seconds.
+handle_info(start_event_handler, State) ->
+    case ken_event_handler:start_link() of
+    {ok, _Pid} ->
+        ok;
+    Error ->
+        couch_log:error("ken_event_handler init: ~w", [Error]),
+        erlang:send_after(5000, self(), start_event_handler)
+    end,
+    {noreply, State, 0};
+
+% Idle tick: prune the worker table when more than prune_interval
+% milliseconds have passed since the last prune (now_diff is in
+% microseconds), then try to start the next queued database. The prune
+% interval doubles as the next gen_server timeout.
+handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
+    NewState = case timer:now_diff(now(), Last) of
+    X when X > (1000 * I) ->
+        prune_worker_table(State);
+    _ ->
+        State
+    end,
+    {noreply, maybe_start_next_queued_job(NewState), I};
+
+% The per-database worker (update_db_indexes/2) exited: maybe schedule a
+% resubmit and free the dbworker slot.
+handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
+    maybe_resubmit(Name, Reason),
+    {noreply, St#state{dbworker=nil}, 0};
+
+% Any other monitored process is assumed to be an index update worker.
+handle_info({'DOWN', _, _, Pid, Reason}, State) ->
+    debrief_worker(Pid, Reason, State),
+    {noreply, State, 0};
+
+% Any other message is treated as a programming error and stops the server.
+handle_info(Msg, State) ->
+    {stop, {unknown_info, Msg}, State}.
+
+% No state migration is needed on code upgrade.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% private functions
+
+% Takes the next database shard off the queue and spawns a monitored
+% update_db_indexes/2 worker for it. Returns the (possibly updated) state.
+% Only one such worker runs at a time: if dbworker is already set, nothing
+% is started.
+maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
+    State;
+maybe_start_next_queued_job(#state{q=Q} = State) ->
+    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
+    BatchChannels = list_to_integer(config("batch_channels", "20")),
+    TotalChannels = IncrementalChannels + BatchChannels,
+    case queue:out(Q) of
+    {{value, DbName}, Q2} ->
+        case skip_job(DbName) of
+        true ->
+            % job is either being resubmitted or ignored, skip it
+            ets:delete(ken_pending, DbName),
+            maybe_start_next_queued_job(State#state{q = Q2});
+        false ->
+            % Only start a worker while fewer than TotalChannels index
+            % updates are active.
+            case get_active_count() of A when A < TotalChannels ->
+                Args = [DbName, State],
+                {Pid, _} = spawn_monitor(?MODULE, update_db_indexes, Args),
+                ets:delete(ken_pending, DbName),
+                State#state{dbworker = {DbName,Pid}, q = Q2};
+            _ ->
+                % All channels busy: put the shard back at the front of the
+                % queue (it stays in ken_pending).
+                State#state{q = queue:in_r(DbName, Q2)}
+            end
+        end;
+    % Q is already bound, so this matches the unchanged (empty) queue.
+    {empty, Q} ->
+        State
+    end.
+
+% True when DbName is awaiting a delayed resubmit (it is present in
+% ken_resubmit) or is configured to be ignored.
+skip_job(DbName) ->
+    ets:member(ken_resubmit, DbName) orelse ignore_db(DbName).
+
+% True only when the "ken.ignore" config section maps this database name to
+% the string "true"; any other value, or no entry at all, means false.
+ignore_db(DbName) ->
+    config:get("ken.ignore", ?b2l(DbName), false) =:= "true".
+
+% Counts the jobs in ken_workers whose worker_pid is currently a pid, i.e.
+% the index updates that are in flight.
+get_active_count() ->
+    AnyJob = #job{worker_pid = '$1', _ = '_'},
+    HasWorker = {is_pid, '$1'},
+    ets:select_count(ken_workers, [{AnyJob, [HasWorker], [true]}]).
+
+% Entry point of the spawned per-database worker. Visits the design docs of
+% the database in random order and tries to start index jobs for each one.
+% If any design doc does not report `ok`, the process exits with `resubmit`
+% so that the whole database is retried later.
+update_db_indexes(Name, State) ->
+    {ok, DDocs} = design_docs(Name),
+    random:seed(now()),
+    Shuffled = [D || {_, D} <- lists:sort([{random:uniform(), D} || D <- DDocs])],
+    % Every design doc is processed, even after an earlier one has failed.
+    Results = [
+        update_ddoc_indexes(Name, couch_doc:from_json_obj(D), State)
+        || D <- Shuffled
+    ],
+    case lists:all(fun(Result) -> Result =:= ok end, Results) of
+        true -> ok;
+        false -> exit(resubmit)
+    end.
+
+% Fetches the design docs of the clustered database that shard Name belongs
+% to. A database that does not exist, or a maintenance-mode error, yields an
+% empty list; any other result from fabric is returned unchanged.
+design_docs(Name) ->
+    try fabric:design_docs(mem3:dbname(Name)) of
+        {error, {maintenance_mode, _, _Node}} ->
+            {ok, []};
+        Result ->
+            Result
+    catch error:database_does_not_exist ->
+        {ok, []}
+    end.
+
+% Tries to start index update jobs for every index type defined by one
+% design doc. Returns `ok` when all of them report ok, `resubmit` otherwise.
+% Exits with the open error if the database shard cannot be opened.
+update_ddoc_indexes(Name, #doc{}=Doc, State) ->
+    {ok, Db} = case couch_db:open_int(Name, []) of
+        {ok, _} = Resp -> Resp;
+        Else -> exit(Else)
+    end,
+    % Only the current update seq is needed; release the db right away.
+    Seq = couch_db:get_update_seq(Db),
+    couch_db:close(Db),
+    ViewUpdated = case should_update(Doc, <<"views">>) of true ->
+        % Best effort: a design doc that cannot be turned into a view group
+        % is skipped rather than retried.
+        try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
+            {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end,
+    SearchUpdated = search_updated(Name, Doc, Seq, State),
+    STUpdated = st_updated(Name, Doc, Seq, State),
+    case {ViewUpdated, SearchUpdated, STUpdated} of
+        {ok, ok, ok} -> ok;
+        _ -> resubmit
+    end.
+
+% Starts search (dreyfus) index jobs for the design doc. Compiled to a no-op
+% when the build did not define HAVE_DREYFUS.
+-ifdef(HAVE_DREYFUS).
+search_updated(Name, Doc, Seq, State) ->
+    case should_update(Doc, <<"indexes">>) of true ->
+        % Best effort: design docs with unparsable indexes are skipped.
+        try dreyfus_index:design_doc_to_indexes(Doc) of
+            SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end.
+-else.
+search_updated(_Name, _Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+% Starts spatial (hastings) index jobs for the design doc. Compiled to a
+% no-op when the build did not define HAVE_HASTINGS.
+-ifdef(HAVE_HASTINGS).
+st_updated(Name, Doc, Seq, State) ->
+    case should_update(Doc, <<"st_indexes">>) of true ->
+        % Best effort: design docs with unparsable indexes are skipped.
+        try
+            hastings_index:design_doc_to_indexes(Doc) of
+            STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
+        catch _:_ ->
+            ok
+       end;
+    false ->
+        ok
+    end.
+-else.
+st_updated(_Name, _Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+% Decides whether indexes of IndexType in this design doc may be built
+% automatically, based on the doc's "autoupdate" property:
+%   false                        -> never
+%   object with IndexType: false -> not for this index type
+%   anything else (or absent)    -> yes
+should_update(#doc{body={Props}}, IndexType) ->
+    case couch_util:get_value(<<"autoupdate">>, Props) of
+        false ->
+            false;
+        {PerType} ->
+            couch_util:get_value(IndexType, PerType) =/= false;
+        _ ->
+            true
+    end.
+
+% Starts (or declines to start) a view index job for one view group.
+% Nothing is done, and `ok` is returned, unless the group's language is one
+% of the configured query server languages and the group has views.
+update_ddoc_views(Name, MRSt, Seq, State) ->
+    Language = couch_mrview_index:get(language, MRSt),
+    LanguageAllowed = lists:member(Language, allowed_languages()),
+    Views = couch_mrview_index:get(views, MRSt),
+    case LanguageAllowed andalso Views =/= [] of
+        true ->
+            {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
+            GroupName = couch_mrview_index:get(idx_name, MRSt),
+            maybe_start_job({Name, GroupName}, Pid, Seq, State);
+        false ->
+            ok
+    end.
+
+-ifdef(HAVE_DREYFUS).
+% Tries to start one job per search index of a design doc. Returns `ok`
+% when there are no indexes; `resubmit` as soon as any index pid cannot be
+% obtained or any job asks to be resubmitted.
+update_ddoc_search_indexes(_DbName, [], _Seq, _State) ->
+    ok;
+update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
+    StartOne = fun(#index{name=IName, ddoc_id=DDocName}=Index, Acc) ->
+        case dreyfus_index_manager:get_index(DbName, Index) of
+            {ok, Pid} ->
+                JobName = {DbName, DDocName, IName},
+                case maybe_start_job(JobName, Pid, Seq, State) of
+                    resubmit -> resubmit;
+                    _ -> Acc
+                end;
+            _ ->
+                % If any job fails, retry the db.
+                resubmit
+        end
+    end,
+    lists:foldl(StartOne, ok, Indexes).
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+% Tries to start one job per spatial index of a design doc. Returns `ok`
+% when there are no indexes; `resubmit` as soon as any index pid cannot be
+% obtained or any job asks to be resubmitted.
+update_ddoc_st_indexes(_DbName, [], _Seq, _State) ->
+    ok;
+update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
+    % The record name in hastings is #h_idx rather than #index as it is for dreyfus
+    StartOne = fun(#h_idx{ddoc_id=DDocName}=Index, Acc) ->
+        case hastings_index_manager:get_index(DbName, Index) of
+            {ok, Pid} ->
+                JobName = {DbName, DDocName, hastings},
+                case maybe_start_job(JobName, Pid, Seq, State) of
+                    resubmit -> resubmit;
+                    _ -> Acc
+                end;
+            _ ->
+                % If any job fails, retry the db.
+                resubmit
+        end
+    end,
+    lists:foldl(StartOne, ok, Indexes).
+-endif.
+
+% Decides whether an index update should be triggered for Job right now.
+% Returns true or false. The decision depends on how many updates are
+% active (A) relative to the configured channel counts, and on the job's
+% previous record in ken_workers, if any:
+%   * no record: start while A < batch_channels; beyond that, and while
+%     A < total channels, start only if the index is fewer than
+%     max_incremental_updates behind the database;
+%   * idle record (worker_pid = nil): start depending on the seq delta since
+%     the last run and on the time elapsed since it (in milliseconds);
+%   * anything else (a worker is still running): do not start.
+should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
+    Threshold = list_to_integer(config("max_incremental_updates", "1000")),
+    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
+    BatchChannels = list_to_integer(config("batch_channels", "20")),
+    TotalChannels = IncrementalChannels + BatchChannels,
+    A = get_active_count(),
+    #state{delay = Delay, batch_size = BS} = State,
+    case ets:lookup(ken_workers, Name) of
+    [] ->
+        if
+            A < BatchChannels ->
+                true;
+            A < TotalChannels ->
+                % Clause order matters: the hastings pattern must precede
+                % the generic three-element pattern.
+                case Name of
+                    % st_index name has three elements
+                    {_, _, hastings} ->
+                        {ok, CurrentSeq} = hastings_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    % View name has two elements.
+                    {_,_} ->
+                        % Since seq is 0, couch_index:get_state/2 won't
+                        % spawn an index update.
+                        {ok, MRSt} = couch_index:get_state(Pid, 0),
+                        CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
+                        (Seq - CurrentSeq) < Threshold;
+                    % Search name has three elements.
+                    {_,_,_} ->
+                        {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    _ -> % Should never happen, but if it does, ignore.
+                        false
+                    end;
+            true ->
+                false
+        end;
+    [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
+        % now_diff is in microseconds; DeltaT is in milliseconds like Delay.
+        DeltaT = timer:now_diff(now(), LRU) / 1000,
+        if
+            A < BatchChannels, (Seq - OldSeq) >= BS ->
+                true;
+            A < BatchChannels, DeltaT > Delay ->
+                true;
+            A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
+                true;
+            true ->
+                false
+        end;
+    _ ->
+        false
+    end.
+
+% Builds a job record and, if should_start_job/2 agrees, asks the server to
+% trigger the update. Returns the result of the cast (`ok`) when the job was
+% triggered and `resubmit` when it was not.
+maybe_start_job(JobName, IndexPid, Seq, State) ->
+    Candidate = #job{name = JobName, server = IndexPid, seq = Seq},
+    case should_start_job(Candidate, State) of
+        false ->
+            resubmit;
+        true ->
+            gen_server:cast(?MODULE, {trigger_update, Candidate})
+    end.
+
+% Handles the exit of an index update worker: maybe schedules a resubmit
+% for its database, depending on the exit reason, and marks the job idle
+% again by resetting worker_pid to nil.
+debrief_worker(Pid, Reason, _State) ->
+    case ets:match_object(ken_workers, #job{worker_pid=Pid, _='_'}) of
+        [] ->
+            % should never happen, but if it does, ignore
+            ok;
+        [#job{name = Name} = Job] ->
+            % The database name is the first element of both job name shapes.
+            DbName = case Name of
+                {Db, _} -> Db;
+                {Db, _, _} -> Db
+            end,
+            maybe_resubmit(DbName, Reason),
+            ets:insert(ken_workers, Job#job{worker_pid = nil})
+    end.
+
+% Decides, from a worker's exit reason, whether the database should be
+% queued again later:
+%   normal exit, or the database is gone -> nothing to do
+%   resubmit                              -> retry after 60 seconds
+%   any other reason                      -> retry after 5 seconds
+maybe_resubmit(_DbName, normal) ->
+    ok;
+maybe_resubmit(_DbName, {database_does_not_exist, _}) ->
+    ok;
+maybe_resubmit(_DbName, {not_found, no_db_file}) ->
+    ok;
+maybe_resubmit(DbName, resubmit) ->
+    resubmit(60000, DbName);
+maybe_resubmit(DbName, _) ->
+    resubmit(5000, DbName).
+
+% Schedules a {resubmit, DbName} cast to the server after Delay
+% milliseconds, unless one is already pending for this database
+% (ken_resubmit holds the pending names).
+resubmit(Delay, DbName) ->
+    AlreadyPending = not ets:insert_new(ken_resubmit, {DbName}),
+    case AlreadyPending of
+        true ->
+            ok;
+        false ->
+            Cast = {'$gen_cast', {resubmit, DbName}},
+            erlang:send_after(Delay, ?MODULE, Cast)
+    end.
+
+% Deletes idle job records (worker_pid = nil) whose lru timestamp is older
+% than `delay` milliseconds, and records the time of this prune.
+prune_worker_table(State) ->
+    {A, B, _} = now(),
+    % Cutoff in seconds: current time (megaseconds * 1000000 + seconds) minus
+    % the delay converted from milliseconds to seconds.
+    C = (1000000 * A) + B - 0.001 * State#state.delay,
+    MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'},
+    % Same seconds computation applied to each job's lru, compared to C.
+    Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C},
+    ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
+    State#state{pruned_last = now()}.
+
+% Languages for which a query server is configured, as binaries. Entries
+% come from both the "query_servers" and "native_query_servers" sections.
+allowed_languages() ->
+    External = config:get("query_servers"),
+    Native = config:get("native_query_servers"),
+    [list_to_binary(Lang) || {Lang, _Cmd} <- External ++ Native].
+
+% Reads Key from the "ken" config section, falling back to Default.
+config(Key, Default) ->
+    config:get("ken", Key, Default).
diff --git a/src/ken_sup.erl b/src/ken_sup.erl
new file mode 100644
index 0000000..fd08cfd
--- /dev/null
+++ b/src/ken_sup.erl
@@ -0,0 +1,33 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Top-level supervisor for ken; its only child is ken_server.
+-module(ken_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+%% (permanent restart, 5000 ms shutdown timeout, started via I:start_link/0).
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% Starts the supervisor, registered locally under the module name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor callbacks
+
+%% one_for_one strategy, at most 5 restarts within 10 seconds.
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, [?CHILD(ken_server, worker)]} }.
+
diff --git a/test/config.ini b/test/config.ini
new file mode 100644
index 0000000..a28eae4
--- /dev/null
+++ b/test/config.ini
@@ -0,0 +1,2 @@
+[ken]
+limit = 42
diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl
new file mode 100644
index 0000000..1d2af7a
--- /dev/null
+++ b/test/ken_server_test.erl
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% EUnit tests for the ken_server tuning setters (set_limit/1,
+% set_batch_size/1, set_delay/1, set_prune_interval/1).
+-module(ken_server_test).
+
+-compile([export_all]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000
+%% Each setter must return the previous value: first the default, then the
+%% value stored by the preceding call. The tests share one server (setup)
+%% and depend on running in order.
+default_test_() ->
+    {inorder, {setup,
+        fun setup_default/0,
+        fun teardown/1,
+        [
+            set_builder("returns default", set_limit, 12, 20),
+            set_builder("keeps set", set_limit, 6, 12),
+            set_builder("returns default", set_batch_size, 3, 1),
+            set_builder("keeps set", set_batch_size, 6, 3),
+            set_builder("returns default", set_delay, 7000, 5000),
+            set_builder("keeps set", set_delay, 10000, 7000),
+            set_builder("returns default", set_prune_interval, 70000, 60000),
+            set_builder("keeps set", set_prune_interval, 80000, 70000)
+        ]
+    }}.
+
+%% Out-of-range arguments must fail with function_clause; a zero delay is
+%% the one accepted boundary value. A fresh server is started for every
+%% test (foreach).
+exception_test_() ->
+    {inorder, {foreach,
+        fun setup_default/0,
+        fun teardown/1,
+        [
+            exception_builder("exception on zero", set_limit, 0),
+            exception_builder("exception on negative", set_limit, -12),
+            exception_builder("exception on zero", set_batch_size, 0),
+            exception_builder("exception on negative", set_batch_size, -12),
+            set_builder("no exception on zero", set_delay, 0, 5000),
+            exception_builder("exception on negative", set_delay, -12),
+            exception_builder("exception on zero", set_prune_interval, 0),
+            exception_builder("exception on negative", set_prune_interval, -12)
+        ]
+    }}.
+
+%% With test/config.ini loaded, the initial limit comes from the config
+%% file (42) instead of the hardcoded default.
+config_test_() ->
+    {inorder, {setup,
+        fun setup_config/0,
+        fun teardown/1,
+        [
+            set_builder("reads config", set_limit, 24, 42),
+            set_builder("keeps set", set_limit, 6, 24)
+        ]
+    }}.
+
+%% Starts the event server, config server and ken_server with no config
+%% file; returns their pids as a proplist for teardown/1.
+setup_default() ->
+    {ok, EventPid} = start_server(couch_event_server),
+    {ok, CfgPid} = start_server(config),
+    {ok, KenPid} = start_server(ken_server),
+    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].
+
+%% Same as setup_default/0, but the config server is given the path
+%% ../test/config.ini relative to the current working directory.
+setup_config() ->
+    {ok, Pwd} = file:get_cwd(),
+    Config = filename:join([Pwd, "..", "test", "config.ini"]),
+    {ok, EventPid} = start_server(couch_event_server),
+    {ok, CfgPid} = start_server(config, [[Config]]),
+    {ok, KenPid} = start_server(ken_server),
+    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].
+
+%% Kills the three servers started by setup, waiting for each to go down.
+teardown(Cfg) ->
+    ok = stop_server(event_pid, Cfg),
+    ok = stop_server(cfg_pid, Cfg),
+    ok = stop_server(ken_pid, Cfg).
+
+%% Builds a named test asserting that ken_server:F(Val) raises
+%% error:function_clause.
+exception_builder(Desc, F, Val) ->
+    D = atom_to_list(F) ++ " " ++ Desc,
+    {D, ?_assertException(error, function_clause, ken_server:F(Val))}.
+
+%% Builds a named test asserting that ken_server:F(In) returns Out.
+set_builder(Desc, F, In, Out) ->
+    D = atom_to_list(F) ++ " " ++ Desc,
+    {D, ?_assertEqual(Out, ken_server:F(In))}.
+
+start_server(Module) ->
+    start_server(Module, []).
+
+%% Starts Module as an unlinked gen_server registered locally under its own
+%% name, passing Config as the init argument.
+start_server(Module, Config) ->
+    gen_server:start({local, Module}, Module, Config, []).
+
+%% Kills the process stored under Key and blocks until its 'DOWN' message
+%% arrives.
+stop_server(Key, Cfg) ->
+    {Key, Pid} = lists:keyfind(Key, 1, Cfg),
+    MRef = erlang:monitor(process, Pid),
+    true = exit(Pid, kill),
+    receive {'DOWN', MRef, _, _, _} -> ok end.