Posted to commits@couchdb.apache.org by rn...@apache.org on 2019/02/25 21:54:34 UTC

[couchdb-ken] branch else created (now 9837a29)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git.


      at 9837a29  and the -else clauses

This branch includes the following new commits:

     new 390b919  Import ken
     new 50682f9  Fix compilation errors and use of macro
     new 004881a  Merge pull request #4 from apache/has_have_confusion
     new 7095e0d  Fix compiler warnings
     new 3e47130  Merge pull request #1 from cloudant/fix-compilation-warnings
     new 9837a29  and the -else clauses

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb-ken] 06/06: and the -else clauses

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 9837a298641dafd0ffc5817af80233aec027fd48
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Feb 25 21:54:21 2019 +0000

    and the -else clauses
---
 src/ken_server.erl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/ken_server.erl b/src/ken_server.erl
index f45acfb..5d9b29e 100644
--- a/src/ken_server.erl
+++ b/src/ken_server.erl
@@ -333,7 +333,7 @@ search_updated(Name, Doc, Seq, State) ->
         ok
     end.
 -else.
-search_updated(_Doc, _Seq, _State) ->
+search_updated(_Name, _Doc, _Seq, _State) ->
     ok.
 -endif.
 
@@ -350,7 +350,7 @@ st_updated(Name, Doc, Seq, State) ->
         ok
     end.
 -else.
-st_updated(_Doc, _Seq, _State) ->
+st_updated(_Name, _Doc, _Seq, _State) ->
     ok.
 -endif.
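
For context, the fix above matters because update_ddoc_indexes/3 calls search_updated/4 and st_updated/4 unconditionally; when dreyfus or hastings is absent, the -else stubs are the code that gets compiled, so they must keep the same arity as the real implementations. A minimal, self-contained sketch of the pattern (an illustrative module, not the actual ken_server code):

    -module(ifdef_arity_sketch).
    -export([run/0]).

    %% Compile with or without the macro to exercise both branches:
    %%   erlc ifdef_arity_sketch.erl
    %%   erlc -DHAVE_DREYFUS ifdef_arity_sketch.erl
    -ifdef(HAVE_DREYFUS).
    search_updated(Name, _Doc, _Seq, _State) ->
        {updated, Name}.
    -else.
    %% The stub must keep the same /4 arity as the clause above,
    %% because run/0 calls search_updated/4 either way.
    search_updated(_Name, _Doc, _Seq, _State) ->
        ok.
    -endif.

    run() ->
        search_updated(<<"shards/db">>, doc, 0, state).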
 


[couchdb-ken] 05/06: Merge pull request #1 from cloudant/fix-compilation-warnings

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 3e471304b5aeb898b33f9723269a2ed3e40cd219
Merge: 004881a 7095e0d
Author: iilyak <ii...@users.noreply.github.com>
AuthorDate: Mon Feb 25 12:06:37 2019 -0800

    Merge pull request #1 from cloudant/fix-compilation-warnings
    
    Fix compiler warnings

 src/ken_server.erl       | 69 +++++++++++++++++++++++++++++++++++++-----------
 test/ken_server_test.erl |  2 --
 2 files changed, 54 insertions(+), 17 deletions(-)


[couchdb-ken] 04/06: Fix compiler warnings

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 7095e0d07109417dfd7ca561207a884e53a09590
Author: ILYA Khlopotov <ii...@apache.org>
AuthorDate: Fri Feb 22 16:06:32 2019 +0000

    Fix compiler warnings
---
 src/ken_server.erl       | 69 +++++++++++++++++++++++++++++++++++++-----------
 test/ken_server_test.erl |  2 --
 2 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/src/ken_server.erl b/src/ken_server.erl
index 91f18c1..f45acfb 100644
--- a/src/ken_server.erl
+++ b/src/ken_server.erl
@@ -36,7 +36,7 @@
     server, % Pid of either view group or search index
     worker_pid = nil,
     seq = 0,
-    lru = now()
+    lru = erlang:monotonic_time()
 }).
 
 -record(state, {
@@ -117,7 +117,7 @@ init(_) ->
     ets:new(ken_resubmit, [named_table]),
     ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
     Limit = list_to_integer(config("limit", "20")),
-    {ok, #state{pruned_last = now(), limit = Limit}}.
+    {ok, #state{pruned_last = erlang:monotonic_time(), limit = Limit}}.
 
 terminate(_Reason, _State) ->
     ok.
@@ -166,19 +166,22 @@ handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} =
     % hastings_index:await will trigger a hastings index update
     {Pid, _} = erlang:spawn_monitor(hastings_index, await,
         [GPid, Seq]),
-    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    Now = erlang:monotonic_time(),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
     {noreply, State, 0};
 % search index job names have 3 elements. See job record definition.
 handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
     % dreyfus_index:await will trigger a search index update.
     {Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
         [GPid, Seq]),
-    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    Now = erlang:monotonic_time(),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
     {noreply, State, 0};
 handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
     % couch_index:get_state/2 will trigger a view group index update.
     {Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
-    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    Now = erlang:monotonic_time(),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = Now}),
     {noreply, State, 0};
 
 handle_cast(Msg, State) ->
@@ -200,8 +203,11 @@ handle_info(start_event_handler, State) ->
     {noreply, State, 0};
 
 handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
-    case timer:now_diff(now(), Last) of
-    X when X > (1000 * I) ->
+    Now = erlang:monotonic_time(),
+    Interval = erlang:convert_time_unit(
+        State#state.delay, millisecond, native),
+    case Now - Last > Interval of
+    true ->
         NewState = prune_worker_table(State);
     _ ->
         NewState = State
@@ -269,8 +275,7 @@ get_active_count() ->
 % If any indexing job fails, resubmit requests for all indexes.
 update_db_indexes(Name, State) ->
     {ok, DDocs} = design_docs(Name),
-    random:seed(now()),
-    RandomSorted = lists:sort([{random:uniform(), D} || D <- DDocs]),
+    RandomSorted = lists:sort([{rand:uniform(), D} || D <- DDocs]),
     Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
         JsonDDoc = couch_doc:from_json_obj(DDoc),
         case update_ddoc_indexes(Name, JsonDDoc, State) of
@@ -447,7 +452,8 @@ should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
                 false
         end;
     [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
-        DeltaT = timer:now_diff(now(), LRU) / 1000,
+        Now = erlang:monotonic_time(),
+        DeltaT = erlang:convert_time_unit(Now - LRU, native, millisecond),
         if
             A < BatchChannels, (Seq - OldSeq) >= BS ->
                 true;
@@ -509,12 +515,14 @@ resubmit(Delay, DbName) ->
     end.
 
 prune_worker_table(State) ->
-    {A, B, _} = now(),
-    C = (1000000 * A) + B - 0.001 * State#state.delay,
-    MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'},
-    Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C},
+    % remove all entries older than specified `delay` in milliseconds
+    Delay = erlang:convert_time_unit(State#state.delay, millisecond, native),
+    C = erlang:monotonic_time() - Delay,
+    %% fun(#job{worker_pid=nil, lru=A}) when A < C -> true end
+    MatchHead = #job{worker_pid=nil, lru='$1', _='_'},
+    Guard = {'<', '$1', C},
     ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
-    State#state{pruned_last = now()}.
+    State#state{pruned_last = erlang:monotonic_time()}.
 
 allowed_languages() ->
     Config = config:get("query_servers") ++ config:get("native_query_servers"),
@@ -522,3 +530,34 @@ allowed_languages() ->
 
 config(Key, Default) ->
     config:get("ken", Key, Default).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+
+prune_old_entries_test() ->
+    {
+        setup,
+        fun() ->
+            ets:new(ken_workers, [named_table, public, {keypos, #job.name}])
+        end,
+        fun(_) ->
+            catch ets:delete(ken_workers)
+        end,
+        ?_test(begin
+            lists:foreach(fun(Idx) ->
+                ets:insert(ken_workers, #job{name=Idx}),
+                timer:sleep(100)
+            end, lists:seq(1, 3)),
+            prune_worker_table(#state{delay=250}),
+            ?assertEqual(
+                [2, 3],
+                lists:usort(
+                    [N || #job{name = N} <- ets:tab2list(ken_workers)])
+            ),
+            ok
+        end)
+    }.
+
+-endif.
diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl
index 1d2af7a..eed3484 100644
--- a/test/ken_server_test.erl
+++ b/test/ken_server_test.erl
@@ -12,8 +12,6 @@
 
 -module(ken_server_test).
 
--compile([export_all]).
-
 -include_lib("eunit/include/eunit.hrl").
 
 %% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000
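
The recurring change in this commit is the move away from the deprecated erlang:now/0: timestamps now come from erlang:monotonic_time/0 and are compared after erlang:convert_time_unit/3, and random:uniform/0 is replaced by rand:uniform/0. A small stand-alone sketch of the two time conversions used above (not part of ken itself):

    -module(monotonic_sketch).
    -export([elapsed_ms/1, older_than/2]).

    %% Run Fun and report how long it took, in milliseconds.
    %% monotonic_time/0 returns "native" units, so the difference has to
    %% be converted before it can be read as milliseconds.
    elapsed_ms(Fun) ->
        T0 = erlang:monotonic_time(),
        Fun(),
        T1 = erlang:monotonic_time(),
        erlang:convert_time_unit(T1 - T0, native, millisecond).

    %% The inverse conversion, as in prune_worker_table/1: turn a
    %% millisecond delay into native units and compare timestamps directly.
    older_than(Timestamp, DelayMs) ->
        Delay = erlang:convert_time_unit(DelayMs, millisecond, native),
        erlang:monotonic_time() - Timestamp > Delay.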


[couchdb-ken] 01/06: Import ken

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 390b91949220366a0c64aa74bffb46fa0b22c4b6
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Feb 6 20:07:46 2019 +0000

    Import ken
---
 README.md                 |  12 ++
 rebar.config.script       |  26 +++
 src/ken.app.src.script    |  38 ++++
 src/ken.erl               |  29 +++
 src/ken_app.erl           |  28 +++
 src/ken_event_handler.erl |  56 +++++
 src/ken_server.erl        | 524 ++++++++++++++++++++++++++++++++++++++++++++++
 src/ken_sup.erl           |  33 +++
 test/config.ini           |   2 +
 test/ken_server_test.erl  |  99 +++++++++
 10 files changed, 847 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a5a6576
--- /dev/null
+++ b/README.md
@@ -0,0 +1,12 @@
+ken
+===
+
+Ken builds views and search indexes. Automatically.
+
+#### Overview
+
+When the couch\_db\_update event is triggered with an `updated` event, ken will spawn indexing jobs for view groups and search indexes (one job per view group shard or search index shard). If a `deleted` event is triggered, all jobs associated with the corresponding database shard will be removed.
+
+#### Testing
+
+Testing for ken is expected to be executed from the top-level `couchdb` repo as part of a `make check` run. The isolated ken tests can be run with `rebar eunit apps=ken verbose=1` from the root directory of `couchdb`.
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..4570bfc
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,26 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}.
+HaveHastings = code:lib_dir(hastings) /= {error, bad_name}.
+
+CurrOpts = case lists:keyfind(erl_opts, 1, CONFIG) of
+    {erl_opts, Opts} -> Opts;
+    false -> []
+end,
+
+NewOpts =
+    if HaveDreyfus -> [{d, 'HAVE_DREYFUS'}]; true -> [] end ++
+    if HaveHastings -> [{d, 'HAVE_HASTINGS'}]; true -> [] end ++
+    [{i, "../"}] ++ CurrOpts.
+
+lists:keystore(erl_opts, 1, CONFIG, {erl_opts, NewOpts}).
diff --git a/src/ken.app.src.script b/src/ken.app.src.script
new file mode 100644
index 0000000..dcf4a23
--- /dev/null
+++ b/src/ken.app.src.script
@@ -0,0 +1,38 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}.
+HaveHastings = code:lib_dir(hastings) /= {error, bad_name}.
+
+BaseApplications = [
+    kernel,
+    stdlib,
+    couch_log,
+    couch_event,
+    couch,
+    config
+].
+
+Applications =
+    if HaveDreyfus -> [dreyfus]; true -> [] end ++
+    if HaveHastings -> [hastings]; true -> [] end ++
+    BaseApplications.
+
+{application, ken,
+ [
+  {description, ""},
+  {vsn, git},
+  {registered, []},
+  {applications, Applications},
+  {mod, { ken_app, []}},
+  {env, []}
+ ]}.
diff --git a/src/ken.erl b/src/ken.erl
new file mode 100644
index 0000000..87a724b
--- /dev/null
+++ b/src/ken.erl
@@ -0,0 +1,29 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken).
+
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+
+% Add a database shard to be indexed.
+add(DbName) ->
+    ken_server:add(DbName).
+
+% Remove all pending jobs for a database shard.
+remove(DbName) ->
+    ken_server:remove(DbName).
+
+% Add all shards for a database to be indexed.
+add_all_shards(DbName) ->
+    ken_server:add_all_shards(DbName).
diff --git a/src/ken_app.erl b/src/ken_app.erl
new file mode 100644
index 0000000..15f235d
--- /dev/null
+++ b/src/ken_app.erl
@@ -0,0 +1,28 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    ken_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/src/ken_event_handler.erl b/src/ken_event_handler.erl
new file mode 100644
index 0000000..8f158f4
--- /dev/null
+++ b/src/ken_event_handler.erl
@@ -0,0 +1,56 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_event_handler).
+-behaviour(couch_event_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_event/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+
+start_link() ->
+    couch_event_listener:start_link(?MODULE, nil, [all_dbs]).
+
+%% couch_event_listener callbacks
+
+init(_) ->
+    {ok, nil}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+handle_event(DbName, updated, State) ->
+    ken:add(DbName),
+    {ok, State};
+handle_event(DbName, deleted, State) ->
+    ken:remove(DbName),
+    {ok, State};
+handle_event(DbName, ddoc_updated, State) ->
+    ken:add_all_shards(DbName),
+    {ok, State};
+handle_event(_DbName, _Event, State) ->
+    {ok, State}.
+
+handle_cast(_Msg, State) ->
+    {ok, State}.
+
+handle_info(_Msg, State) ->
+    {ok, State}.
diff --git a/src/ken_server.erl b/src/ken_server.erl
new file mode 100644
index 0000000..25da5ac
--- /dev/null
+++ b/src/ken_server.erl
@@ -0,0 +1,524 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_server).
+
+% gen_server boilerplate
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, terminate/2]).
+-export([handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+% Public interface
+-export([start_link/0]).
+-export([add/1]).
+-export([remove/1]).
+-export([add_all_shards/1]).
+-export([set_batch_size/1]).
+-export([set_delay/1]).
+-export([set_limit/1]).
+-export([set_prune_interval/1]).
+
+% exports for spawn
+-export([update_db_indexes/2]).
+
+-record(job, {
+    name, % {DbName, GroupId} for view. {DbName, DDocId, IndexId} for search.
+    server, % Pid of either view group or search index
+    worker_pid = nil,
+    seq = 0,
+    lru = now()
+}).
+
+-record(state, {
+    q = queue:new(),
+    dbworker = nil,
+    limit = 20,
+    delay = 5000,
+    batch_size = 1,
+    prune_interval = 60000,
+    pruned_last
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-ifdef(HAVE_DREYFUS).
+-include_lib("dreyfus/include/dreyfus.hrl").
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+-include_lib("hastings/src/hastings.hrl").
+-endif.
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc Adds a database shard to be indexed
+-spec add(binary()) -> ok.
+add(DbName) ->
+    gen_server:cast(?MODULE, {add, DbName}).
+
+%% @doc Removes all the pending jobs for a database shard.
+-spec remove(binary()) -> ok.
+remove(DbName) ->
+    gen_server:cast(?MODULE, {remove, DbName}).
+
+%% @doc Adds all the shards for a database to be indexed.
+-spec add_all_shards(binary()) -> ok.
+add_all_shards(DbName) ->
+    try
+        Shards = mem3:shards(mem3:dbname(DbName)),
+        lists:map(fun(Shard) ->
+            rexi:cast(Shard#shard.node, {ken_server, add, [Shard#shard.name]})
+        end, Shards)
+    catch error:database_does_not_exist ->
+        ok
+    end.
+
+%% @doc Changes the configured value for a batch size.
+%% Returns previous value.
+-spec set_batch_size(pos_integer()) -> pos_integer().
+set_batch_size(BS) when is_integer(BS), BS > 0 ->
+    gen_server:call(?MODULE, {set_batch_size, BS}).
+
+%% @doc Changes the configured value for a delay between batches.
+%% Returns previous value.
+-spec set_delay(non_neg_integer()) -> non_neg_integer().
+set_delay(Delay) when is_integer(Delay), Delay >= 0 ->
+    gen_server:call(?MODULE, {set_delay, Delay}).
+
+%% @doc Changes the configured value for a limit.
+%% Returns previous value.
+-spec set_limit(pos_integer()) -> pos_integer().
+set_limit(Limit) when is_integer(Limit), Limit > 0 ->
+    gen_server:call(?MODULE, {set_limit, Limit}).
+
+%% @doc Changes the configured value for a prune interval.
+%% Returns previous value.
+-spec set_prune_interval(pos_integer()) -> pos_integer().
+set_prune_interval(Interval) when is_integer(Interval), Interval > 1000 ->
+    gen_server:call(?MODULE, {set_prune_interval, Interval}).
+
+%% gen_server callbacks
+
+init(_) ->
+    erlang:send(self(), start_event_handler),
+    ets:new(ken_pending, [named_table]),
+    ets:new(ken_resubmit, [named_table]),
+    ets:new(ken_workers, [named_table, public, {keypos, #job.name}]),
+    Limit = list_to_integer(config("limit", "20")),
+    {ok, #state{pruned_last = now(), limit = Limit}}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+handle_call({set_batch_size, BS}, _From, #state{batch_size = Old} = State) ->
+    {reply, Old, State#state{batch_size = BS}, 0};
+
+handle_call({set_delay, Delay}, _From, #state{delay = Old} = State) ->
+    {reply, Old, State#state{delay = Delay}, 0};
+
+handle_call({set_limit, Limit}, _From, #state{limit = Old} = State) ->
+    {reply, Old, State#state{limit = Limit}, 0};
+
+handle_call({set_prune_interval, Interval}, _From , State) ->
+    Old = State#state.prune_interval,
+    {reply, Old, State#state{prune_interval = Interval}, 0};
+
+handle_call(Msg, From, State) ->
+    {stop, {unknown_call, Msg, From}, State}.
+
+% Queues a DB to (maybe) have indexing jobs spawned.
+handle_cast({add, DbName}, State) ->
+    case ets:insert_new(ken_pending, {DbName}) of
+    true ->
+        {noreply, State#state{q = queue:in(DbName, State#state.q)}, 0};
+    false ->
+        {noreply, State, 0}
+    end;
+
+handle_cast({remove, DbName}, State) ->
+    Q2 = queue:filter(fun(X) -> X =/= DbName end, State#state.q),
+    ets:delete(ken_pending, DbName),
+    % Delete search index workers
+    ets:match_delete(ken_workers, #job{name={DbName,'_','_'}, _='_'}),
+    % Delete view index workers
+    ets:match_delete(ken_workers, #job{name={DbName,'_'}, _='_'}),
+    % TODO kill off active jobs for this DB as well
+    {noreply, State#state{q = Q2}, 0};
+
+handle_cast({resubmit, DbName}, State) ->
+    ets:delete(ken_resubmit, DbName),
+    handle_cast({add, DbName}, State);
+
+% st index job names have 3 elements, 3rd being 'hastings'. See job record definition.
+handle_cast({trigger_update, #job{name={_, _, hastings}, server=GPid, seq=Seq} = Job}, State) ->
+    % hastings_index:await will trigger a hastings index update
+    {Pid, _} = erlang:spawn_monitor(hastings_index, await,
+        [GPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+% search index job names have 3 elements. See job record definition.
+handle_cast({trigger_update, #job{name={_,_,_}, server=GPid, seq=Seq} = Job}, State) ->
+    % dreyfus_index:await will trigger a search index update.
+    {Pid, _} = erlang:spawn_monitor(dreyfus_index, await,
+        [GPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+handle_cast({trigger_update, #job{name={_,_}, server=SrvPid, seq=Seq} = Job}, State) ->
+    % couch_index:get_state/2 will trigger a view group index update.
+    {Pid, _} = erlang:spawn_monitor(couch_index, get_state, [SrvPid, Seq]),
+    ets:insert(ken_workers, Job#job{worker_pid = Pid, lru = now()}),
+    {noreply, State, 0};
+
+handle_cast(Msg, State) ->
+    {stop, {unknown_cast, Msg}, State}.
+
+handle_info({gen_event_EXIT, ken_event_handler, Reason}, State) ->
+    couch_log:error("ken_event_handler terminated: ~w", [Reason]),
+    erlang:send_after(5000, self(), start_event_handler),
+    {ok, State, 0};
+
+handle_info(start_event_handler, State) ->
+    case ken_event_handler:start_link() of
+    {ok, _Pid} ->
+        ok;
+    Error ->
+        couch_log:error("ken_event_handler init: ~w", [Error]),
+        erlang:send_after(5000, self(), start_event_handler)
+    end,
+    {noreply, State, 0};
+
+handle_info(timeout, #state{prune_interval = I, pruned_last = Last} = State) ->
+    case timer:now_diff(now(), Last) of
+    X when X > (1000 * I) ->
+        NewState = prune_worker_table(State);
+    _ ->
+        NewState = State
+    end,
+    {noreply, maybe_start_next_queued_job(NewState), I};
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{dbworker = {Name,Pid}} = St) ->
+    maybe_resubmit(Name, Reason),
+    {noreply, St#state{dbworker=nil}, 0};
+
+handle_info({'DOWN', _, _, Pid, Reason}, State) ->
+    debrief_worker(Pid, Reason, State),
+    {noreply, State, 0};
+
+handle_info(Msg, State) ->
+    {stop, {unknown_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% private functions
+
+maybe_start_next_queued_job(#state{dbworker = {_,_}} = State) ->
+    State;
+maybe_start_next_queued_job(#state{q=Q} = State) ->
+    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
+    BatchChannels = list_to_integer(config("batch_channels", "20")),
+    TotalChannels = IncrementalChannels + BatchChannels,
+    case queue:out(Q) of
+    {{value, DbName}, Q2} ->
+        case skip_job(DbName) of
+        true ->
+            % job is either being resubmitted or ignored, skip it
+            ets:delete(ken_pending, DbName),
+            maybe_start_next_queued_job(State#state{q = Q2});
+        false ->
+            case get_active_count() of A when A < TotalChannels ->
+                Args = [DbName, State],
+                {Pid, _} = spawn_monitor(?MODULE, update_db_indexes, Args),
+                ets:delete(ken_pending, DbName),
+                State#state{dbworker = {DbName,Pid}, q = Q2};
+            _ ->
+                State#state{q = queue:in_r(DbName, Q2)}
+            end
+        end;
+    {empty, Q} ->
+        State
+    end.
+
+skip_job(DbName) ->
+    ets:member(ken_resubmit, DbName) orelse ignore_db(DbName).
+
+ignore_db(DbName) ->
+    case config:get("ken.ignore", ?b2l(DbName), false) of
+    "true" ->
+        true;
+     _ ->
+        false
+    end.
+
+get_active_count() ->
+    MatchSpec = [{#job{worker_pid='$1', _='_'}, [{is_pid, '$1'}], [true]}],
+    ets:select_count(ken_workers, MatchSpec).
+
+% If any indexing job fails, resubmit requests for all indexes.
+update_db_indexes(Name, State) ->
+    {ok, DDocs} = design_docs(Name),
+    random:seed(now()),
+    RandomSorted = lists:sort([{random:uniform(), D} || D <- DDocs]),
+    Resubmit = lists:foldl(fun({_, DDoc}, Acc) ->
+        JsonDDoc = couch_doc:from_json_obj(DDoc),
+        case update_ddoc_indexes(Name, JsonDDoc, State) of
+            ok -> Acc;
+            _ -> true
+        end
+    end, false, RandomSorted),
+    if Resubmit -> exit(resubmit); true -> ok end.
+
+design_docs(Name) ->
+    try
+        case fabric:design_docs(mem3:dbname(Name)) of
+            {error, {maintenance_mode, _, _Node}} ->
+                {ok, []};
+            Else ->
+                Else
+        end
+    catch error:database_does_not_exist ->
+        {ok, []}
+    end.
+
+% Returns an error if any job creation fails.
+update_ddoc_indexes(Name, #doc{}=Doc, State) ->
+    {ok, Db} = case couch_db:open_int(Name, []) of
+        {ok, _} = Resp -> Resp;
+        Else -> exit(Else)
+    end,
+    Seq = couch_db:get_update_seq(Db),
+    couch_db:close(Db),
+    ViewUpdated = case should_update(Doc, <<"views">>) of true ->
+        try couch_mrview_util:ddoc_to_mrst(Name, Doc) of
+            {ok, MRSt} -> update_ddoc_views(Name, MRSt, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end,
+    SearchUpdated = search_updated(Doc, Seq, State),
+    STUpdated = st_updated(Doc, Seq, State),
+    case {ViewUpdated, SearchUpdated, STUpdated} of
+        {ok, ok, ok} -> ok;
+        _ -> resubmit
+    end.
+
+-ifdef(HAS_DREYFUS).
+search_updated(Doc, Seq, State) ->
+    case should_update(Doc, <<"indexes">>) of true ->
+        try dreyfus_index:design_doc_to_indexes(Doc) of
+            SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
+        catch _:_ ->
+            ok
+        end;
+    false ->
+        ok
+    end.
+-else.
+search_updated(_Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+-ifdef(HAS_HASTINGS).
+st_updated(Doc, Seq, State) ->
+    case should_update(Doc, <<"st_indexes">>) of true ->
+        try
+            hastings_index:design_doc_to_indexes(Doc) of
+            STIndexes -> update_ddoc_st_indexes(Name, STIndexes, Seq, State)
+        catch _:_ ->
+            ok
+       end;
+    false ->
+        ok
+    end.
+-else.
+st_updated(_Doc, _Seq, _State) ->
+    ok.
+-endif.
+
+should_update(#doc{body={Props}}, IndexType) ->
+    case couch_util:get_value(<<"autoupdate">>, Props) of
+        false ->
+            false;
+        {AUProps} ->
+            case couch_util:get_value(IndexType, AUProps) of
+                false ->
+                    false;
+                _ ->
+                    true
+            end;
+        _ ->
+            true
+    end.
+
+update_ddoc_views(Name, MRSt, Seq, State) ->
+    Language = couch_mrview_index:get(language, MRSt),
+    Allowed = lists:member(Language, allowed_languages()),
+    Views = couch_mrview_index:get(views, MRSt),
+    if Allowed andalso Views =/= [] ->
+        {ok, Pid} = couch_index_server:get_index(couch_mrview_index, MRSt),
+        GroupName = couch_mrview_index:get(idx_name, MRSt),
+        maybe_start_job({Name, GroupName}, Pid, Seq, State);
+    true -> ok end.
+
+-ifdef(HAVE_DREYFUS).
+update_ddoc_search_indexes(DbName, Indexes, Seq, State) ->
+    if Indexes =/= [] ->
+        % Spawn a job for each search index in the ddoc
+        lists:foldl(fun(#index{name=IName, ddoc_id=DDocName}=Index, Acc) ->
+            case dreyfus_index_manager:get_index(DbName, Index) of
+                {ok, Pid} ->
+                    case maybe_start_job({DbName, DDocName, IName}, Pid, Seq, State) of
+                        resubmit -> resubmit;
+                        _ -> Acc
+                    end;
+                _ ->
+                    % If any job fails, retry the db.
+                    resubmit
+            end end, ok, Indexes);
+    true -> ok end.
+-endif.
+
+-ifdef(HAVE_HASTINGS).
+update_ddoc_st_indexes(DbName, Indexes, Seq, State) ->
+    if Indexes =/= [] ->
+        % The record name in hastings is #h_idx rather than #index as it is for dreyfus
+        % Spawn a job for each spatial index in the ddoc
+        lists:foldl(fun(#h_idx{ddoc_id=DDocName}=Index, Acc) ->
+        case hastings_index_manager:get_index(DbName, Index) of
+            {ok, Pid} ->
+                case maybe_start_job({DbName, DDocName, hastings}, Pid, Seq, State) of
+                    resubmit -> resubmit;
+                    _ -> Acc
+                end;
+            _ ->
+                % If any job fails, retry the db.
+                resubmit
+        end end, ok, Indexes);
+    true -> ok end.
+-endif.
+
+should_start_job(#job{name = Name, seq = Seq, server = Pid}, State) ->
+    Threshold = list_to_integer(config("max_incremental_updates", "1000")),
+    IncrementalChannels = list_to_integer(config("incremental_channels", "80")),
+    BatchChannels = list_to_integer(config("batch_channels", "20")),
+    TotalChannels = IncrementalChannels + BatchChannels,
+    A = get_active_count(),
+    #state{delay = Delay, batch_size = BS} = State,
+    case ets:lookup(ken_workers, Name) of
+    [] ->
+        if
+            A < BatchChannels ->
+                true;
+            A < TotalChannels ->
+                case Name of
+                    % st_index name has three elements
+                    {_, _, hastings} ->
+                        {ok, CurrentSeq} = hastings_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    % View name has two elements.
+                    {_,_} ->
+                        % Since seq is 0, couch_index:get_state/2 won't
+                        % spawn an index update.
+                        {ok, MRSt} = couch_index:get_state(Pid, 0),
+                        CurrentSeq = couch_mrview_index:get(update_seq, MRSt),
+                        (Seq - CurrentSeq) < Threshold;
+                    % Search name has three elements.
+                    {_,_,_} ->
+                        {ok, _IndexPid, CurrentSeq} = dreyfus_index:await(Pid, 0),
+                        (Seq - CurrentSeq) < Threshold;
+                    _ -> % Should never happen, but if it does, ignore.
+                        false
+                    end;
+            true ->
+                false
+        end;
+    [#job{worker_pid = nil, lru = LRU, seq = OldSeq}] ->
+        DeltaT = timer:now_diff(now(), LRU) / 1000,
+        if
+            A < BatchChannels, (Seq - OldSeq) >= BS ->
+                true;
+            A < BatchChannels, DeltaT > Delay ->
+                true;
+            A < TotalChannels, (Seq - OldSeq) < Threshold, DeltaT > Delay ->
+                true;
+            true ->
+                false
+        end;
+    _ ->
+        false
+    end.
+
+maybe_start_job(JobName, IndexPid, Seq, State) ->
+    Job = #job{
+        name = JobName,
+        server = IndexPid,
+        seq = Seq
+    },
+    case should_start_job(Job, State) of
+    true ->
+        gen_server:cast(?MODULE, {trigger_update, Job});
+    false ->
+        resubmit
+    end.
+
+debrief_worker(Pid, Reason, _State) ->
+    case ets:match_object(ken_workers, #job{worker_pid=Pid, _='_'}) of
+    [#job{name = Name} = Job] ->
+        case Name of
+            {DbName,_} ->
+                maybe_resubmit(DbName, Reason);
+            {DbName,_,_} ->
+                maybe_resubmit(DbName, Reason)
+        end,
+        ets:insert(ken_workers, Job#job{worker_pid = nil});
+    [] -> % should never happen, but if it does, ignore
+        ok
+    end.
+
+maybe_resubmit(_DbName, normal) ->
+    ok;
+maybe_resubmit(_DbName, {database_does_not_exist, _}) ->
+    ok;
+maybe_resubmit(_DbName, {not_found, no_db_file}) ->
+    ok;
+maybe_resubmit(DbName, resubmit) ->
+    resubmit(60000, DbName);
+maybe_resubmit(DbName, _) ->
+    resubmit(5000, DbName).
+
+resubmit(Delay, DbName) ->
+    case ets:insert_new(ken_resubmit, {DbName}) of
+        true ->
+            erlang:send_after(Delay, ?MODULE, {'$gen_cast', {resubmit, DbName}});
+        false ->
+            ok
+    end.
+
+prune_worker_table(State) ->
+    {A, B, _} = now(),
+    C = (1000000 * A) + B - 0.001 * State#state.delay,
+    MatchHead = #job{worker_pid=nil, lru={'$1','$2','_'}, _='_'},
+    Guard = {'<', {'+', {'*', '$1', 1000000}, '$2'}, C},
+    ets:select_delete(ken_workers, [{MatchHead, [Guard], [true]}]),
+    State#state{pruned_last = now()}.
+
+allowed_languages() ->
+    Config = config:get("query_servers") ++ config:get("native_query_servers"),
+    [list_to_binary(Lang) || {Lang, _Cmd} <- Config].
+
+config(Key, Default) ->
+    config:get("ken", Key, Default).
diff --git a/src/ken_sup.erl b/src/ken_sup.erl
new file mode 100644
index 0000000..fd08cfd
--- /dev/null
+++ b/src/ken_sup.erl
@@ -0,0 +1,33 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor callbacks
+
+init([]) ->
+    {ok, { {one_for_one, 5, 10}, [?CHILD(ken_server, worker)]} }.
+
diff --git a/test/config.ini b/test/config.ini
new file mode 100644
index 0000000..a28eae4
--- /dev/null
+++ b/test/config.ini
@@ -0,0 +1,2 @@
+[ken]
+limit = 42
diff --git a/test/ken_server_test.erl b/test/ken_server_test.erl
new file mode 100644
index 0000000..1d2af7a
--- /dev/null
+++ b/test/ken_server_test.erl
@@ -0,0 +1,99 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ken_server_test).
+
+-compile([export_all]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% hardcoded defaults: limit: 20; batch: 1; delay: 5000; prune: 60000
+default_test_() ->
+    {inorder, {setup,
+        fun setup_default/0,
+        fun teardown/1,
+        [
+            set_builder("returns default", set_limit, 12, 20),
+            set_builder("keeps set", set_limit, 6, 12),
+            set_builder("returns default", set_batch_size, 3, 1),
+            set_builder("keeps set", set_batch_size, 6, 3),
+            set_builder("returns default", set_delay, 7000, 5000),
+            set_builder("keeps set", set_delay, 10000, 7000),
+            set_builder("returns default", set_prune_interval, 70000, 60000),
+            set_builder("keeps set", set_prune_interval, 80000, 70000)
+        ]
+    }}.
+
+exception_test_() ->
+    {inorder, {foreach,
+        fun setup_default/0,
+        fun teardown/1,
+        [
+            exception_builder("exception on zero", set_limit, 0),
+            exception_builder("exception on negative", set_limit, -12),
+            exception_builder("exception on zero", set_batch_size, 0),
+            exception_builder("exception on negative", set_batch_size, -12),
+            set_builder("no exception on zero", set_delay, 0, 5000),
+            exception_builder("exception on negative", set_delay, -12),
+            exception_builder("exception on zero", set_prune_interval, 0),
+            exception_builder("exception on negative", set_prune_interval, -12)
+        ]
+    }}.
+
+config_test_() ->
+    {inorder, {setup,
+        fun setup_config/0,
+        fun teardown/1,
+        [
+            set_builder("reads config", set_limit, 24, 42),
+            set_builder("keeps set", set_limit, 6, 24)
+        ]
+    }}.
+
+setup_default() ->
+    {ok, EventPid} = start_server(couch_event_server),
+    {ok, CfgPid} = start_server(config),
+    {ok, KenPid} = start_server(ken_server),
+    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].
+
+setup_config() ->
+    {ok, Pwd} = file:get_cwd(),
+    Config = filename:join([Pwd, "..", "test", "config.ini"]),
+    {ok, EventPid} = start_server(couch_event_server),
+    {ok, CfgPid} = start_server(config, [[Config]]),
+    {ok, KenPid} = start_server(ken_server),
+    [{ken_pid, KenPid}, {cfg_pid, CfgPid}, {event_pid, EventPid}].
+
+teardown(Cfg) ->
+    ok = stop_server(event_pid, Cfg),
+    ok = stop_server(cfg_pid, Cfg),
+    ok = stop_server(ken_pid, Cfg).
+
+exception_builder(Desc, F, Val) ->
+    D = atom_to_list(F) ++ " " ++ Desc,
+    {D, ?_assertException(error, function_clause, ken_server:F(Val))}.
+
+set_builder(Desc, F, In, Out) ->
+    D = atom_to_list(F) ++ " " ++ Desc,
+    {D, ?_assertEqual(Out, ken_server:F(In))}.
+
+start_server(Module) ->
+    start_server(Module, []).
+
+start_server(Module, Config) ->
+    gen_server:start({local, Module}, Module, Config, []).
+
+stop_server(Key, Cfg) ->
+    {Key, Pid} = lists:keyfind(Key, 1, Cfg),
+    MRef = erlang:monitor(process, Pid),
+    true = exit(Pid, kill),
+    receive {'DOWN', MRef, _, _, _} -> ok end.
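
Taken together, ken.erl and ken_server.erl above expose a small public API. A hypothetical usage sketch, assuming the ken application and its dependencies (couch, couch_event, config, mem3, and optionally dreyfus/hastings) are already running on the node; the shard name below is invented for illustration:

    -module(ken_usage_sketch).
    -export([demo/0]).

    demo() ->
        %% queue a single database shard for indexing
        ok = ken:add(<<"shards/00000000-1fffffff/mydb.1549484866">>),
        %% or queue every shard of a clustered database
        _ = ken:add_all_shards(<<"mydb">>),
        %% tune the server at runtime; each setter returns the previous value
        OldLimit = ken_server:set_limit(10),
        OldDelay = ken_server:set_delay(1000),
        %% drop any pending jobs for the shard again
        ok = ken:remove(<<"shards/00000000-1fffffff/mydb.1549484866">>),
        {OldLimit, OldDelay}.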


[couchdb-ken] 03/06: Merge pull request #4 from apache/has_have_confusion

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 004881adbe93db4a769a42a8c76996c3c45d4d46
Merge: 390b919 50682f9
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Feb 25 18:50:04 2019 +0000

    Merge pull request #4 from apache/has_have_confusion
    
    Fix compilation errors and use of macro

 src/ken_server.erl | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)


[couchdb-ken] 02/06: Fix compilation errors and use of macro

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch else
in repository https://gitbox.apache.org/repos/asf/couchdb-ken.git

commit 50682f928e6d2b371415c53cca2180c14bdd3408
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Feb 25 18:37:48 2019 +0000

    Fix compilation errors and use of macro
    
    A bit of confusion with HAS_ and HAVE_ led to me not noticing that the
    dreyfus/hastings bits didn't compile.
---
 src/ken_server.erl | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
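
The confusion described above is between the macro names tested with -ifdef and the ones rebar actually defines: rebar.config.script (commit 01/06) adds {d, 'HAVE_DREYFUS'} and {d, 'HAVE_HASTINGS'} to erl_opts, so -ifdef(HAS_DREYFUS) never matches and the -else stubs get compiled instead. A tiny stand-alone illustration (a hypothetical module, not ken code):

    -module(macro_name_sketch).
    -export([backend/0]).

    %% Compile with the flag rebar derives from rebar.config.script:
    %%   erlc -DHAVE_DREYFUS macro_name_sketch.erl
    %% Only an exact name match selects the first branch; testing
    %% -ifdef(HAS_DREYFUS) under that flag silently falls through to the
    %% -else branch, which is how the broken dreyfus/hastings code went
    %% unnoticed until this commit.
    -ifdef(HAVE_DREYFUS).
    backend() -> dreyfus.
    -else.
    backend() -> none.
    -endif.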

diff --git a/src/ken_server.erl b/src/ken_server.erl
index 25da5ac..91f18c1 100644
--- a/src/ken_server.erl
+++ b/src/ken_server.erl
@@ -309,15 +309,15 @@ update_ddoc_indexes(Name, #doc{}=Doc, State) ->
     false ->
         ok
     end,
-    SearchUpdated = search_updated(Doc, Seq, State),
-    STUpdated = st_updated(Doc, Seq, State),
+    SearchUpdated = search_updated(Name, Doc, Seq, State),
+    STUpdated = st_updated(Name, Doc, Seq, State),
     case {ViewUpdated, SearchUpdated, STUpdated} of
         {ok, ok, ok} -> ok;
         _ -> resubmit
     end.
 
--ifdef(HAS_DREYFUS).
-search_updated(Doc, Seq, State) ->
+-ifdef(HAVE_DREYFUS).
+search_updated(Name, Doc, Seq, State) ->
     case should_update(Doc, <<"indexes">>) of true ->
         try dreyfus_index:design_doc_to_indexes(Doc) of
             SIndexes -> update_ddoc_search_indexes(Name, SIndexes, Seq, State)
@@ -332,8 +332,8 @@ search_updated(_Doc, _Seq, _State) ->
     ok.
 -endif.
 
--ifdef(HAS_HASTINGS).
-st_updated(Doc, Seq, State) ->
+-ifdef(HAVE_HASTINGS).
+st_updated(Name, Doc, Seq, State) ->
     case should_update(Doc, <<"st_indexes">>) of true ->
         try
             hastings_index:design_doc_to_indexes(Doc) of