You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2014/02/08 20:04:39 UTC

couchdb commit: updated refs/heads/1994-merge-rcouch to c94569a

Updated Branches:
  refs/heads/1994-merge-rcouch 2bf88e3ff -> c94569a67


couch_index: add background indexing facility

This change adds the possibility to trigger view indexing in the
background. Indexing can only run in the background if at least one
process has acquired it using the `couch_index_server:acquire_indexer/3`
function. If all the processes that acquired it are down or have released
it using `couch_index_server:release_indexer/3`, then the background task
is stopped.

By default the background indexing will happen every 1s or when 200
docs have been saved in the database. These parameters can be changed
using the options `threshold` and `refresh_interval` in the couch_index
section.

To use it with couch_mrview a new option {refresh, true} has been added
to couch_mrview_changes:handle_changes. Also the query parameter
refresh=true is passed in to the HTTP changes API.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/c94569a6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/c94569a6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/c94569a6

Branch: refs/heads/1994-merge-rcouch
Commit: c94569a675741f099b7cb62f87599bbd61e907f9
Parents: 2bf88e3
Author: Benoit Chesneau <be...@apache.org>
Authored: Sat Feb 8 19:55:40 2014 +0100
Committer: Benoit Chesneau <be...@apache.org>
Committed: Sat Feb 8 20:00:57 2014 +0100

----------------------------------------------------------------------
 apps/couch_httpd/src/couch_httpd_changes.erl   |  18 +-
 apps/couch_index/src/couch_index.erl           |  35 +++-
 apps/couch_index/src/couch_index_indexer.erl   | 189 ++++++++++++++++++++
 apps/couch_index/src/couch_index_server.erl    |  18 ++
 apps/couch_mrview/src/couch_mrview_changes.erl |  56 ++++--
 apps/couch_mrview/test/10-index-changes.t      |  17 +-
 share/www/script/test/changes.js               |  19 +-
 7 files changed, 330 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_httpd/src/couch_httpd_changes.erl
----------------------------------------------------------------------
diff --git a/apps/couch_httpd/src/couch_httpd_changes.erl b/apps/couch_httpd/src/couch_httpd_changes.erl
index 82d9fe0..510e20a 100644
--- a/apps/couch_httpd/src/couch_httpd_changes.erl
+++ b/apps/couch_httpd/src/couch_httpd_changes.erl
@@ -177,8 +177,11 @@ handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
         since = Since,
         db_open_options = DbOptions} = ChangesArgs,
 
+    Refresh = refresh_option(Req),
+
     Options0 = [{since, Since},
-                {view_options, ViewOptions}],
+                {view_options, ViewOptions},
+                {refresh, Refresh}],
     Options = case ResponseType of
         "continuous" -> [stream | Options0];
         "eventsource" -> [stream | Options0];
@@ -236,9 +239,9 @@ view_changes_cb({{Seq, _Key, DocId}, _VAl},
             %% if we achieved the limit, stop here, else continue.
             NewLimit = OldLimit + 1,
             if Limit > NewLimit ->
-                    {ok, {<<",\n">>, Db, NewLimit, Callback, Args}};
+                    {ok, {<<",\n">>, NewLimit, Db, Callback, Args}};
                 true ->
-                    {stop, {<<"">>, Db, NewLimit, Callback, Args}}
+                    {stop, {<<"">>, NewLimit, Db, Callback, Args}}
             end;
         {error, not_found} ->
             %% doc not found, continue
@@ -416,6 +419,15 @@ parse_view_options([{K, V} | Rest], Acc) ->
     end,
     parse_view_options(Rest, Acc1).
 
+%% Read the `refresh' option of a changes request and return it as a
+%% boolean. It defaults to true; only an explicit false disables it.
+%%
+%% For a {json_req, ...} request the value is looked up in the "query"
+%% object of the request. The looked-up term is normalised here because
+%% the consumers of this option only accept the atoms true and false:
+%% returning the raw term (e.g. the binary <<"false">>) would make them
+%% fail with function_clause.
+refresh_option({json_req, {Props}}) ->
+    {Query} = couch_util:get_value(<<"query">>, Props),
+    case couch_util:get_value(<<"refresh">>, Query, true) of
+        false -> false;
+        <<"false">> -> false;
+        _ -> true
+    end;
+%% For a regular HTTP request the value comes from the query string.
+refresh_option(Req) ->
+    case couch_httpd:qs_value(Req, "refresh", "true") of
+        "false" -> false;
+        _ -> true
+    end.
+
 parse_json(V) when is_list(V) ->
     ?JSON_DECODE(V);
 parse_json(V) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_index/src/couch_index.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index.erl b/apps/couch_index/src/couch_index.erl
index b9ae567..01483bb 100644
--- a/apps/couch_index/src/couch_index.erl
+++ b/apps/couch_index/src/couch_index.erl
@@ -17,6 +17,7 @@
 %% API
 -export([start_link/1, stop/1, get_state/2, get_info/1]).
 -export([compact/1, compact/2, get_compactor_pid/1]).
+-export([acquire_indexer/1, release_indexer/1]).
 -export([config_change/3]).
 
 %% gen_server callbacks
@@ -30,6 +31,7 @@
     idx_state,
     updater,
     compactor,
+    indexer=nil,
     waiters=[],
     commit_delay,
     committed=true,
@@ -68,6 +70,16 @@ compact(Pid, Options) ->
 get_compactor_pid(Pid) ->
     gen_server:call(Pid, get_compactor_pid).
 
+
+%% Acquire the background indexer of the index process Pid on behalf of
+%% the calling process: the index is first asked for its indexer pid
+%% (get_indexer_pid), then the caller's pid is sent to that indexer in
+%% an {acquire, ...} call.
+acquire_indexer(Pid) ->
+    {ok, Indexer} = gen_server:call(Pid, get_indexer_pid),
+    Owner = self(),
+    gen_server:call(Indexer, {acquire, Owner}).
+
+%% Release the background indexer of the index process Pid on behalf of
+%% the calling process: the index is first asked for its indexer pid
+%% (get_indexer_pid), then the caller's pid is sent to that indexer in
+%% a {release, ...} call.
+release_indexer(Pid) ->
+    {ok, Indexer} = gen_server:call(Pid, get_indexer_pid),
+    Owner = self(),
+    gen_server:call(Indexer, {release, Owner}).
+
+
 config_change("query_server_config", "commit_freq", NewValue) ->
     ok = gen_server:cast(?MODULE, {config_update, NewValue}).
 
@@ -88,6 +100,7 @@ init({Mod, IdxState}) ->
         {ok, NewIdxState} ->
             {ok, UPid} = couch_index_updater:start_link(self(), Mod),
             {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
+
             Delay = couch_config:get("query_server_config", "commit_freq", "5"),
             MsDelay = 1000 * list_to_integer(Delay),
             State = #st{
@@ -196,7 +209,18 @@ handle_call({compacted, NewIdxState}, _From, State) ->
             }};
         _ ->
             {reply, recompact, State}
-    end.
+    end;
+handle_call(get_indexer_pid, _From, #st{mod=Mod, idx_state=IdxState}=State) ->
+    Pid = case State#st.indexer of
+        Pid1 when is_pid(Pid1) ->
+            Pid1;
+        _ ->
+            DbName = Mod:get(db_name, IdxState),
+            {ok, IPid} = couch_index_indexer:start_link(self(), DbName),
+            erlang:monitor(process, IPid),
+            IPid
+    end,
+    {reply, {ok, Pid}, State#st{indexer=Pid}}.
 
 
 handle_cast({config_change, NewDelay}, State) ->
@@ -318,6 +342,15 @@ handle_info(commit, State) ->
             erlang:send_after(Delay, self(), commit),
             {noreply, State}
     end;
+
+handle_info({'DOWN', _, _, Pid, _}, #st{mod=Mod, idx_state=IdxState,
+                                        indexer=Pid}=State) ->
+    Args = [Mod:get(db_name, IdxState),
+            Mod:get(idx_name, IdxState)],
+    ?LOG_INFO("Background indexer shutdown by monitor notice for db: ~s idx: ~s", Args),
+
+    {noreply, State#st{indexer=nil}};
+
 handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
     DbName = Mod:get(db_name, IdxState),
     DDocId = Mod:get(idx_name, IdxState),

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_index/src/couch_index_indexer.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_indexer.erl b/apps/couch_index/src/couch_index_indexer.erl
new file mode 100644
index 0000000..4af85cf
--- /dev/null
+++ b/apps/couch_index/src/couch_index_indexer.erl
@@ -0,0 +1,189 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_indexer).
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {index,
+                dbname,
+                db_updates=0,
+                tref=nil,
+                notifier=nil,
+                locks}).
+
+
+%% Start a background indexer gen_server linked to the caller. Index
+%% and DbName are handed to init/1 as a single {Index, DbName} tuple.
+start_link(Index, DbName) ->
+    InitArg = {Index, DbName},
+    gen_server:start_link(?MODULE, InitArg, []).
+
+%% gen_server init: trap exits and build the initial state with an
+%% empty lock table.
+init({Index, DbName}) ->
+    process_flag(trap_exit, true),
+    InitialState = #state{index=Index,
+                          dbname=DbName,
+                          locks=dict:new()},
+    %% the actual start of the background indexing is deferred: this
+    %% message is only handled once init/1 has returned
+    self() ! start_indexing,
+    {ok, InitialState}.
+
+%% Lock bookkeeping. `locks' is a dict mapping the pid of each process
+%% that acquired the indexer to {MonitorRef, Count}.
+%%
+%% {acquire, Pid}: monitor Pid on its first acquire, otherwise bump its
+%% count.
+handle_call({acquire, Pid}, _From, #state{locks=Locks0}=State) ->
+    Entry = case dict:find(Pid, Locks0) of
+        {ok, {MRef, Count}} ->
+            {MRef, Count + 1};
+        error ->
+            {erlang:monitor(process, Pid), 1}
+    end,
+    {reply, ok, State#state{locks=dict:store(Pid, Entry, Locks0)}};
+
+%% {release, Pid}: decrement the count of Pid; on its last release drop
+%% the monitor and the entry. The server stops when should_close/0 says
+%% so.
+handle_call({release, Pid}, _From, #state{locks=Locks0}=State) ->
+    Locks = case dict:find(Pid, Locks0) of
+        error ->
+            Locks0;
+        {ok, {MRef, 1}} ->
+            erlang:demonitor(MRef, [flush]),
+            dict:erase(Pid, Locks0);
+        {ok, {MRef, Count}} ->
+            dict:store(Pid, {MRef, Count - 1}, Locks0)
+    end,
+
+    State1 = State#state{locks=Locks},
+
+    case should_close() of
+        true -> {stop, normal, ok, State1};
+        false -> {reply, ok, State1}
+    end;
+
+%% stop: terminate the server normally.
+handle_call(stop, _From, State) ->
+    {stop, normal, ok, State}.
+
+%% `updated' is cast by the db notifier for each update event of the
+%% watched database. Updates are counted; once the configured threshold
+%% is reached the index is refreshed and the counter reset. Below the
+%% threshold only the counter is advanced.
+handle_cast(updated, #state{index=Index, dbname=DbName,
+                            db_updates=Updates}=State) ->
+    Threshold = get_db_threshold(),
+    NUpdates = Updates + 1,
+
+    %% the threshold is read from the config on every event, so it can
+    %% change between two events: compare with >= rather than =:= so a
+    %% counter that is already past a lowered threshold still triggers
+    %% the refresh.
+    case NUpdates >= Threshold of
+        true ->
+            refresh_index(DbName, Index),
+            {noreply, State#state{db_updates=0}};
+        false ->
+            {noreply, State#state{db_updates=NUpdates}}
+    end;
+
+%% any other cast is ignored.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% start_indexing is sent by init/1 to itself: start the db notifier
+%% and arm the refresh timer.
+handle_info(start_indexing, #state{dbname=DbName}=State) ->
+    %% start the db notifier to watch db update events
+    {ok, NotifierPid} = start_db_notifier(DbName),
+
+    %% start the timer
+    R = get_refresh_interval(),
+    TRef = erlang:start_timer(R, self(), refresh_index),
+
+    {noreply, State#state{tref=TRef, notifier=NotifierPid}};
+
+%% The refresh timer fired. Only the timer currently stored in the
+%% state is honoured (tref=TRef in the head).
+handle_info({timeout, TRef, refresh_index}, #state{index=Index,
+                                                   dbname=DbName,
+                                                   tref=TRef,
+                                                   db_updates=N}=State) ->
+    %% only refresh the index if an update happened
+    case N > 0 of
+        true ->
+            refresh_index(DbName, Index);
+        false ->
+            ok
+    end,
+    %% erlang:start_timer/3 fires only once, so the timer has to be
+    %% re-armed here to get a periodic refresh.
+    NTRef = erlang:start_timer(get_refresh_interval(), self(),
+                               refresh_index),
+    %% State#state{...} updates the record; writing
+    %% #state{db_updates=0}=State instead would be a pattern match that
+    %% fails with badmatch as soon as updates were counted.
+    {noreply, State#state{db_updates=0, tref=NTRef}};
+
+%% A monitored process went down: drop its lock entry, then stop the
+%% server when should_close/0 says so.
+handle_info({'DOWN', MRef, _, Pid, _}, #state{locks=Locks}=State) ->
+    NLocks = case dict:find(Pid, Locks) of
+        {ok, {MRef, _}} ->
+            dict:erase(Pid, Locks);
+        _ ->
+            %% unknown pid, or a monitor ref that is not the one we
+            %% stored: leave the locks untouched
+            Locks
+    end,
+
+    NState = State#state{locks=NLocks},
+
+    case should_close() of
+        true -> {stop, normal, NState};
+        false -> {noreply, NState}
+    end;
+
+handle_info({'EXIT', Pid, _Reason}, #state{notifier=Pid}=State) ->
+    %% db notifier exited
+    {stop, normal, State#state{notifier=nil}};
+
+%% Anything else (e.g. a timeout from an old timer, or an 'EXIT' from a
+%% process other than the notifier) is ignored instead of crashing the
+%% server with function_clause, as handle_cast/2 does for unknown casts.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% gen_server code upgrade callback: the state is kept unchanged.
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+%% gen_server terminate callback: cancel the refresh timer if one was
+%% set, and shut the db notifier down through
+%% couch_util:shutdown_sync/1 if one was started.
+terminate(_Reason, #state{tref=Timer, notifier=Notifier}) ->
+    case Timer of
+        nil -> ok;
+        _ -> erlang:cancel_timer(Timer)
+    end,
+
+    case Notifier of
+        _ when is_pid(Notifier) -> couch_util:shutdown_sync(Notifier);
+        _ -> ok
+    end,
+    ok.
+
+%% refresh the index to trigger updates: read the current update
+%% sequence of the database Db, then ask the index process Index for
+%% its state at that sequence.
+%%
+%% Returns ok when couch_index:get_state/2 answers {ok, _}. Any other
+%% answer is returned as {error, Answer}, and an exception raised by
+%% the call as {error, {Class, Reason}}.
+refresh_index(Db, Index) ->
+    UpdateSeq = couch_util:with_db(Db, fun couch_db:get_update_seq/1),
+
+    %% try ... of instead of the old-style `catch': only the get_state
+    %% call is protected, and a normal return value can no longer be
+    %% confused with a caught exception.
+    try couch_index:get_state(Index, UpdateSeq) of
+        {ok, _} -> ok;
+        Error -> {error, Error}
+    catch
+        Class:Reason -> {error, {Class, Reason}}
+    end.
+
+%% if no one has acquired us any more, we could stop the server. This
+%% is true exactly when this process currently monitors no process.
+should_close() ->
+    process_info(self(), monitors) =:= {monitors, []}.
+
+
+%% maximum number of db updates before the index is refreshed. The
+%% index is not updated on each db update; instead we wait for this
+%% minimum. If the minimum is not reached, the update will happen at
+%% the next interval. Read from the "threshold" option of the
+%% "couch_index" config section, "200" when unset.
+get_db_threshold() ->
+    Value = couch_config:get("couch_index", "threshold", "200"),
+    list_to_integer(Value).
+
+%% refresh interval in ms: the interval at which the index will be
+%% updated. Read from the "refresh_interval" option of the
+%% "couch_index" config section, "1000" when unset.
+get_refresh_interval() ->
+    Value = couch_config:get("couch_index", "refresh_interval", "1000"),
+    list_to_integer(Value).
+
+%% db notifier: start a linked couch_db_update_notifier whose callback
+%% casts `updated' to this process for every {updated, Name} event
+%% whose Name is DbName; all other events are ignored.
+start_db_notifier(DbName) ->
+    Indexer = self(),
+
+    %% a variable in a fun head would shadow DbName rather than match
+    %% it, hence the guard
+    OnEvent = fun
+        ({updated, Name}) when Name =:= DbName ->
+            gen_server:cast(Indexer, updated);
+        (_Other) ->
+            ok
+    end,
+    couch_db_update_notifier:start_link(OnEvent).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_index/src/couch_index_server.erl
----------------------------------------------------------------------
diff --git a/apps/couch_index/src/couch_index_server.erl b/apps/couch_index/src/couch_index_server.erl
index 86791db..2c6ebc9 100644
--- a/apps/couch_index/src/couch_index_server.erl
+++ b/apps/couch_index/src/couch_index_server.erl
@@ -14,6 +14,7 @@
 -behaviour(gen_server).
 
 -export([start_link/0, get_index/4, get_index/3, get_index/2]).
+-export([acquire_indexer/3, release_indexer/3]).
 -export([config_change/2, update_notify/1]).
 
 -export([init/1, terminate/2, code_change/3]).
@@ -67,6 +68,23 @@ get_index(Module, IdxState) ->
             gen_server:call(?MODULE, {get_index, Args}, infinity)
     end.
 
+%% Look up the index process for Module/DbName/DDoc with get_index/3
+%% and acquire its background indexer for the calling process. Any
+%% result of get_index/3 other than {ok, Pid} is returned unchanged.
+acquire_indexer(Module, DbName, DDoc) ->
+    Lookup = get_index(Module, DbName, DDoc),
+    case Lookup of
+        {ok, IndexPid} -> couch_index:acquire_indexer(IndexPid);
+        _ -> Lookup
+    end.
+
+%% Look up the index process for Module/DbName/DDoc with get_index/3
+%% and release its background indexer for the calling process. Any
+%% result of get_index/3 other than {ok, Pid} is returned unchanged.
+release_indexer(Module, DbName, DDoc) ->
+    Lookup = get_index(Module, DbName, DDoc),
+    case Lookup of
+        {ok, IndexPid} -> couch_index:release_indexer(IndexPid);
+        _ -> Lookup
+    end.
+
+
 
 init([]) ->
     process_flag(trap_exit, true),

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_mrview/src/couch_mrview_changes.erl
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/src/couch_mrview_changes.erl b/apps/couch_mrview/src/couch_mrview_changes.erl
index a0e5281..c31624e 100644
--- a/apps/couch_mrview/src/couch_mrview_changes.erl
+++ b/apps/couch_mrview/src/couch_mrview_changes.erl
@@ -28,14 +28,16 @@
               heartbeat,
               timeout_acc=0,
               notifier,
-              stream}).
+              stream,
+              refresh}).
 
 -type changes_stream() :: true | false | once.
 -type changes_options() :: [{stream, changes_stream()} |
                             {since, integer()} |
                             {view_options, list()} |
                             {timeout, integer()} |
-                            {heartbeat, true | integer()}].
+                            {heartbeat, true | integer()} |
+                            {refresh, true | false}].
 
 -export_type([changes_stream/0]).
 -export_type([changes_options/0]).
@@ -47,6 +49,7 @@ handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
     Since = proplists:get_value(since, Options, 0),
     Stream = proplists:get_value(stream, Options, false),
     ViewOptions = proplists:get_value(view_options, Options, []),
+    Refresh = proplists:get_value(refresh, Options, false),
 
     State0 = #vst{dbname=DbName,
                   ddoc=DDocId,
@@ -56,20 +59,25 @@ handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
                   callback=Fun,
                   acc=Acc},
 
-    case view_changes_since(State0) of
-        {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
-            case Stream of
-                true ->
-                    start_loop(State#vst{stream=true}, Options);
-                once when LastSeq =:= Since ->
-                    start_loop(State#vst{stream=once}, Options);
-                _ ->
-                    Fun(stop, {LastSeq, Acc2})
-            end;
-        {stop, #vst{since=LastSeq, acc=Acc2}} ->
-            Fun(stop, {LastSeq, Acc2});
-        Error ->
-            Error
+    maybe_acquire_indexer(Refresh, DbName, DDocId),
+    try
+        case view_changes_since(State0) of
+            {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
+                case Stream of
+                    true ->
+                        start_loop(State#vst{stream=true}, Options);
+                    once when LastSeq =:= Since ->
+                        start_loop(State#vst{stream=once}, Options);
+                    _ ->
+                        Fun(stop, {LastSeq, Acc2})
+                end;
+            {stop, #vst{since=LastSeq, acc=Acc2}} ->
+                Fun(stop, {LastSeq, Acc2});
+            Error ->
+                Error
+        end
+    after
+        maybe_release_indexer(Refresh, DbName, DDocId)
     end.
 
 start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) ->
@@ -169,3 +177,19 @@ index_update_notifier(DbName, DDocId) ->
                     ok
             end),
     NotifierPid.
+
+%% acquire the background indexing task, when refresh is enabled, so
+%% that it can eventually be started. If the acquiring process exits,
+%% the background task will be released automatically. Nothing is done
+%% when refresh is false.
+maybe_acquire_indexer(true, DbName, DDocId) ->
+    couch_index_server:acquire_indexer(couch_mrview_index, DbName, DDocId);
+maybe_acquire_indexer(false, _DbName, _DDocId) ->
+    ok.
+
+%% release the background indexing task, when refresh is enabled, so
+%% that it can eventually be stopped. Nothing is done when refresh is
+%% false.
+maybe_release_indexer(true, DbName, DDocId) ->
+    couch_index_server:release_indexer(couch_mrview_index, DbName, DDocId);
+maybe_release_indexer(false, _DbName, _DDocId) ->
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/apps/couch_mrview/test/10-index-changes.t
----------------------------------------------------------------------
diff --git a/apps/couch_mrview/test/10-index-changes.t b/apps/couch_mrview/test/10-index-changes.t
index 627376f..f53e9ed 100644
--- a/apps/couch_mrview/test/10-index-changes.t
+++ b/apps/couch_mrview/test/10-index-changes.t
@@ -15,7 +15,7 @@
 % the License.
 
 main(_) ->
-    etap:plan(6),
+    etap:plan(8),
     case (catch test()) of
         ok ->
             etap:end_tests();
@@ -35,6 +35,7 @@ test() ->
     test_stream_once_timeout(Db),
     test_stream_once_heartbeat(Db),
     test_stream(Db),
+    test_indexer(Db),
     test_util:stop_couch(),
     ok.
 
@@ -173,6 +174,20 @@ test_stream(Db) ->
     end.
 
 
+%% Query the view changes since seq 14, save one more doc, wait, and
+%% query again expecting the new doc to show up in the changes.
+test_indexer(Db) ->
+    Result = run_query(Db, [{since, 14}]),
+    Expect = {ok, 15, [{{15,14,<<"14">>},14}]},
+    etap:is(Result, Expect, "refresh index by hand OK."),
+
+    %% the returned db is not used afterwards; the match only asserts
+    %% that the save succeeded
+    {ok, _Db1} = save_doc(Db, 15),
+    timer:sleep(1000),
+    Result1 = run_query(Db, [{since, 14}]),
+    Expect1 = {ok, 16, [{{15,14,<<"14">>},14},
+                       {{16,15,<<"15">>},15}]},
+    etap:is(Result1, Expect1, "changes indexed in background OK."),
+    ok.
+
+
 save_doc(Db, Id) ->
     Doc = couch_mrview_test_util:doc(Id),
     {ok, _Rev} = couch_db:update_doc(Db, Doc, []),

http://git-wip-us.apache.org/repos/asf/couchdb/blob/c94569a6/share/www/script/test/changes.js
----------------------------------------------------------------------
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index 3419eb6..2822a90 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -295,7 +295,7 @@ couchTests.changes = function(debug) {
       },
       blah: {
         map : 'function(doc) {' +
-              '  if (doc._id == "blah") {' +
+              '  if ((doc._id == "blah") || (doc._id == "blah2")) {' +
               '    emit("test", null);' +
               '  }' +
               '}'
@@ -453,6 +453,23 @@ couchTests.changes = function(debug) {
   TEquals(400, req.status, "should return 400 for when use_index=no");
 
 
+  T(db.save({"_id":"blah2", "bop" : "plankton"}).ok);
+
+  var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah");
+  var resp = JSON.parse(req.responseText);
+  T(resp.results.length === 2);
+  T(resp.results[0].id === "blah");
+  T(resp.results[1].id === "blah2");
+
+  var req = CouchDB.request("GET", '/test_suite_db/_changes?filter=_view&view=changes_seq_indexed/blah&key="test"');
+  var resp = JSON.parse(req.responseText);
+  T(resp.results.length === 2);
+  T(resp.results[0].id === "blah");
+  T(resp.results[1].id === "blah2");
+
+
+
+
   // test for userCtx
   run_on_modified_server(
     [{section: "httpd",